From 4d58ff2df7d052041bf259dcd69016719c3c5ef8 Mon Sep 17 00:00:00 2001 From: godchen Date: Mon, 15 Nov 2021 15:17:09 +0800 Subject: [PATCH] Fix reload delta channel error (#11783) Signed-off-by: godchen --- internal/querycoord/meta.go | 5 +++-- internal/querycoord/meta_test.go | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index d2dbdf0b25..442a9798ae 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -17,6 +17,7 @@ import ( "fmt" "path/filepath" "strconv" + "strings" "sync" "github.com/golang/protobuf/proto" @@ -202,8 +203,8 @@ func (m *MetaReplica) reloadFromKV() error { return nil } for index, value := range deltaChannelValues { - collectionIDString, _ := filepath.Split(deltaChannelKeys[index]) - collectionID, err := strconv.ParseInt(collectionIDString, 10, 64) + pathStrings := strings.Split(deltaChannelKeys[index], "/") + collectionID, err := strconv.ParseInt(pathStrings[len(pathStrings)-2], 10, 64) if err != nil { return err } diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index 85ad7cba2a..baa932617d 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -23,6 +23,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" ) @@ -311,6 +312,7 @@ func TestReloadMetaFromKV(t *testing.T) { collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, segmentInfos: map[UniqueID]*querypb.SegmentInfo{}, queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{}, + deltaChannelInfos: map[UniqueID][]*datapb.VchannelInfo{}, } kvs := make(map[string]string) @@ -338,6 +340,18 @@ func TestReloadMetaFromKV(t *testing.T) { queryChannelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, defaultCollectionID) kvs[queryChannelKey] = string(queryChannelBlobs) + deltaChannel1 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel1"} + deltaChannel2 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel2"} + + infos := []*datapb.VchannelInfo{deltaChannel1, deltaChannel2} + for _, info := range infos { + infoBytes, err := proto.Marshal(info) + assert.Nil(t, err) + + key := fmt.Sprintf("%s/%d/%s", deltaChannelMetaPrefix, defaultCollectionID, info.ChannelName) + kvs[key] = string(infoBytes) + } + err = kv.MultiSave(kvs) assert.Nil(t, err)