From 1414065860f19fdaf4c5183b03b73c8e9686e3b2 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Wed, 17 Dec 2025 22:15:16 +0800
Subject: [PATCH] feat: query coord support segment reopen when manifest path
 changes (#46394)

Related to #46358

Add a segment reopen mechanism in QueryCoord to handle segment data
updates when the manifest path changes. This enables QueryNode to
refresh segment data without a full segment reload, supporting storage
v2 incremental updates.

Changes:
- Add ActionTypeReopen action type and LoadScope_Reopen in protobuf
- Track ManifestPath in segment distribution metadata
- Add CheckSegmentDataReady utility to verify segment data matches target
- Extend getSealedSegmentDiff to detect segments needing reopen
- Create segment reopen tasks when manifest path differs from target
- Block target update until segment data is ready

---------

Signed-off-by: Congqi Xia
---
 internal/querycoordv2/checkers/segment_checker.go |  49 +++++++-
 internal/querycoordv2/checkers/segment_checker_test.go | 6 +-
 internal/querycoordv2/dist/dist_handler.go        |   1 +
 internal/querycoordv2/meta/segment_dist_manager.go |  1 +
 internal/querycoordv2/observers/collection_observer_test.go | 68 ++++++++++
 internal/querycoordv2/observers/target_observer.go |   6 +-
 internal/querycoordv2/observers/target_observer_test.go | 102 ++++++++++++++++++
 internal/querycoordv2/task/action.go              |   3 +
 internal/querycoordv2/task/utils.go               |   4 +
 internal/querycoordv2/utils/util.go               |  27 +++++
 pkg/util/paramtable/component_param.go            |  11 ++
 11 files changed, 271 insertions(+), 7 deletions(-)
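The core of the change is a comparison between the target view and the distribution view of sealed segments: segments missing from the distribution must be loaded, and segments whose manifest path drifted from the target must be reopened in place. A minimal, self-contained sketch of that decision logic (segmentState and diffSegments are illustrative names only; the real implementation lives in getSealedSegmentDiff below):

    package main

    import "fmt"

    type segmentState struct {
        ID           int64
        ManifestPath string
    }

    // diffSegments compares the target view against the distribution view:
    // absent segments go to toLoad, segments with a stale manifest path go
    // to toReopen. Segments matching the target need no action.
    func diffSegments(target, dist map[int64]segmentState) (toLoad, toReopen []int64) {
        for id, t := range target {
            d, ok := dist[id]
            switch {
            case !ok:
                toLoad = append(toLoad, id)
            case d.ManifestPath != t.ManifestPath:
                toReopen = append(toReopen, id)
            }
        }
        return
    }

    func main() {
        target := map[int64]segmentState{
            1: {ID: 1, ManifestPath: "manifest-v2"},
            2: {ID: 2, ManifestPath: "manifest-v1"},
            3: {ID: 3, ManifestPath: "manifest-v1"},
        }
        dist := map[int64]segmentState{
            1: {ID: 1, ManifestPath: "manifest-v1"}, // stale manifest -> reopen
            2: {ID: 2, ManifestPath: "manifest-v1"}, // up to date -> nothing to do
        }
        toLoad, toReopen := diffSegments(target, dist)
        fmt.Println(toLoad, toReopen) // [3] [1]
    }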
diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index 7daf4fadae..ded6c89224 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -128,12 +128,17 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
     ret := make([]task.Task, 0)

     // compare with targets to find the lack and redundancy of segments
-    lacks, loadPriorities, redundancies := c.getSealedSegmentDiff(ctx, replica.GetCollectionID(), replica.GetID())
+    lacks, loadPriorities, redundancies, toUpdate := c.getSealedSegmentDiff(ctx, replica.GetCollectionID(), replica.GetID())
     tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, loadPriorities, replica)
     task.SetReason("lacks of segment", tasks...)
     task.SetPriority(task.TaskPriorityNormal, tasks...)
     ret = append(ret, tasks...)

+    tasks = c.createSegmentReopenTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), toUpdate, replica)
+    task.SetReason("segment updated", tasks...)
+    task.SetPriority(task.TaskPriorityNormal, tasks...)
+    ret = append(ret, tasks...)
+
     redundancies = c.filterOutSegmentInUse(ctx, replica, redundancies)
     tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
     task.SetReason("segment not exists in target", tasks...)
@@ -225,7 +230,7 @@ func (c *SegmentChecker) getSealedSegmentDiff(
     ctx context.Context,
     collectionID int64,
     replicaID int64,
-) (toLoad []*datapb.SegmentInfo, loadPriorities []commonpb.LoadPriority, toRelease []*meta.Segment) {
+) (toLoad []*datapb.SegmentInfo, loadPriorities []commonpb.LoadPriority, toRelease []*meta.Segment, toUpdate []*meta.Segment) {
     replica := c.meta.Get(ctx, replicaID)
     if replica == nil {
         log.Info("replica does not exist, skip it")
@@ -235,15 +240,19 @@ func (c *SegmentChecker) getSealedSegmentDiff(
     sort.Slice(dist, func(i, j int) bool {
         return dist[i].Version < dist[j].Version
     })
-    distMap := make(map[int64]int64)
+    distMap := make(map[int64]*meta.Segment)
     for _, s := range dist {
-        distMap[s.GetID()] = s.Node
+        distMap[s.GetID()] = s
     }

     isSegmentLack := func(segment *datapb.SegmentInfo) bool {
         _, existInDist := distMap[segment.ID]
         return !existInDist
     }
+    isSegmentUpdate := func(segment *datapb.SegmentInfo) bool {
+        segInDist, existInDist := distMap[segment.ID]
+        return existInDist && segInDist.ManifestPath != segment.GetManifestPath()
+    }

     nextTargetExist := c.targetMgr.IsNextTargetExist(ctx, collectionID)
     nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, meta.NextTarget)
@@ -259,6 +268,9 @@ func (c *SegmentChecker) getSealedSegmentDiff(
             }
             toLoad = append(toLoad, segment)
         }
+        if isSegmentUpdate(segment) {
+            toUpdate = append(toUpdate, distMap[segment.GetID()])
+        }
     }

     // get segment which exist on dist, but not on current target and next target
@@ -413,6 +425,35 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
     return balance.CreateSegmentTasksFromPlans(ctx, c.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), plans)
 }

+func (c *SegmentChecker) createSegmentReopenTasks(ctx context.Context, segments []*meta.Segment, replica *meta.Replica) []task.Task {
+    ret := make([]task.Task, 0, len(segments))
+    for _, s := range segments {
+        action := task.NewSegmentAction(s.Node, task.ActionTypeReopen, s.GetInsertChannel(), s.GetID())
+        task, err := task.NewSegmentTask(
+            ctx,
+            Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
+            c.ID(),
+            s.GetCollectionID(),
+            replica,
+            replica.LoadPriority(),
+            action,
+        )
+        if err != nil {
+            log.Warn("create segment reopen task failed",
+                zap.Int64("collection", s.GetCollectionID()),
+                zap.Int64("replica", replica.GetID()),
+                zap.String("channel", s.GetInsertChannel()),
+                zap.Int64("from", s.Node),
+                zap.Error(err),
+            )
+            continue
+        }
+
+        ret = append(ret, task)
+    }
+    return ret
+}
+
 func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments []*meta.Segment, replica *meta.Replica, scope querypb.DataScope) []task.Task {
     ret := make([]task.Task, 0, len(segments))
     for _, s := range segments {
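One subtlety worth noting in getSealedSegmentDiff: dist is sorted by Version ascending before distMap is built, so when several nodes report the same segment, the map insert keeps the highest-version copy, and that copy's ManifestPath is the one compared against the target. A small runnable sketch of that effect (seg is a stand-in for *meta.Segment):

    package main

    import (
        "fmt"
        "sort"
    )

    type seg struct {
        ID      int64
        Node    int64
        Version int64
    }

    func main() {
        dist := []seg{
            {ID: 7, Node: 2, Version: 5},
            {ID: 7, Node: 1, Version: 9}, // newer copy should win
        }
        // ascending sort means later map inserts carry higher versions
        sort.Slice(dist, func(i, j int) bool { return dist[i].Version < dist[j].Version })
        distMap := make(map[int64]seg)
        for _, s := range dist {
            distMap[s.ID] = s // later (higher-version) entries overwrite earlier ones
        }
        fmt.Println(distMap[7].Node) // 1
    }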
diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go
index 8eecf73e8b..691d8b82f9 100644
--- a/internal/querycoordv2/checkers/segment_checker_test.go
+++ b/internal/querycoordv2/checkers/segment_checker_test.go
@@ -692,12 +692,13 @@ func (suite *SegmentCheckerTestSuite) TestLoadPriority() {
     suite.checker.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)

     // test getSealedSegmentDiff
-    toLoad, loadPriorities, toRelease := suite.checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
+    toLoad, loadPriorities, toRelease, toUpdate := suite.checker.getSealedSegmentDiff(ctx, collectionID, replicaID)

     // verify results
     suite.Equal(2, len(toLoad))
     suite.Equal(2, len(loadPriorities))
     suite.Equal(0, len(toRelease))
+    suite.Equal(0, len(toUpdate))

     // segment2 not in current target, should use replica's priority
     suite.True(segment2.GetID() == toLoad[0].GetID() || segment2.GetID() == toLoad[1].GetID())
@@ -713,11 +714,12 @@ func (suite *SegmentCheckerTestSuite) TestLoadPriority() {
     // update current target to include segment2
     suite.checker.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
     // test again
-    toLoad, loadPriorities, toRelease = suite.checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
+    toLoad, loadPriorities, toRelease, toUpdate = suite.checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
     // verify results
     suite.Equal(0, len(toLoad))
     suite.Equal(0, len(loadPriorities))
     suite.Equal(0, len(toRelease))
+    suite.Equal(0, len(toUpdate))
 }

 func (suite *SegmentCheckerTestSuite) TestFilterOutExistedOnLeader() {
diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index d2ea4f9bf4..37575ee63e 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -206,6 +206,7 @@ func (dh *distHandler) updateSegmentsDistribution(ctx context.Context, resp *que
             LastDeltaTimestamp: s.GetLastDeltaTimestamp(),
             IndexInfo:          s.GetIndexInfo(),
             JSONStatsField:     s.GetJsonStatsInfo(),
+            ManifestPath:       s.GetManifestPath(),
         })
     }

diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go
index 5899fd5882..5d92d2843f 100644
--- a/internal/querycoordv2/meta/segment_dist_manager.go
+++ b/internal/querycoordv2/meta/segment_dist_manager.go
@@ -126,6 +126,7 @@ type Segment struct {
     LastDeltaTimestamp uint64                            // The timestamp of the last delta record
     IndexInfo          map[int64]*querypb.FieldIndexInfo // index info of loaded segment, indexID -> FieldIndexInfo
     JSONStatsField     map[int64]*querypb.JsonStatsInfo  // json index info of loaded segment
+    ManifestPath       string                            // current manifest path of loaded segment
 }

 func SegmentFromInfo(info *datapb.SegmentInfo) *Segment {
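These two small changes close the feedback loop: the QueryNode heartbeat is the only source of the distribution-side manifest path, so a reopen is considered complete exactly when the node starts reporting the new path. A sketch of that loop under assumed stand-in types (nodeReport is not the real proto):

    // 1. coord issues a reopen because dist path != target path;
    // 2. the node reloads data and reports the new manifest path;
    // 3. the next dist update clears the mismatch, so no further task is created.
    type nodeReport struct {
        SegmentID    int64
        ManifestPath string
    }

    func applyReport(dist map[int64]string, r nodeReport) {
        dist[r.SegmentID] = r.ManifestPath // dist handler copies the reported path verbatim
    }

    func needsReopen(dist map[int64]string, segID int64, targetPath string) bool {
        cur, ok := dist[segID]
        return ok && cur != targetPath
    }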
diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go
index ff951ce438..fd4c464d64 100644
--- a/internal/querycoordv2/observers/collection_observer_test.go
+++ b/internal/querycoordv2/observers/collection_observer_test.go
@@ -344,6 +344,53 @@ func (suite *CollectionObserverSuite) TestObserve() {
     suite.dist.ChannelDistManager.Update(3, ch4, ch5)

+    // Add segment distribution for CheckSegmentDataReady
+    // Collection 100: segments 1, 2
+    suite.dist.SegmentDistManager.Update(1, &meta.Segment{
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:            1,
+            CollectionID:  100,
+            PartitionID:   10,
+            InsertChannel: "100-dmc0",
+        },
+        Node: 1,
+    })
+    suite.dist.SegmentDistManager.Update(2, &meta.Segment{
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:            2,
+            CollectionID:  100,
+            PartitionID:   10,
+            InsertChannel: "100-dmc1",
+        },
+        Node: 2,
+    })
+
+    // Collection 102: segments from 5 to 1003 (999 segments)
+    segments102 := make([]*meta.Segment, 0, len(suite.segments[102]))
+    for _, seg := range suite.segments[102] {
+        segments102 = append(segments102, &meta.Segment{
+            SegmentInfo: seg,
+            Node:        3,
+        })
+    }
+    suite.dist.SegmentDistManager.Update(3, segments102...)
+
+    // Collection 103: segments from 2000 to 2009 (10 segments), distributed on nodes 2 and 3
+    segments103Node2 := make([]*meta.Segment, 0, len(suite.segments[103]))
+    segments103Node3 := make([]*meta.Segment, 0, len(suite.segments[103]))
+    for _, seg := range suite.segments[103] {
+        segments103Node2 = append(segments103Node2, &meta.Segment{
+            SegmentInfo: seg,
+            Node:        2,
+        })
+        segments103Node3 = append(segments103Node3, &meta.Segment{
+            SegmentInfo: seg,
+            Node:        3,
+        })
+    }
+    suite.dist.SegmentDistManager.Update(2, segments103Node2...)
+    suite.dist.SegmentDistManager.Update(3, segments103Node3...)
+
     suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
     suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
     suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@@ -412,6 +459,27 @@ func (suite *CollectionObserverSuite) TestObservePartition() {
         },
     })

+    // Add segment distribution for CheckSegmentDataReady
+    // Collection 100, partition 10: segments 1, 2
+    suite.dist.SegmentDistManager.Update(1, &meta.Segment{
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:            1,
+            CollectionID:  100,
+            PartitionID:   10,
+            InsertChannel: "100-dmc0",
+        },
+        Node: 1,
+    })
+    suite.dist.SegmentDistManager.Update(2, &meta.Segment{
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:            2,
+            CollectionID:  100,
+            PartitionID:   10,
+            InsertChannel: "100-dmc1",
+        },
+        Node: 2,
+    })
+
     suite.Eventually(func() bool {
         return suite.isPartitionLoaded(suite.partitions[100][0])
     }, timeout*2, timeout/10)
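These fixtures are needed because the readiness gate now consults segment distribution before a collection can be reported as loaded. A test that does not want to maintain distribution entries could presumably flip the gate off instead; a sketch using the usual paramtable override idiom (Save/Reset is the standard Milvus test pattern, assumed applicable to this new key):

    params := paramtable.Get()
    params.Save(params.QueryCoordCfg.UpdateTargetNeedSegmentDataReady.Key, "false")
    defer params.Reset(params.QueryCoordCfg.UpdateTargetNeedSegmentDataReady.Key)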
diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index 1aa026400b..cd1c30e2ee 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -461,10 +461,14 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
         readyDelegatorsInCollection = append(readyDelegatorsInCollection, readyDelegatorsInReplica...)
     }

+    // segment data satisfies next target spec
+    segmentDataReady := !paramtable.Get().QueryCoordCfg.UpdateTargetNeedSegmentDataReady.GetAsBool() ||
+        utils.CheckSegmentDataReady(ctx, collectionID, ob.distMgr, ob.targetMgr, meta.NextTarget) == nil
+
     syncSuccess := ob.syncNextTargetToDelegator(ctx, collectionID, readyDelegatorsInCollection, newVersion)
     syncedChannelNames := lo.Uniq(lo.Map(readyDelegatorsInCollection, func(ch *meta.DmChannel, _ int) string { return ch.ChannelName }))
     // only after all channel are synced, we can consider the current target is ready
-    return syncSuccess && lo.Every(syncedChannelNames, lo.Keys(channelNames))
+    return syncSuccess && lo.Every(syncedChannelNames, lo.Keys(channelNames)) && segmentDataReady
 }

 // sync next target info to delegator as readable snapshot
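Condensed, the new promotion predicate reads as below (a sketch; shouldPromote is not a real function in this patch):

    func shouldPromote(syncSuccess, syncedAllChannels, gateEnabled bool, dataReadyErr error) bool {
        segmentDataReady := !gateEnabled || dataReadyErr == nil
        return syncSuccess && syncedAllChannels && segmentDataReady
    }

Note the short-circuit: with queryCoord.updateTargetNeedSegmentDataReady set to false, CheckSegmentDataReady is never even evaluated, which preserves the pre-patch promotion behavior.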
diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go
index 5e0e71837d..1ceefbd17c 100644
--- a/internal/querycoordv2/observers/target_observer_test.go
+++ b/internal/querycoordv2/observers/target_observer_test.go
@@ -218,6 +218,28 @@ func (suite *TargetObserverSuite) TestIncrementalUpdate_WithNewSegment() {
             len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2
     }, 5*time.Second, 1*time.Second)

+    // Add initial segment distribution for CheckSegmentDataReady
+    suite.distMgr.SegmentDistManager.Update(2,
+        &meta.Segment{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            11,
+                CollectionID:  suite.collectionID,
+                PartitionID:   suite.partitionID,
+                InsertChannel: "channel-1",
+            },
+            Node: 2,
+        },
+        &meta.Segment{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            12,
+                CollectionID:  suite.collectionID,
+                PartitionID:   suite.partitionID,
+                InsertChannel: "channel-2",
+            },
+            Node: 2,
+        },
+    )
+
     // Manually set CurrentTarget to non-empty (simulating previous successful load)
     // This is the precondition for incremental updates to work
     suite.targetMgr.UpdateCollectionCurrentTarget(ctx, suite.collectionID)
@@ -284,6 +306,36 @@ func (suite *TargetObserverSuite) TestIncrementalUpdate_WithNewSegment() {
             },
         },
     })
+    // Add segments to SegmentDistManager for CheckSegmentDataReady
+    suite.distMgr.SegmentDistManager.Update(2,
+        &meta.Segment{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            11,
+                CollectionID:  suite.collectionID,
+                PartitionID:   suite.partitionID,
+                InsertChannel: "channel-1",
+            },
+            Node: 2,
+        },
+        &meta.Segment{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            12,
+                CollectionID:  suite.collectionID,
+                PartitionID:   suite.partitionID,
+                InsertChannel: "channel-2",
+            },
+            Node: 2,
+        },
+        &meta.Segment{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            13,
+                CollectionID:  suite.collectionID,
+                PartitionID:   suite.partitionID,
+                InsertChannel: "channel-1",
+            },
+            Node: 2,
+        },
+    )

     suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()

@@ -424,6 +476,7 @@ func TestShouldUpdateCurrentTarget_EmptyNextTarget(t *testing.T) {
     // Return empty channels to simulate empty next target
     targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(map[string]*meta.DmChannel{}).Maybe()
+    targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(map[int64]*datapb.SegmentInfo{}).Maybe()

     result := observer.shouldUpdateCurrentTarget(ctx, collectionID)
     assert.False(t, result)
@@ -482,6 +535,7 @@ func TestShouldUpdateCurrentTarget_ReplicaReadiness(t *testing.T) {
     targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
     targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
+    targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(map[int64]*datapb.SegmentInfo{}).Maybe()
     broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
     cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@@ -609,6 +663,8 @@ func TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced(t *testing.T) {
     targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
     targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
     targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+    // Return segments for CheckSegmentDataReady
+    targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(targetSegments).Maybe()

     broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()

@@ -639,6 +695,15 @@ func TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced(t *testing.T) {
             Status: &querypb.LeaderViewStatus{Serviceable: true},
         },
     })
+    // Add segment to SegmentDistManager for CheckSegmentDataReady
+    distMgr.SegmentDistManager.Update(1, &meta.Segment{
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:            segmentID,
+            CollectionID:  collectionID,
+            InsertChannel: "channel-1",
+        },
+        Node: 1,
+    })

     // Node 2: NOT READY delegator
     // - Does NOT have the target segment loaded (missing segment 100)
@@ -733,6 +798,12 @@ func TestShouldUpdateCurrentTarget_AllChannelsSynced(t *testing.T) {
         segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"},
     }

+    // All segments for CheckSegmentDataReady
+    allSegments := map[int64]*datapb.SegmentInfo{
+        segmentID1: {ID: segmentID1, CollectionID: collectionID, InsertChannel: "channel-1"},
+        segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"},
+    }
+
     targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
     targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
     targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments1).Maybe()
@@ -741,6 +812,8 @@ func TestShouldUpdateCurrentTarget_AllChannelsSynced(t *testing.T) {
     targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
     targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
     targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+    // Return segments for CheckSegmentDataReady
+    targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(allSegments).Maybe()

     broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
     cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@@ -777,6 +850,25 @@ func TestShouldUpdateCurrentTarget_AllChannelsSynced(t *testing.T) {
             Status: &querypb.LeaderViewStatus{Serviceable: true},
         },
     })
+    // Add segments to SegmentDistManager for CheckSegmentDataReady
+    distMgr.SegmentDistManager.Update(1,
+        &meta.Segment{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            segmentID1,
+                CollectionID:  collectionID,
+                InsertChannel: "channel-1",
+            },
+            Node: 1,
+        },
+        &meta.Segment{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            segmentID2,
+                CollectionID:  collectionID,
+                InsertChannel: "channel-2",
+            },
+            Node: 1,
+        },
+    )

     // Execute the function under test
     result := observer.shouldUpdateCurrentTarget(ctx, collectionID)
@@ -845,6 +937,12 @@ func TestShouldUpdateCurrentTarget_PartialChannelsSynced(t *testing.T) {
         segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"},
     }

+    // All segments for CheckSegmentDataReady
+    allSegments := map[int64]*datapb.SegmentInfo{
+        segmentID1: {ID: segmentID1, CollectionID: collectionID, InsertChannel: "channel-1"},
+        segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"},
+    }
+
     targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
     targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
     targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments1).Maybe()
@@ -853,6 +951,8 @@ func TestShouldUpdateCurrentTarget_PartialChannelsSynced(t *testing.T) {
     targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
     targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
     targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+    // Return segments for CheckSegmentDataReady - this will fail since the segment distribution is incomplete
+    targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(allSegments).Maybe()

     broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
     cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@@ -959,6 +1059,8 @@ func TestShouldUpdateCurrentTarget_NoReadyDelegators(t *testing.T) {
     targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
     targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
     targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+    // Return segments for CheckSegmentDataReady - this will fail since no segments are in distribution
+    targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(targetSegments).Maybe()

     broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
     cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go
index 5c518e3ec7..a2a4f897e7 100644
--- a/internal/querycoordv2/task/action.go
+++ b/internal/querycoordv2/task/action.go
@@ -35,6 +35,7 @@ const (
     ActionTypeUpdate
     ActionTypeStatsUpdate
     ActionTypeDropIndex
+    ActionTypeReopen
 )

 var ActionTypeName = map[ActionType]string{
@@ -42,6 +43,8 @@ var ActionTypeName = map[ActionType]string{
     ActionTypeReduce:      "Reduce",
     ActionTypeUpdate:      "Update",
     ActionTypeStatsUpdate: "StatsUpdate",
+    ActionTypeDropIndex:   "DropIndex",
+    ActionTypeReopen:      "Reopen",
 }

 func (t ActionType) String() string {
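Besides adding Reopen, this hunk also backfills the previously missing DropIndex name; without a map entry, ActionType.String() would render the empty string in task logs. A condensed sketch of how the name table backs String() (the enum's first entries and iota base are not shown in the hunk, so the values below are assumed for illustration):

    type ActionType int32

    const (
        ActionTypeGrow ActionType = iota + 1
        ActionTypeReduce
        ActionTypeUpdate
        ActionTypeStatsUpdate
        ActionTypeDropIndex
        ActionTypeReopen
    )

    var ActionTypeName = map[ActionType]string{
        ActionTypeGrow:        "Grow",
        ActionTypeReduce:      "Reduce",
        ActionTypeUpdate:      "Update",
        ActionTypeStatsUpdate: "StatsUpdate",
        ActionTypeDropIndex:   "DropIndex",
        ActionTypeReopen:      "Reopen",
    }

    // String returns the human-readable action name; unknown values map to "".
    func (t ActionType) String() string { return ActionTypeName[t] }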
"Update", ActionTypeStatsUpdate: "StatsUpdate", + ActionTypeDropIndex: "DropIndex", + ActionTypeReopen: "Reopen", } func (t ActionType) String() string { diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index cb91967dcc..b8f07405e8 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -141,6 +141,10 @@ func packLoadSegmentRequest( loadScope = querypb.LoadScope_Stats } + if action.Type() == ActionTypeReopen { + loadScope = querypb.LoadScope_Reopen + } + if task.Source() == utils.LeaderChecker { loadScope = querypb.LoadScope_Delta } diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 46e13c700a..30dd6feb47 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -79,6 +79,33 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target return nil } +func CheckSegmentDataReady(ctx context.Context, collectionID int64, distManager *meta.DistributionManager, targetMgr meta.TargetManagerInterface, scope int32) error { + log := log.Ctx(ctx). + WithRateGroup(fmt.Sprintf("util.CheckSegmentDataReady-%d", collectionID), 1, 60). + With(zap.Int64("collectionID", collectionID)) + + // Check whether segments are fully loaded + segmentDist := targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, scope) + for segmentID, segmentInfo := range segmentDist { + segments := distManager.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithSegmentID(segmentID)) + if len(segments) == 0 { + log.RatedInfo(10, "segment is not available", zap.Int64("segmentID", segmentID)) + return merr.WrapErrSegmentLack(segmentID) + } + + for _, segment := range segments { + // Compare manifest path for now + // alternative is to compare version, but it's not recommended to add extra info in segmentinfo + // we may use data view version in the future + if segment.ManifestPath != segmentInfo.GetManifestPath() { + log.RatedInfo(10, "segment is not updated", zap.Int64("segmentID", segmentID)) + return merr.WrapErrSegmentNotLoaded(segmentID) + } + } + } + return nil +} + func checkLoadStatus(ctx context.Context, m *meta.Meta, collectionID int64) error { percentage := m.CollectionManager.CalculateLoadPercentage(ctx, collectionID) if percentage < 0 { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 10209bbdea..09eac5ff6e 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2545,6 +2545,8 @@ type queryCoordConfig struct { ResourceExhaustionCleanupInterval ParamItem `refreshable:"true"` FileResourceMode ParamItem `refreshable:"false"` + + UpdateTargetNeedSegmentDataReady ParamItem `refreshable:"true"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -3212,6 +3214,15 @@ Set to 0 to disable the penalty period.`, Export: true, } p.ResourceExhaustionCleanupInterval.Init(base.mgr) + + p.UpdateTargetNeedSegmentDataReady = ParamItem{ + Key: "queryCoord.updateTargetNeedSegmentDataReady", + Version: "2.6.8", + DefaultValue: "true", + Doc: "update target need segment data ready", + Export: false, + } + p.UpdateTargetNeedSegmentDataReady.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////