diff --git a/Makefile b/Makefile index 62b11ce282..ccae0badb2 100644 --- a/Makefile +++ b/Makefile @@ -116,6 +116,11 @@ test-proxy: go test -race -coverpkg=./... -coverprofile=profile.out -covermode=atomic -timeout 5m github.com/milvus-io/milvus/internal/proxy -v +test-querycoord: + @echo "Running go unittests..." + go test -race -coverpkg=./... -coverprofile=profile.out -covermode=atomic -timeout 5m github.com/milvus-io/milvus/internal/querycoord -v + + test-go: build-cpp-with-unittest @echo "Running go unittests..." @(env bash $(PWD)/scripts/run_go_codecov.sh) diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 5e94526561..de0d302b01 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -37,6 +37,7 @@ const ( segmentMetaPrefix = "queryCoord-segmentMeta" queryChannelMetaPrefix = "queryCoord-queryChannel" sealedSegmentChangeInfoPrefix = "queryCoord-sealedSegmentChangeInfo" + globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition" ) type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo @@ -100,6 +101,7 @@ type MetaReplica struct { queryStreams map[UniqueID]msgstream.MsgStream streamMu sync.RWMutex + globalSeekPosition *internalpb.MsgPosition //partitionStates map[UniqueID]*querypb.PartitionStates } @@ -109,6 +111,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll segmentInfos := make(map[UniqueID]*querypb.SegmentInfo) queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo) queryMsgStream := make(map[UniqueID]msgstream.MsgStream) + position := &internalpb.MsgPosition{} m := &MetaReplica{ ctx: childCtx, @@ -117,10 +120,11 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll msFactory: factory, idAllocator: idAllocator, - collectionInfos: collectionInfos, - segmentInfos: segmentInfos, - queryChannelInfos: queryChannelInfos, - queryStreams: queryMsgStream, + collectionInfos: collectionInfos, + segmentInfos: segmentInfos, + queryChannelInfos: queryChannelInfos, + queryStreams: queryMsgStream, + globalSeekPosition: position, } err := m.reloadFromKV() @@ -182,6 +186,15 @@ func (m *MetaReplica) reloadFromKV() error { } m.queryChannelInfos[collectionID] = queryChannelInfo } + globalSeekPosValue, err := m.client.Load(globalQuerySeekPositionPrefix) + if err == nil { + position := &internalpb.MsgPosition{} + err = proto.Unmarshal([]byte(globalSeekPosValue), position) + if err != nil { + return err + } + m.globalSeekPosition = position + } //TODO::update partition states return nil @@ -399,6 +412,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal } queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo) + var globalSeekPositionTmp *internalpb.MsgPosition for collectionID, segmentChangeInfos := range col2SegmentChangeInfos { // get msgStream to produce sealedSegmentChangeInfos to query channel queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos) @@ -438,6 +452,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal } queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos queryChannelInfosMap[collectionID] = queryChannelInfo + globalSeekPositionTmp = queryChannelInfo.SeekPosition } saveKvs := make(map[string]string) @@ -460,6 +475,11 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID) saveKvs[channelKey] = string(channelInfoBytes) } + seekPos, err := proto.Marshal(globalSeekPositionTmp) + if err != nil { + return col2SegmentChangeInfos, err + } + saveKvs[globalQuerySeekPositionPrefix] = string(seekPos) // save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd // avoid the produce process success but save meta to etcd failed @@ -474,7 +494,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal saveKvs[changeInfoKey] = string(changeInfoBytes) } - err := m.client.MultiSave(saveKvs) + err = m.client.MultiSave(saveKvs) if err != nil { log.Error("updateGlobalSealedSegmentInfos: save info to etcd error", zap.Error(err)) return col2SegmentChangeInfos, err @@ -492,6 +512,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal for collectionID, channelInfo := range queryChannelInfosMap { m.queryChannelInfos[collectionID] = channelInfo } + m.globalSeekPosition = globalSeekPositionTmp m.channelMu.Unlock() return col2SegmentChangeInfos, nil @@ -563,6 +584,11 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio } channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID) saveKvs[channelKey] = string(channelInfoBytes) + seekPos, err := proto.Marshal(queryChannelInfo.SeekPosition) + if err != nil { + return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err + } + saveKvs[globalQuerySeekPositionPrefix] = string(seekPos) // save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd // avoid the produce process success but save meta to etcd failed @@ -594,6 +620,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio m.channelMu.Lock() m.queryChannelInfos[collectionID] = queryChannelInfo + m.globalSeekPosition = queryChannelInfo.SeekPosition m.channelMu.Unlock() return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil @@ -890,6 +917,10 @@ func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.Q // set info.collectionID from 0 to realID info.CollectionID = collectionID m.queryChannelInfos[collectionID] = info + info.SeekPosition = m.globalSeekPosition + if info.SeekPosition != nil { + info.SeekPosition.ChannelName = info.QueryChannelID + } return proto.Clone(info).(*querypb.QueryChannelInfo), nil } diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index 271454bbe4..b260db4deb 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -23,6 +23,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" ) @@ -50,6 +51,10 @@ func (tk *testKv) LoadWithPrefix(key string) ([]string, []string, error) { return nil, nil, nil } +func (tk *testKv) Load(key string) (string, error) { + return "", nil +} + func TestReplica_Release(t *testing.T) { refreshParams() etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) @@ -85,10 +90,11 @@ func TestMetaFunc(t *testing.T) { kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) assert.Nil(t, err) meta := &MetaReplica{ - client: kv, - collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, - segmentInfos: map[UniqueID]*querypb.SegmentInfo{}, - queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{}, + client: kv, + collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, + segmentInfos: map[UniqueID]*querypb.SegmentInfo{}, + queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{}, + globalSeekPosition: &internalpb.MsgPosition{}, } nodeID := int64(100) @@ -342,4 +348,20 @@ func TestReloadMetaFromKV(t *testing.T) { assert.Equal(t, true, ok) _, ok = meta.queryChannelInfos[defaultCollectionID] assert.Equal(t, true, ok) + + t.Run("test no global query seek position", func(t *testing.T) { + err = kv.Remove(globalQuerySeekPositionPrefix) + assert.NoError(t, err) + + err = meta.reloadFromKV() + assert.NoError(t, err) + }) + + t.Run("test wrong global query seek position", func(t *testing.T) { + err = kv.Save(globalQuerySeekPositionPrefix, "&%*&^*^(&%*&%&^%") + assert.NoError(t, err) + + err = meta.reloadFromKV() + assert.Error(t, err) + }) }