diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 442a9798ae..a5093c28e6 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -31,15 +31,14 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/mqclient" ) const ( collectionMetaPrefix = "queryCoord-collectionMeta" - segmentMetaPrefix = "queryCoord-segmentMeta" queryChannelMetaPrefix = "queryCoord-queryChannel" deltaChannelMetaPrefix = "queryCoord-deltaChannel" - sealedSegmentChangeInfoPrefix = "queryCoord-sealedSegmentChangeInfo" globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition" ) @@ -164,7 +163,7 @@ func (m *MetaReplica) reloadFromKV() error { m.collectionInfos[collectionID] = collectionInfo } - segmentKeys, segmentValues, err := m.client.LoadWithPrefix(segmentMetaPrefix) + segmentKeys, segmentValues, err := m.client.LoadWithPrefix(util.SegmentMetaPrefix) if err != nil { return err } @@ -413,7 +412,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal col2SegmentChangeInfos := make(col2SealedSegmentChangeInfos) segmentsCompactionFrom := make([]UniqueID, 0) - // get segmentInfos to sav + // get segmentInfos to colSegmentInfos for collectionID, onlineInfos := range saves { segmentsChangeInfo := &querypb.SealedSegmentsChangeInfo{ Base: &commonpb.MsgBase{ @@ -508,7 +507,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal if err != nil { return col2SegmentChangeInfos, err } - segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID) + segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, info.SegmentID) segmentInfoKvs[segmentKey] = string(segmentInfoBytes) } } @@ -521,7 +520,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal // remove compacted segment info from etcd for _, segmentID := range segmentsCompactionFrom { - segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, segmentID) + segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID) err := m.client.Remove(segmentKey) if err != nil { panic(err) @@ -553,7 +552,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal return col2SegmentChangeInfos, err } // TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg - changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, changeInfos.Base.MsgID) + changeInfoKey := fmt.Sprintf("%s/%d", util.ChangeInfoMetaPrefix, changeInfos.Base.MsgID) saveKvs[changeInfoKey] = string(changeInfoBytes) } @@ -644,7 +643,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio // remove meta from etcd for _, info := range removes { - segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID) + segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, info.SegmentID) err = m.client.Remove(segmentKey) if err != nil { panic(err) @@ -673,7 +672,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err } // TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg - changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, segmentChangeInfos.Base.MsgID) + changeInfoKey := fmt.Sprintf("%s/%d", util.ChangeInfoMetaPrefix, segmentChangeInfos.Base.MsgID) saveKvs[changeInfoKey] = string(changeInfoBytes) err = m.client.MultiSave(saveKvs) @@ -695,6 +694,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil } +// send sealed segment change infos into query channels func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) { // get msgStream to produce sealedSegmentChangeInfos to query channel queryChannelInfo, err := m.getQueryChannelInfoByID(collectionID) @@ -1179,7 +1179,7 @@ func multiSaveSegmentInfos(segmentInfos map[UniqueID]*querypb.SegmentInfo, kv kv if err != nil { return err } - key := fmt.Sprintf("%s/%d", segmentMetaPrefix, segmentID) + key := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID) kvs[key] = string(infoBytes) } @@ -1189,7 +1189,7 @@ func multiSaveSegmentInfos(segmentInfos map[UniqueID]*querypb.SegmentInfo, kv kv func multiRemoveSegmentInfo(segmentIDs []UniqueID, kv kv.MetaKv) error { keys := make([]string, 0) for _, segmentID := range segmentIDs { - key := fmt.Sprintf("%s/%d", segmentMetaPrefix, segmentID) + key := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID) keys = append(keys, key) } diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index baa932617d..90a75d3646 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util" ) func successResult() error { return nil } @@ -329,7 +330,7 @@ func TestReloadMetaFromKV(t *testing.T) { } segmentBlobs, err := proto.Marshal(segmentInfo) assert.Nil(t, err) - segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, defaultSegmentID) + segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, defaultSegmentID) kvs[segmentKey] = string(segmentBlobs) queryChannelInfo := &querypb.QueryChannelInfo{ diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 9e3856e4b1..a8951ec48e 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -954,6 +954,7 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta) } if err != nil { + log.Error("Failed to update global sealed seg infos, begin to rollback", zap.Error(err)) rollBackSegmentChangeInfoErr := retry.Do(ctx, func() error { rollBackChangeInfos := reverseSealedSegmentChangeInfo(sealedSegmentChangeInfos) for collectionID, infos := range rollBackChangeInfos { @@ -967,6 +968,7 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta) if rollBackSegmentChangeInfoErr != nil { log.Error("scheduleLoop: Restore the information of global sealed segments in query node failed", zap.Error(rollBackSegmentChangeInfoErr)) } + log.Info("Successfully roll back segment info change") return err } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 45492f70c4..48f1c9faef 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -558,12 +558,6 @@ func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) er partition.removeSegmentID(segmentID) delete(colReplica.segments, segmentID) deleteSegment(segment) - key := fmt.Sprintf("%s/%d", queryNodeSegmentMetaPrefix, segmentID) - err = colReplica.etcdKV.Remove(key) - if err != nil { - log.Warn("error when remove segment info from etcd") - } - return nil } diff --git a/internal/querynode/global_sealed_segment_manager.go b/internal/querynode/global_sealed_segment_manager.go index 44e0a19881..2b1497973b 100644 --- a/internal/querynode/global_sealed_segment_manager.go +++ b/internal/querynode/global_sealed_segment_manager.go @@ -32,18 +32,16 @@ func newGlobalSealedSegmentManager(collectionID UniqueID) *globalSealedSegmentMa } } -func (g *globalSealedSegmentManager) addGlobalSegmentInfo(segmentInfo *querypb.SegmentInfo) error { +func (g *globalSealedSegmentManager) addGlobalSegmentInfo(segmentInfo *querypb.SegmentInfo) { g.mu.Lock() defer g.mu.Unlock() if segmentInfo.CollectionID != g.collectionID { - log.Debug("mismatch collectionID when addGlobalSegmentInfo", + log.Warn("Find mismatch collectionID when addGlobalSegmentInfo", zap.Any("manager collectionID", g.collectionID), zap.Any("segmentInfo collectionID", segmentInfo.CollectionID), ) - return nil } g.globalSealedSegments[segmentInfo.SegmentID] = segmentInfo - return nil } func (g *globalSealedSegmentManager) getGlobalSegmentIDs() []UniqueID { @@ -70,14 +68,14 @@ func (g *globalSealedSegmentManager) getGlobalSegmentIDsByPartitionIds(partition return resIDs } -func (g *globalSealedSegmentManager) hasGlobalSegment(segmentID UniqueID) bool { +func (g *globalSealedSegmentManager) hasGlobalSealedSegment(segmentID UniqueID) bool { g.mu.Lock() defer g.mu.Unlock() _, ok := g.globalSealedSegments[segmentID] return ok } -func (g *globalSealedSegmentManager) removeGlobalSegmentInfo(segmentID UniqueID) { +func (g *globalSealedSegmentManager) removeGlobalSealedSegmentInfo(segmentID UniqueID) { g.mu.Lock() defer g.mu.Unlock() delete(g.globalSealedSegments, segmentID) diff --git a/internal/querynode/global_sealed_segment_manager_test.go b/internal/querynode/global_sealed_segment_manager_test.go index 8644b2d9b1..0bc9c0f6b8 100644 --- a/internal/querynode/global_sealed_segment_manager_test.go +++ b/internal/querynode/global_sealed_segment_manager_test.go @@ -29,13 +29,10 @@ func TestGlobalSealedSegmentManager(t *testing.T) { PartitionID: defaultPartitionID, } - err := manager.addGlobalSegmentInfo(segmentInfo) - assert.NoError(t, err) + manager.addGlobalSegmentInfo(segmentInfo) segmentInfo.CollectionID = 1000 - err = manager.addGlobalSegmentInfo(segmentInfo) - assert.NoError(t, err) - + manager.addGlobalSegmentInfo(segmentInfo) ids := manager.getGlobalSegmentIDs() assert.Len(t, ids, 1) assert.Equal(t, segmentInfo.SegmentID, ids[0]) @@ -49,19 +46,17 @@ func TestGlobalSealedSegmentManager(t *testing.T) { assert.Len(t, ids, 0) segmentInfo.CollectionID = defaultCollectionID - err = manager.addGlobalSegmentInfo(segmentInfo) - assert.NoError(t, err) + manager.addGlobalSegmentInfo(segmentInfo) - manager.removeGlobalSegmentInfo(defaultSegmentID) + manager.removeGlobalSealedSegmentInfo(defaultSegmentID) ids = manager.getGlobalSegmentIDs() assert.Len(t, ids, 0) - has := manager.hasGlobalSegment(defaultSegmentID) + has := manager.hasGlobalSealedSegment(defaultSegmentID) assert.False(t, has) segmentInfo.CollectionID = defaultCollectionID - err = manager.addGlobalSegmentInfo(segmentInfo) - assert.NoError(t, err) + manager.addGlobalSegmentInfo(segmentInfo) manager.close() ids = manager.getGlobalSegmentIDs() diff --git a/internal/querynode/historical.go b/internal/querynode/historical.go index 2a279db2d0..846705158f 100644 --- a/internal/querynode/historical.go +++ b/internal/querynode/historical.go @@ -28,10 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/storage" -) - -const ( - segmentMetaPrefix = "queryCoord-segmentMeta" + "github.com/milvus-io/milvus/internal/util" ) // historical is in charge of historical data in query node @@ -73,7 +70,7 @@ func (h *historical) close() { func (h *historical) watchGlobalSegmentMeta() { log.Debug("query node watchGlobalSegmentMeta start") - watchChan := h.etcdKV.WatchWithPrefix(segmentMetaPrefix) + watchChan := h.etcdKV.WatchWithPrefix(util.SegmentMetaPrefix) for { select { diff --git a/internal/querynode/historical_test.go b/internal/querynode/historical_test.go index cf3110cee4..0bbc35b446 100644 --- a/internal/querynode/historical_test.go +++ b/internal/querynode/historical_test.go @@ -18,9 +18,9 @@ import ( "time" "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util" + "github.com/stretchr/testify/assert" ) func TestHistorical_GlobalSealedSegments(t *testing.T) { @@ -71,7 +71,7 @@ func TestHistorical_GlobalSealedSegments(t *testing.T) { segmentInfoBytes, err := proto.Marshal(segmentInfo) assert.Nil(t, err) assert.NotNil(t, n.etcdKV) - segmentKey := segmentMetaPrefix + "/" + strconv.FormatInt(segmentID, 10) + segmentKey := util.SegmentMetaPrefix + "/" + strconv.FormatInt(segmentID, 10) err = n.etcdKV.Save(segmentKey, string(segmentInfoBytes)) assert.NoError(t, err) diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 49da80cbfb..65e321bb5e 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -165,14 +165,7 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery // init global sealed segments for _, segment := range in.GlobalSealedSegments { - err = sc.globalSegmentManager.addGlobalSegmentInfo(segment) - if err != nil { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - } - return status, err - } + sc.globalSegmentManager.addGlobalSegmentInfo(segment) } // start queryCollection, message stream need to asConsumer before start diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 31be3035a1..a9db4cc291 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/sessionutil" ) @@ -1289,7 +1290,7 @@ func saveChangeInfo(key string, value string) error { return err } - key = changeInfoMetaPrefix + "/" + key + key = util.ChangeInfoMetaPrefix + "/" + key return kv.Save(key, value) } diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 4baa887c16..ad966f8a79 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -303,11 +303,7 @@ func (q *queryCollection) consumeQuery() { log.Warn(err.Error()) } case *msgstream.SealedSegmentsChangeInfoMsg: - err := q.adjustByChangeInfo(sm) - if err != nil { - // should not happen - log.Error(err.Error()) - } + q.adjustByChangeInfo(sm) default: log.Warn("unsupported msg type in search channel", zap.Any("msg", sm)) } @@ -316,15 +312,25 @@ func (q *queryCollection) consumeQuery() { } } -func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChangeInfoMsg) error { +func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChangeInfoMsg) { for _, info := range msg.Infos { + // precheck collection id, if not the same collection, skip + for _, segment := range info.OnlineSegments { + if segment.CollectionID != q.collectionID { + return + } + } + + for _, segment := range info.OfflineSegments { + if segment.CollectionID != q.collectionID { + return + } + } + // for OnlineSegments: for _, segment := range info.OnlineSegments { // 1. update global sealed segments - err := q.globalSegmentManager.addGlobalSegmentInfo(segment) - if err != nil { - return err - } + q.globalSegmentManager.addGlobalSegmentInfo(segment) // 2. update excluded segment, cluster have been loaded sealed segments, // so we need to avoid getting growing segment from flow graph. q.streaming.replica.addExcludedSegments(segment.CollectionID, []*datapb.SegmentInfo{ @@ -346,10 +352,14 @@ func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChange // for OfflineSegments: for _, segment := range info.OfflineSegments { // 1. update global sealed segments - q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID) + q.globalSegmentManager.removeGlobalSealedSegmentInfo(segment.SegmentID) } + + log.Info("Successfully changed global sealed segment info ", + zap.Int64("collection ", q.collectionID), + zap.Any("online segments ", info.OnlineSegments), + zap.Any("offline segments ", info.OfflineSegments)) } - return nil } func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index a322ffddda..80e752e332 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -659,16 +659,14 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) { // test online segmentChangeInfos.Infos[0].OnlineSegments = append(segmentChangeInfos.Infos[0].OnlineSegments, genSimpleSegmentInfo()) - err = qc.adjustByChangeInfo(segmentChangeInfos) - assert.NoError(t, err) + qc.adjustByChangeInfo(segmentChangeInfos) ids := qc.globalSegmentManager.getGlobalSegmentIDs() assert.Len(t, ids, 1) // test offline segmentChangeInfos.Infos[0].OnlineSegments = make([]*querypb.SegmentInfo, 0) segmentChangeInfos.Infos[0].OfflineSegments = append(segmentChangeInfos.Infos[0].OfflineSegments, genSimpleSegmentInfo()) - err = qc.adjustByChangeInfo(segmentChangeInfos) - assert.NoError(t, err) + qc.adjustByChangeInfo(segmentChangeInfos) ids = qc.globalSegmentManager.getGlobalSegmentIDs() assert.Len(t, ids, 0) }) @@ -683,8 +681,7 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) { simpleInfo := genSimpleSegmentInfo() simpleInfo.CollectionID = 1000 segmentChangeInfos.Infos[0].OnlineSegments = append(segmentChangeInfos.Infos[0].OnlineSegments, simpleInfo) - err = qc.adjustByChangeInfo(segmentChangeInfos) - assert.NoError(t, err) + qc.adjustByChangeInfo(segmentChangeInfos) }) t.Run("test no segment when adjustByChangeInfo", func(t *testing.T) { @@ -697,8 +694,7 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) { segmentChangeInfos := genSimpleSealedSegmentsChangeInfoMsg() segmentChangeInfos.Infos[0].OfflineSegments = append(segmentChangeInfos.Infos[0].OfflineSegments, genSimpleSegmentInfo()) - err = qc.adjustByChangeInfo(segmentChangeInfos) - assert.Nil(t, err) + qc.adjustByChangeInfo(segmentChangeInfos) }) } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index a13675b2d2..b4720024dc 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -46,13 +46,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) -const changeInfoMetaPrefix = "queryCoord-sealedSegmentChangeInfo" - // make sure QueryNode implements types.QueryNode var _ types.QueryNode = (*QueryNode)(nil) @@ -327,8 +326,7 @@ func (node *QueryNode) SetIndexCoord(index types.IndexCoord) error { func (node *QueryNode) watchChangeInfo() { log.Debug("query node watchChangeInfo start") - watchChan := node.etcdKV.WatchWithPrefix(changeInfoMetaPrefix) - + watchChan := node.etcdKV.WatchWithPrefix(util.ChangeInfoMetaPrefix) for { select { case <-node.queryNodeLoopCtx.Done(): @@ -353,9 +351,9 @@ func (node *QueryNode) watchChangeInfo() { continue } go func() { - err = node.adjustByChangeInfo(info) + err = node.removeSegments(info) if err != nil { - log.Warn("adjustByChangeInfo failed", zap.Any("error", err.Error())) + log.Warn("cleanup segments failed", zap.Any("error", err.Error())) } }() default: @@ -370,6 +368,7 @@ func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegments fn := func() error { for _, info := range segmentChangeInfos.Infos { canDoLoadBalance := true + // make sure all query channel already received segment location changes // Check online segments: for _, segmentInfo := range info.OnlineSegments { if node.queryService.hasQueryCollection(segmentInfo.CollectionID) { @@ -378,7 +377,7 @@ func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegments canDoLoadBalance = false break } - if info.OnlineNodeID == Params.QueryNodeID && !qc.globalSegmentManager.hasGlobalSegment(segmentInfo.SegmentID) { + if info.OnlineNodeID == Params.QueryNodeID && !qc.globalSegmentManager.hasGlobalSealedSegment(segmentInfo.SegmentID) { canDoLoadBalance = false break } @@ -392,7 +391,7 @@ func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegments canDoLoadBalance = false break } - if info.OfflineNodeID == Params.QueryNodeID && qc.globalSegmentManager.hasGlobalSegment(segmentInfo.SegmentID) { + if info.OfflineNodeID == Params.QueryNodeID && qc.globalSegmentManager.hasGlobalSealedSegment(segmentInfo.SegmentID) { canDoLoadBalance = false break } @@ -407,13 +406,13 @@ func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegments return nil } - return retry.Do(context.TODO(), fn, retry.Attempts(10)) + return retry.Do(context.TODO(), fn, retry.Attempts(50)) } -func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegmentsChangeInfo) error { +// remove the segments since it's already compacted or balanced to other querynodes +func (node *QueryNode) removeSegments(segmentChangeInfos *querypb.SealedSegmentsChangeInfo) error { err := node.waitChangeInfo(segmentChangeInfos) if err != nil { - log.Error("waitChangeInfo failed", zap.Any("error", err.Error())) return err } @@ -429,10 +428,9 @@ func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegm if hasGrowingSegment { err := node.streaming.replica.removeSegment(segmentInfo.SegmentID) if err != nil { - return err } - log.Debug("remove growing segment in adjustByChangeInfo", + log.Debug("remove growing segment in removeSegments", zap.Any("collectionID", segmentInfo.CollectionID), zap.Any("segmentID", segmentInfo.SegmentID), zap.Any("infoID", segmentChangeInfos.Base.GetMsgID()), @@ -441,13 +439,17 @@ func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegm } // For offline segments: - for _, segment := range info.OfflineSegments { + for _, segmentInfo := range info.OfflineSegments { // load balance or compaction, remove old sealed segments. if info.OfflineNodeID == Params.QueryNodeID { - err := node.historical.replica.removeSegment(segment.SegmentID) + err := node.historical.replica.removeSegment(segmentInfo.SegmentID) if err != nil { return err } + log.Debug("remove sealed segment", zap.Any("collectionID", segmentInfo.CollectionID), + zap.Any("segmentID", segmentInfo.SegmentID), + zap.Any("infoID", segmentChangeInfos.Base.GetMsgID()), + ) } } } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index ac32ab7d6d..0c48648d06 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -295,11 +295,7 @@ func genSimpleQueryNodeToTestWatchChangeInfo(ctx context.Context) (*QueryNode, e if err != nil { return nil, err } - err = qc.globalSegmentManager.addGlobalSegmentInfo(genSimpleSegmentInfo()) - if err != nil { - return nil, err - } - + qc.globalSegmentManager.addGlobalSegmentInfo(genSimpleSegmentInfo()) return node, nil } @@ -318,15 +314,15 @@ func TestQueryNode_adjustByChangeInfo(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - t.Run("test adjustByChangeInfo", func(t *testing.T) { + t.Run("test cleanup segments", func(t *testing.T) { node, err := genSimpleQueryNodeToTestWatchChangeInfo(ctx) assert.NoError(t, err) - err = node.adjustByChangeInfo(genSimpleChangeInfo()) + err = node.removeSegments(genSimpleChangeInfo()) assert.NoError(t, err) }) - t.Run("test adjustByChangeInfo no segment", func(t *testing.T) { + t.Run("test cleanup segments no segment", func(t *testing.T) { node, err := genSimpleQueryNodeToTestWatchChangeInfo(ctx) assert.NoError(t, err) @@ -339,9 +335,9 @@ func TestQueryNode_adjustByChangeInfo(t *testing.T) { qc, err := node.queryService.getQueryCollection(defaultCollectionID) assert.NoError(t, err) - qc.globalSegmentManager.removeGlobalSegmentInfo(defaultSegmentID) + qc.globalSegmentManager.removeGlobalSealedSegmentInfo(defaultSegmentID) - err = node.adjustByChangeInfo(segmentChangeInfos) + err = node.removeSegments(segmentChangeInfos) assert.Error(t, err) }) } @@ -402,7 +398,7 @@ func TestQueryNode_watchChangeInfo(t *testing.T) { qc, err := node.queryService.getQueryCollection(defaultCollectionID) assert.NoError(t, err) - qc.globalSegmentManager.removeGlobalSegmentInfo(defaultSegmentID) + qc.globalSegmentManager.removeGlobalSealedSegmentInfo(defaultSegmentID) go node.watchChangeInfo() diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 8c8ba3dca0..9383c731c7 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -35,10 +35,6 @@ import ( "github.com/milvus-io/milvus/internal/util/funcutil" ) -const ( - queryNodeSegmentMetaPrefix = "queryNode-segmentMeta" -) - // segmentLoader is only responsible for loading the field data from binlog type segmentLoader struct { historicalReplica ReplicaInterface diff --git a/internal/util/constant.go b/internal/util/constant.go new file mode 100644 index 0000000000..7fb19286b7 --- /dev/null +++ b/internal/util/constant.go @@ -0,0 +1,6 @@ +package util + +const ( + SegmentMetaPrefix = "queryCoord-segmentMeta" + ChangeInfoMetaPrefix = "queryCoord-sealedSegmentChangeInfo" +)