diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index 8332ef580e..2c74bd5e4b 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"runtime"
 	"sort"
 	"time"
 
@@ -44,6 +45,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
+	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -270,50 +272,66 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 
 // applyDelete handles delete record and apply them to corresponding workers.
 func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 {
-	var offlineSegments []int64
+	offlineSegments := typeutil.NewConcurrentSet[int64]()
 	log := sd.getLogger(ctx)
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0) * 4)
+	defer pool.Release()
+
+	var futures []*conc.Future[struct{}]
 	for _, segmentEntry := range entries {
 		log := log.With(
 			zap.Int64("segmentID", segmentEntry.SegmentID),
 			zap.Int64("workerID", nodeID),
 		)
+		segmentEntry := segmentEntry
 		delRecord, ok := delRecords[segmentEntry.SegmentID]
 		if ok {
-			log.Debug("delegator plan to applyDelete via worker")
-			err := retry.Handle(ctx, func() (bool, error) {
-				if sd.Stopped() {
-					return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
-				}
+			future := pool.Submit(func() (struct{}, error) {
+				log.Debug("delegator plan to applyDelete via worker")
+				err := retry.Handle(ctx, func() (bool, error) {
+					if sd.Stopped() {
+						return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
+					}
 
-				err := worker.Delete(ctx, &querypb.DeleteRequest{
-					Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
-					CollectionId: sd.collectionID,
-					PartitionId:  segmentEntry.PartitionID,
-					VchannelName: sd.vchannelName,
-					SegmentId:    segmentEntry.SegmentID,
-					PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
-					Timestamps:   delRecord.Timestamps,
-					Scope:        scope,
-				})
-				if errors.Is(err, merr.ErrNodeNotFound) {
-					log.Warn("try to delete data on non-exist node")
-					return false, err
-				} else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
-					log.Warn("try to delete data of released segment")
+					err := worker.Delete(ctx, &querypb.DeleteRequest{
+						Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
+						CollectionId: sd.collectionID,
+						PartitionId:  segmentEntry.PartitionID,
+						VchannelName: sd.vchannelName,
+						SegmentId:    segmentEntry.SegmentID,
+						PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
+						Timestamps:   delRecord.Timestamps,
+						Scope:        scope,
+					})
+					if errors.Is(err, merr.ErrNodeNotFound) {
+						log.Warn("try to delete data on non-exist node")
+						// cancel other request
+						cancel()
+						return false, err
+					} else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
+						log.Warn("try to delete data of released segment")
+						return false, nil
+					} else if err != nil {
+						log.Warn("worker failed to delete on segment", zap.Error(err))
+						return true, err
+					}
 					return false, nil
-				} else if err != nil {
-					log.Warn("worker failed to delete on segment", zap.Error(err))
-					return true, err
+				}, retry.Attempts(10))
+				if err != nil {
+					log.Warn("apply delete for segment failed, marking it offline")
+					offlineSegments.Insert(segmentEntry.SegmentID)
 				}
-				return false, nil
-			}, retry.Attempts(10))
-			if err != nil {
-				log.Warn("apply delete for segment failed, marking it offline")
-				offlineSegments = append(offlineSegments, segmentEntry.SegmentID)
-			}
+				return struct{}{}, err
+			})
+			futures = append(futures, future)
 		}
 	}
-	return offlineSegments
+	conc.AwaitAll(futures...)
+	return offlineSegments.Collect()
 }
 
 // markSegmentOffline makes segment go offline and waits for QueryCoord to fix.
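
For context on the pattern the patch introduces (a `conc.Pool` sized at `runtime.GOMAXPROCS(0) * 4`, per-segment futures awaited with `conc.AwaitAll`, a `ConcurrentSet` collecting offline segments, and `cancel()` fanning a fatal error out to sibling tasks), here is a minimal stdlib-only sketch of the same fan-out shape. `applyAll`, `errFatal`, and the channel semaphore are illustrative stand-ins, not the Milvus APIs:

package main

import (
	"context"
	"errors"
	"fmt"
	"runtime"
	"sync"
)

// errFatal stands in for merr.ErrNodeNotFound: an error that should abort
// the remaining in-flight requests rather than fail a single segment.
var errFatal = errors.New("node not found")

// applyAll fans apply out across a bounded number of goroutines, cancels
// sibling tasks on a fatal error, and collects the IDs that failed.
func applyAll(ctx context.Context, ids []int64, apply func(context.Context, int64) error) []int64 {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Bounded concurrency, mirroring the patch's GOMAXPROCS*4 pool sizing.
	sem := make(chan struct{}, runtime.GOMAXPROCS(0)*4)

	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards failed; the patch uses typeutil.ConcurrentSet instead
		failed []int64
	)
	for _, id := range ids {
		id := id // capture the loop variable, as the patch does with segmentEntry
		wg.Add(1)
		sem <- struct{}{} // acquire a pool slot
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			if err := apply(ctx, id); err != nil {
				if errors.Is(err, errFatal) {
					cancel() // abort the other in-flight deletes
				}
				mu.Lock()
				failed = append(failed, id)
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // the conc.AwaitAll step
	return failed
}

func main() {
	failed := applyAll(context.Background(), []int64{1, 2, 3}, func(ctx context.Context, id int64) error {
		if id == 2 {
			return fmt.Errorf("segment %d: %w", id, errFatal)
		}
		return nil // a real worker would issue worker.Delete here
	})
	fmt.Println("offline segments:", failed)
}

Reading the result set only after the wait step is what makes the patch's `offlineSegments.Collect()` safe: every future has completed before the set is drained.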