diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 17cdfc6dd6..13c340518c 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -321,7 +321,7 @@ func (sd *shardDelegator) search(ctx context.Context, req *querypb.SearchRequest log.Warn("failed to optimize search params", zap.Error(err)) return nil, err } - tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, sd.modifySearchRequest) + tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifySearchRequest) if err != nil { log.Warn("Search organizeSubTask failed", zap.Error(err)) return nil, err @@ -508,7 +508,7 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq zap.Int("sealedNum", len(sealed)), zap.Int("growingNum", len(growing)), ) - tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, sd.modifyQueryRequest) + tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest) if err != nil { log.Warn("query organizeSubTask failed", zap.Error(err)) return err @@ -588,7 +588,7 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) zap.Int("sealedNum", sealedNum), zap.Int("growingNum", len(growing)), ) - tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, sd.modifyQueryRequest) + tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest) if err != nil { log.Warn("query organizeSubTask failed", zap.Error(err)) return nil, err @@ -636,7 +636,7 @@ func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetSta } defer sd.distribution.Unpin(version) - tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest { + tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest { nodeReq := proto.Clone(req).(*querypb.GetStatisticsRequest) nodeReq.GetReq().GetBase().TargetID = targetID nodeReq.Scope = scope @@ -670,7 +670,14 @@ type subTask[T any] struct { worker cluster.Worker } -func organizeSubTask[T any](ctx context.Context, req T, sealed []SnapshotItem, growing []SegmentEntry, sd *shardDelegator, modify func(T, querypb.DataScope, []int64, int64) T) ([]subTask[T], error) { +func organizeSubTask[T any](ctx context.Context, + req T, + sealed []SnapshotItem, + growing []SegmentEntry, + sd *shardDelegator, + skipEmpty bool, + modify func(T, querypb.DataScope, []int64, int64) T, +) ([]subTask[T], error) { log := sd.getLogger(ctx) result := make([]subTask[T], 0, len(sealed)+1) @@ -678,7 +685,7 @@ func organizeSubTask[T any](ctx context.Context, req T, sealed []SnapshotItem, g segmentIDs := lo.Map(segments, func(item SegmentEntry, _ int) int64 { return item.SegmentID }) - if len(segmentIDs) == 0 { + if skipEmpty && len(segmentIDs) == 0 { return nil } // update request @@ -874,6 +881,11 @@ func (sd *shardDelegator) UpdateSchema(ctx context.Context, schema *schemapb.Col } defer sd.distribution.Unpin(version) + log.Info("update schema targets...", + zap.Int("sealedNum", len(sealed)), + zap.Int("growingNum", len(growing)), + ) + tasks, err := organizeSubTask(ctx, &querypb.UpdateSchemaRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithSourceID(paramtable.GetNodeID()), @@ -881,11 +893,16 @@ func (sd *shardDelegator) UpdateSchema(ctx context.Context, schema *schemapb.Col CollectionID: sd.collectionID, Schema: schema, Version: schVersion, - }, sealed, growing, sd, func(req *querypb.UpdateSchemaRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.UpdateSchemaRequest { - nodeReq := typeutil.Clone(req) - nodeReq.GetBase().TargetID = targetID - return nodeReq - }) + }, + sealed, + growing, + sd, + false, // don't skip empty + func(req *querypb.UpdateSchemaRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.UpdateSchemaRequest { + nodeReq := typeutil.Clone(req) + nodeReq.GetBase().TargetID = targetID + return nodeReq + }) if err != nil { return err } @@ -915,7 +932,7 @@ func (sd *shardDelegator) Close() { // clean up l0 segment in delete buffer start := time.Now() sd.deleteBuffer.Clear() - log.Info("unregister all l0 segments", zap.Duration("cost", time.Since(start))) + log.Info("unregister all l0 segments", zap.Duration("cost", time.Since(start))) metrics.QueryNodeDeleteBufferSize.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName) metrics.QueryNodeDeleteBufferRowNum.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName)