diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index 7daf4fadae..ded6c89224 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -128,12 +128,17 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
 	ret := make([]task.Task, 0)
 
 	// compare with targets to find the lack and redundancy of segments
-	lacks, loadPriorities, redundancies := c.getSealedSegmentDiff(ctx, replica.GetCollectionID(), replica.GetID())
+	lacks, loadPriorities, redundancies, toUpdate := c.getSealedSegmentDiff(ctx, replica.GetCollectionID(), replica.GetID())
 	tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, loadPriorities, replica)
 	task.SetReason("lacks of segment", tasks...)
 	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)
 
+	tasks = c.createSegmentReopenTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), toUpdate, replica)
+	task.SetReason("segment updated", tasks...)
+	task.SetPriority(task.TaskPriorityNormal, tasks...)
+	ret = append(ret, tasks...)
+
 	redundancies = c.filterOutSegmentInUse(ctx, replica, redundancies)
 	tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
 	task.SetReason("segment not exists in target", tasks...)
@@ -225,7 +230,7 @@ func (c *SegmentChecker) getSealedSegmentDiff(
 	ctx context.Context,
 	collectionID int64,
 	replicaID int64,
-) (toLoad []*datapb.SegmentInfo, loadPriorities []commonpb.LoadPriority, toRelease []*meta.Segment) {
+) (toLoad []*datapb.SegmentInfo, loadPriorities []commonpb.LoadPriority, toRelease []*meta.Segment, toUpdate []*meta.Segment) {
 	replica := c.meta.Get(ctx, replicaID)
 	if replica == nil {
 		log.Info("replica does not exist, skip it")
@@ -235,15 +240,19 @@ func (c *SegmentChecker) getSealedSegmentDiff(
 	sort.Slice(dist, func(i, j int) bool {
 		return dist[i].Version < dist[j].Version
 	})
-	distMap := make(map[int64]int64)
+	distMap := make(map[int64]*meta.Segment)
 	for _, s := range dist {
-		distMap[s.GetID()] = s.Node
+		distMap[s.GetID()] = s
 	}
 
 	isSegmentLack := func(segment *datapb.SegmentInfo) bool {
 		_, existInDist := distMap[segment.ID]
 		return !existInDist
 	}
+	isSegmentUpdate := func(segment *datapb.SegmentInfo) bool {
+		segInDist, existInDist := distMap[segment.ID]
+		return existInDist && segInDist.ManifestPath != segment.GetManifestPath()
+	}
 
 	nextTargetExist := c.targetMgr.IsNextTargetExist(ctx, collectionID)
 	nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, meta.NextTarget)
@@ -259,6 +268,9 @@ func (c *SegmentChecker) getSealedSegmentDiff(
 			}
 			toLoad = append(toLoad, segment)
 		}
+		if isSegmentUpdate(segment) {
+			toUpdate = append(toUpdate, distMap[segment.GetID()])
+		}
 	}
 
 	// get segment which exist on dist, but not on current target and next target
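The reopen decision above hinges entirely on the manifest path: a segment already serving in the distribution is scheduled for reopen only when the path it was loaded from no longer matches the target's. Note that `distMap` now carries whole `*meta.Segment` views instead of just node IDs, so the closure can see both placement and manifest. A minimal, self-contained sketch of that classification (the `segmentView` type and `classify` helper are illustrative stand-ins, not part of this patch):

```go
package main

import "fmt"

// segmentView is a hypothetical, trimmed-down stand-in for the fields
// getSealedSegmentDiff actually consults on each side of the comparison.
type segmentView struct {
	ID           int64
	ManifestPath string
}

// classify mirrors the isSegmentLack/isSegmentUpdate split: absent from the
// distribution -> load, present but loaded from another manifest -> reopen.
func classify(target segmentView, dist map[int64]segmentView) string {
	loaded, ok := dist[target.ID]
	switch {
	case !ok:
		return "load" // segment lack: schedule a load task
	case loaded.ManifestPath != target.ManifestPath:
		return "reopen" // same segment, stale manifest: schedule a reopen task
	default:
		return "noop"
	}
}

func main() {
	dist := map[int64]segmentView{1: {ID: 1, ManifestPath: "manifests/seg1/v1"}}
	fmt.Println(classify(segmentView{ID: 1, ManifestPath: "manifests/seg1/v2"}, dist)) // reopen
	fmt.Println(classify(segmentView{ID: 2, ManifestPath: "manifests/seg2/v1"}, dist)) // load
	fmt.Println(classify(segmentView{ID: 1, ManifestPath: "manifests/seg1/v1"}, dist)) // noop
}
```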
@@ -413,6 +425,35 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
 	return balance.CreateSegmentTasksFromPlans(ctx, c.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), plans)
 }
 
+func (c *SegmentChecker) createSegmentReopenTasks(ctx context.Context, segments []*meta.Segment, replica *meta.Replica) []task.Task {
+	ret := make([]task.Task, 0, len(segments))
+	for _, s := range segments {
+		action := task.NewSegmentAction(s.Node, task.ActionTypeReopen, s.GetInsertChannel(), s.GetID())
+		task, err := task.NewSegmentTask(
+			ctx,
+			Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
+			c.ID(),
+			s.GetCollectionID(),
+			replica,
+			replica.LoadPriority(),
+			action,
+		)
+		if err != nil {
+			log.Warn("create segment reopen task failed",
+				zap.Int64("collection", s.GetCollectionID()),
+				zap.Int64("replica", replica.GetID()),
+				zap.String("channel", s.GetInsertChannel()),
+				zap.Int64("from", s.Node),
+				zap.Error(err),
+			)
+			continue
+		}
+
+		ret = append(ret, task)
+	}
+	return ret
+}
+
 func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments []*meta.Segment, replica *meta.Replica, scope querypb.DataScope) []task.Task {
 	ret := make([]task.Task, 0, len(segments))
 	for _, s := range segments {
diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go
index 8eecf73e8b..691d8b82f9 100644
--- a/internal/querycoordv2/checkers/segment_checker_test.go
+++ b/internal/querycoordv2/checkers/segment_checker_test.go
@@ -692,12 +692,13 @@ func (suite *SegmentCheckerTestSuite) TestLoadPriority() {
 	suite.checker.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
 
 	// test getSealedSegmentDiff
-	toLoad, loadPriorities, toRelease := suite.checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
+	toLoad, loadPriorities, toRelease, toUpdate := suite.checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
 
 	// verify results
 	suite.Equal(2, len(toLoad))
 	suite.Equal(2, len(loadPriorities))
 	suite.Equal(0, len(toRelease))
+	suite.Equal(0, len(toUpdate))
 
 	// segment2 not in current target, should use replica's priority
 	suite.True(segment2.GetID() == toLoad[0].GetID() || segment2.GetID() == toLoad[1].GetID())
@@ -713,11 +714,12 @@ func (suite *SegmentCheckerTestSuite) TestLoadPriority() {
 	// update current target to include segment2
 	suite.checker.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
 	// test again
-	toLoad, loadPriorities, toRelease = suite.checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
+	toLoad, loadPriorities, toRelease, toUpdate = suite.checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
 
 	// verify results
 	suite.Equal(0, len(toLoad))
 	suite.Equal(0, len(loadPriorities))
 	suite.Equal(0, len(toRelease))
+	suite.Equal(0, len(toUpdate))
 }
 
 func (suite *SegmentCheckerTestSuite) TestFilterOutExistedOnLeader() {
diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index d2ea4f9bf4..37575ee63e 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -206,6 +206,7 @@ func (dh *distHandler) updateSegmentsDistribution(ctx context.Context, resp *que
 			LastDeltaTimestamp: s.GetLastDeltaTimestamp(),
 			IndexInfo:          s.GetIndexInfo(),
 			JSONStatsField:     s.GetJsonStatsInfo(),
+			ManifestPath:       s.GetManifestPath(),
 		})
 	}
 
diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go
index 5899fd5882..5d92d2843f 100644
--- a/internal/querycoordv2/meta/segment_dist_manager.go
+++ b/internal/querycoordv2/meta/segment_dist_manager.go
@@ -126,6 +126,7 @@ type Segment struct {
 	LastDeltaTimestamp uint64                            // The timestamp of the last delta record
 	IndexInfo          map[int64]*querypb.FieldIndexInfo // index info of loaded segment, indexID -> FieldIndexInfo
 	JSONStatsField     map[int64]*querypb.JsonStatsInfo  // json index info of loaded segment
+	ManifestPath       string                            // current manifest path of loaded segment
 }
 
 func SegmentFromInfo(info *datapb.SegmentInfo) *Segment {
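The two plumbing changes above are what make the checker's comparison possible at all: the manifest path a QueryNode reports in its distribution response has to survive the hop into the coordinator-side `meta.Segment`. A stand-alone sketch of that copy step (the types are trimmed, hypothetical stand-ins for the querypb/meta structs):

```go
package main

import "fmt"

// reportedSegment stands in for the segment version a QueryNode reports
// in its distribution response.
type reportedSegment struct {
	ID           int64
	ManifestPath string
}

// distSegment stands in for meta.Segment, the coordinator-side view that
// updateSegmentsDistribution now fills with the manifest path as well.
type distSegment struct {
	ID           int64
	Node         int64
	ManifestPath string
}

func intoDist(node int64, reported []reportedSegment) []distSegment {
	out := make([]distSegment, 0, len(reported))
	for _, s := range reported {
		// Without this copy the checker would always see an empty path on
		// the dist side and flag every segment whose target manifest is
		// non-empty for reopen.
		out = append(out, distSegment{ID: s.ID, Node: node, ManifestPath: s.ManifestPath})
	}
	return out
}

func main() {
	dist := intoDist(2, []reportedSegment{{ID: 11, ManifestPath: "manifests/seg11/v3"}})
	fmt.Printf("%+v\n", dist[0])
}
```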
diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go
index ff951ce438..fd4c464d64 100644
--- a/internal/querycoordv2/observers/collection_observer_test.go
+++ b/internal/querycoordv2/observers/collection_observer_test.go
@@ -344,6 +344,53 @@ func (suite *CollectionObserverSuite) TestObserve() {
 
 	suite.dist.ChannelDistManager.Update(3, ch4, ch5)
 
+	// Add segment distribution for CheckSegmentDataReady
+	// Collection 100: segments 1, 2
+	suite.dist.SegmentDistManager.Update(1, &meta.Segment{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            1,
+			CollectionID:  100,
+			PartitionID:   10,
+			InsertChannel: "100-dmc0",
+		},
+		Node: 1,
+	})
+	suite.dist.SegmentDistManager.Update(2, &meta.Segment{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            2,
+			CollectionID:  100,
+			PartitionID:   10,
+			InsertChannel: "100-dmc1",
+		},
+		Node: 2,
+	})
+
+	// Collection 102: segments from 5 to 1003 (999 segments)
+	segments102 := make([]*meta.Segment, 0, len(suite.segments[102]))
+	for _, seg := range suite.segments[102] {
+		segments102 = append(segments102, &meta.Segment{
+			SegmentInfo: seg,
+			Node:        3,
+		})
+	}
+	suite.dist.SegmentDistManager.Update(3, segments102...)
+
+	// Collection 103: segments from 2000 to 2009 (10 segments), distributed on node 2 and 3
+	segments103Node2 := make([]*meta.Segment, 0, len(suite.segments[103]))
+	segments103Node3 := make([]*meta.Segment, 0, len(suite.segments[103]))
+	for _, seg := range suite.segments[103] {
+		segments103Node2 = append(segments103Node2, &meta.Segment{
+			SegmentInfo: seg,
+			Node:        2,
+		})
+		segments103Node3 = append(segments103Node3, &meta.Segment{
+			SegmentInfo: seg,
+			Node:        3,
+		})
+	}
+	suite.dist.SegmentDistManager.Update(2, segments103Node2...)
+	suite.dist.SegmentDistManager.Update(3, segments103Node3...)
+
 	suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
 	suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
 	suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@@ -412,6 +459,27 @@ func (suite *CollectionObserverSuite) TestObservePartition() {
 		},
 	})
 
+	// Add segment distribution for CheckSegmentDataReady
+	// Collection 100, partition 10: segments 1, 2
+	suite.dist.SegmentDistManager.Update(1, &meta.Segment{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            1,
+			CollectionID:  100,
+			PartitionID:   10,
+			InsertChannel: "100-dmc0",
+		},
+		Node: 1,
+	})
+	suite.dist.SegmentDistManager.Update(2, &meta.Segment{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            2,
+			CollectionID:  100,
+			PartitionID:   10,
+			InsertChannel: "100-dmc1",
+		},
+		Node: 2,
+	})
+
 	suite.Eventually(func() bool {
 		return suite.isPartitionLoaded(suite.partitions[100][0])
 	}, timeout*2, timeout/10)
diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index 1aa026400b..cd1c30e2ee 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -461,10 +461,14 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 		readyDelegatorsInCollection = append(readyDelegatorsInCollection, readyDelegatorsInReplica...)
 	}
 
+	// segment data satisfies next target spec
+	segmentDataReady := !paramtable.Get().QueryCoordCfg.UpdateTargetNeedSegmentDataReady.GetAsBool() ||
+		utils.CheckSegmentDataReady(ctx, collectionID, ob.distMgr, ob.targetMgr, meta.NextTarget) == nil
+
 	syncSuccess := ob.syncNextTargetToDelegator(ctx, collectionID, readyDelegatorsInCollection, newVersion)
 	syncedChannelNames := lo.Uniq(lo.Map(readyDelegatorsInCollection, func(ch *meta.DmChannel, _ int) string { return ch.ChannelName }))
 	// only after all channel are synced, we can consider the current target is ready
-	return syncSuccess && lo.Every(syncedChannelNames, lo.Keys(channelNames))
+	return syncSuccess && lo.Every(syncedChannelNames, lo.Keys(channelNames)) && segmentDataReady
 }
 
 // sync next target info to delegator as readable snapshot
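The gate is deliberately short-circuited: with `queryCoord.updateTargetNeedSegmentDataReady` disabled, the observer returns exactly what it did before this patch. A self-contained sketch of the combined readiness decision (the names are illustrative, not the patch's API):

```go
package main

import (
	"errors"
	"fmt"
)

// ready mirrors the return expression added to shouldUpdateCurrentTarget:
// the segment-data gate only participates when the feature flag is on.
func ready(syncSuccess, allChannelsSynced, flagOn bool, checkErr error) bool {
	segmentDataReady := !flagOn || checkErr == nil
	return syncSuccess && allChannelsSynced && segmentDataReady
}

func main() {
	lagging := errors.New("segment 42 not loaded with target manifest")
	fmt.Println(ready(true, true, false, lagging)) // true: flag off, legacy behavior
	fmt.Println(ready(true, true, true, lagging))  // false: flag on, data not ready yet
	fmt.Println(ready(true, true, true, nil))      // true: flag on, data ready
}
```

Keeping the check outside `syncNextTargetToDelegator` means an unready distribution only delays promotion of the next target to current; the sync to delegators itself still proceeds.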
diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go
index 5e0e71837d..1ceefbd17c 100644
--- a/internal/querycoordv2/observers/target_observer_test.go
+++ b/internal/querycoordv2/observers/target_observer_test.go
@@ -218,6 +218,28 @@ func (suite *TargetObserverSuite) TestIncrementalUpdate_WithNewSegment() {
 			len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2
 	}, 5*time.Second, 1*time.Second)
 
+	// Add initial segment distribution for CheckSegmentDataReady
+	suite.distMgr.SegmentDistManager.Update(2,
+		&meta.Segment{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            11,
+				CollectionID:  suite.collectionID,
+				PartitionID:   suite.partitionID,
+				InsertChannel: "channel-1",
+			},
+			Node: 2,
+		},
+		&meta.Segment{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            12,
+				CollectionID:  suite.collectionID,
+				PartitionID:   suite.partitionID,
+				InsertChannel: "channel-2",
+			},
+			Node: 2,
+		},
+	)
+
 	// Manually set CurrentTarget to non-empty (simulating previous successful load)
 	// This is the precondition for incremental updates to work
 	suite.targetMgr.UpdateCollectionCurrentTarget(ctx, suite.collectionID)
@@ -284,6 +306,36 @@ func (suite *TargetObserverSuite) TestIncrementalUpdate_WithNewSegment() {
 			},
 		},
 	})
+	// Add segments to SegmentDistManager for CheckSegmentDataReady
+	suite.distMgr.SegmentDistManager.Update(2,
+		&meta.Segment{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            11,
+				CollectionID:  suite.collectionID,
+				PartitionID:   suite.partitionID,
+				InsertChannel: "channel-1",
+			},
+			Node: 2,
+		},
+		&meta.Segment{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            12,
+				CollectionID:  suite.collectionID,
+				PartitionID:   suite.partitionID,
+				InsertChannel: "channel-2",
+			},
+			Node: 2,
+		},
+		&meta.Segment{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            13,
+				CollectionID:  suite.collectionID,
+				PartitionID:   suite.partitionID,
+				InsertChannel: "channel-1",
+			},
+			Node: 2,
+		},
+	)
 
 	suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
 
@@ -424,6 +476,7 @@ func TestShouldUpdateCurrentTarget_EmptyNextTarget(t *testing.T) {
 
 	// Return empty channels to simulate empty next target
 	targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(map[string]*meta.DmChannel{}).Maybe()
+	targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(map[int64]*datapb.SegmentInfo{}).Maybe()
 
 	result := observer.shouldUpdateCurrentTarget(ctx, collectionID)
 	assert.False(t, result)
@@ -482,6 +535,7 @@ func TestShouldUpdateCurrentTarget_ReplicaReadiness(t *testing.T) {
 	targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
 	targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
+	targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(map[int64]*datapb.SegmentInfo{}).Maybe()
 	broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
 	cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@@ -609,6 +663,8 @@ func TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced(t *testing.T) {
 	targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
 	targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
 	targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+	// Return segments for CheckSegmentDataReady
+	targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(targetSegments).Maybe()
 
 	broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
 
@@ -639,6 +695,15 @@ func TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced(t *testing.T) {
 			Status: &querypb.LeaderViewStatus{Serviceable: true},
 		},
 	})
+	// Add segment to SegmentDistManager for CheckSegmentDataReady
+	distMgr.SegmentDistManager.Update(1, &meta.Segment{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            segmentID,
+			CollectionID:  collectionID,
+			InsertChannel: "channel-1",
+		},
+		Node: 1,
+	})
 
 	// Node 2: NOT READY delegator
 	// - Does NOT have the target segment loaded (missing segment 100)
@@ -733,6 +798,12 @@ func TestShouldUpdateCurrentTarget_AllChannelsSynced(t *testing.T) {
 		segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"},
 	}
 
+	// All segments for CheckSegmentDataReady
+	allSegments := map[int64]*datapb.SegmentInfo{
+		segmentID1: {ID: segmentID1, CollectionID: collectionID, InsertChannel: "channel-1"},
+		segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"},
+	}
+
 	targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
 	targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
 	targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments1).Maybe()
@@ -741,6 +812,8 @@ func TestShouldUpdateCurrentTarget_AllChannelsSynced(t *testing.T) {
 	targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
 	targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
 	targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+	// Return segments for CheckSegmentDataReady
+	targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(allSegments).Maybe()
 
 	broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
 	cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@@ -777,6 +850,25 @@ func TestShouldUpdateCurrentTarget_AllChannelsSynced(t *testing.T) {
 			Status: &querypb.LeaderViewStatus{Serviceable: true},
 		},
 	})
+	// Add segments to SegmentDistManager for CheckSegmentDataReady
+	distMgr.SegmentDistManager.Update(1,
+		&meta.Segment{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            segmentID1,
+				CollectionID:  collectionID,
+				InsertChannel: "channel-1",
+			},
+			Node: 1,
+		},
+		&meta.Segment{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            segmentID2,
+				CollectionID:  collectionID,
+				InsertChannel: "channel-2",
+			},
+			Node: 1,
+		},
+	)
 
 	// Execute the function under test
 	result := observer.shouldUpdateCurrentTarget(ctx, collectionID)
@@ -845,6 +937,12 @@ func TestShouldUpdateCurrentTarget_PartialChannelsSynced(t *testing.T) {
 		segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"},
 	}
 
+	// All segments for CheckSegmentDataReady
+	allSegments := map[int64]*datapb.SegmentInfo{
+		segmentID1: {ID: segmentID1, CollectionID: collectionID, InsertChannel: "channel-1"},
+		segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"},
+	}
+
 	targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
 	targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
 	targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments1).Maybe()
@@ -853,6 +951,8 @@ func TestShouldUpdateCurrentTarget_PartialChannelsSynced(t *testing.T) {
 	targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
 	targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
 	targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+	// Return segments for CheckSegmentDataReady - this will fail since segment distribution is incomplete
+	targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(allSegments).Maybe()
 
 	broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
 	cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@@ -959,6 +1059,8 @@ func TestShouldUpdateCurrentTarget_NoReadyDelegators(t *testing.T) {
 	targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
 	targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe()
 	targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+	// Return segments for CheckSegmentDataReady - this will fail since no segment in distribution
+	targetMgr.EXPECT().GetSealedSegmentsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(targetSegments).Maybe()
 
 	broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
 	cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go
index 5c518e3ec7..a2a4f897e7 100644
--- a/internal/querycoordv2/task/action.go
+++ b/internal/querycoordv2/task/action.go
@@ -35,6 +35,7 @@ const (
 	ActionTypeUpdate
 	ActionTypeStatsUpdate
 	ActionTypeDropIndex
+	ActionTypeReopen
 )
 
 var ActionTypeName = map[ActionType]string{
@@ -42,6 +43,8 @@ var ActionTypeName = map[ActionType]string{
 	ActionTypeReduce:      "Reduce",
 	ActionTypeUpdate:      "Update",
 	ActionTypeStatsUpdate: "StatsUpdate",
+	ActionTypeDropIndex:   "DropIndex",
+	ActionTypeReopen:      "Reopen",
 }
 
 func (t ActionType) String() string {
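`packLoadSegmentRequest` (next hunk) translates the new action type into a dedicated load scope, so a QueryNode can tell a manifest reopen apart from a full load, a stats update, or a delta sync. A condensed sketch of that dispatch order, with local values standing in for the real `ActionType`/`querypb.LoadScope` enums:

```go
package main

import "fmt"

type actionType int

const (
	actionGrow actionType = iota
	actionStatsUpdate
	actionReopen
)

// scopeFor condenses the if-chain in packLoadSegmentRequest: the default is a
// full load, stats updates and reopens each narrow the scope, and a
// leader-checker source overrides everything with a delta scope last.
func scopeFor(a actionType, fromLeaderChecker bool) string {
	scope := "Full"
	if a == actionStatsUpdate {
		scope = "Stats"
	}
	if a == actionReopen {
		scope = "Reopen"
	}
	if fromLeaderChecker {
		scope = "Delta"
	}
	return scope
}

func main() {
	fmt.Println(scopeFor(actionReopen, false)) // Reopen
	fmt.Println(scopeFor(actionReopen, true))  // Delta: the source check wins, as in the patch
	fmt.Println(scopeFor(actionGrow, false))   // Full
}
```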
"Update", ActionTypeStatsUpdate: "StatsUpdate", + ActionTypeDropIndex: "DropIndex", + ActionTypeReopen: "Reopen", } func (t ActionType) String() string { diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index cb91967dcc..b8f07405e8 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -141,6 +141,10 @@ func packLoadSegmentRequest( loadScope = querypb.LoadScope_Stats } + if action.Type() == ActionTypeReopen { + loadScope = querypb.LoadScope_Reopen + } + if task.Source() == utils.LeaderChecker { loadScope = querypb.LoadScope_Delta } diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 46e13c700a..30dd6feb47 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -79,6 +79,33 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target return nil } +func CheckSegmentDataReady(ctx context.Context, collectionID int64, distManager *meta.DistributionManager, targetMgr meta.TargetManagerInterface, scope int32) error { + log := log.Ctx(ctx). + WithRateGroup(fmt.Sprintf("util.CheckSegmentDataReady-%d", collectionID), 1, 60). + With(zap.Int64("collectionID", collectionID)) + + // Check whether segments are fully loaded + segmentDist := targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, scope) + for segmentID, segmentInfo := range segmentDist { + segments := distManager.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithSegmentID(segmentID)) + if len(segments) == 0 { + log.RatedInfo(10, "segment is not available", zap.Int64("segmentID", segmentID)) + return merr.WrapErrSegmentLack(segmentID) + } + + for _, segment := range segments { + // Compare manifest path for now + // alternative is to compare version, but it's not recommended to add extra info in segmentinfo + // we may use data view version in the future + if segment.ManifestPath != segmentInfo.GetManifestPath() { + log.RatedInfo(10, "segment is not updated", zap.Int64("segmentID", segmentID)) + return merr.WrapErrSegmentNotLoaded(segmentID) + } + } + } + return nil +} + func checkLoadStatus(ctx context.Context, m *meta.Meta, collectionID int64) error { percentage := m.CollectionManager.CalculateLoadPercentage(ctx, collectionID) if percentage < 0 { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 10209bbdea..09eac5ff6e 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2545,6 +2545,8 @@ type queryCoordConfig struct { ResourceExhaustionCleanupInterval ParamItem `refreshable:"true"` FileResourceMode ParamItem `refreshable:"false"` + + UpdateTargetNeedSegmentDataReady ParamItem `refreshable:"true"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -3212,6 +3214,15 @@ Set to 0 to disable the penalty period.`, Export: true, } p.ResourceExhaustionCleanupInterval.Init(base.mgr) + + p.UpdateTargetNeedSegmentDataReady = ParamItem{ + Key: "queryCoord.updateTargetNeedSegmentDataReady", + Version: "2.6.8", + DefaultValue: "true", + Doc: "update target need segment data ready", + Export: false, + } + p.UpdateTargetNeedSegmentDataReady.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////