enhance: Make applyDelete work in parallel at segment level (#32291)

`applyDelete` used to apply delete entries to each segment serially.
This PR makes it run in parallel, submitting one task per segment to a bounded worker pool (`conc.Pool`) and awaiting all futures, to improve performance.
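Below is a minimal, self-contained sketch of the fan-out pattern this change adopts, using only the Go standard library rather than milvus's internal `conc.Pool` and `typeutil.ConcurrentSet` helpers; `applyDeleteParallel` and `deleteOne` are hypothetical names used only for illustration, not part of the actual delegator API.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// applyDeleteParallel sketches the per-segment fan-out: each segment's delete
// is handed to a bounded set of workers, failed segments are collected into a
// shared slice, and the caller waits for every task before returning.
func applyDeleteParallel(segmentIDs []int64, deleteOne func(segmentID int64) error) []int64 {
	var (
		mu      sync.Mutex
		offline []int64
		wg      sync.WaitGroup
	)

	// Bound concurrency the same way the PR does: a small multiple of GOMAXPROCS.
	sem := make(chan struct{}, runtime.GOMAXPROCS(0)*4)

	for _, segmentID := range segmentIDs {
		segmentID := segmentID // capture loop variable (pre-Go 1.22 semantics)
		wg.Add(1)
		sem <- struct{}{} // acquire a worker slot
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			if err := deleteOne(segmentID); err != nil {
				// A failed segment is marked offline instead of failing the whole batch.
				mu.Lock()
				offline = append(offline, segmentID)
				mu.Unlock()
			}
		}()
	}

	wg.Wait()
	return offline
}

func main() {
	offline := applyDeleteParallel([]int64{1, 2, 3}, func(segmentID int64) error {
		if segmentID == 2 {
			return fmt.Errorf("segment %d unreachable", segmentID)
		}
		return nil
	})
	fmt.Println("offline segments:", offline) // offline segments: [2]
}
```

The actual change keeps the same per-segment retry and error classification as before; only the outer loop moves from sequential calls to pooled tasks.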

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-04-24 17:01:24 +08:00 committed by GitHub
parent 37ca32dbba
commit faa559592d


@@ -20,6 +20,7 @@ import (
     "context"
     "fmt"
     "math/rand"
+    "runtime"
     "sort"
     "time"
@@ -44,6 +45,7 @@ import (
     "github.com/milvus-io/milvus/pkg/mq/msgstream"
     "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
     "github.com/milvus-io/milvus/pkg/util/commonpbutil"
+    "github.com/milvus-io/milvus/pkg/util/conc"
     "github.com/milvus-io/milvus/pkg/util/funcutil"
     "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -270,50 +272,66 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 // applyDelete handles delete record and apply them to corresponding workers.
 func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 {
-    var offlineSegments []int64
+    offlineSegments := typeutil.NewConcurrentSet[int64]()
     log := sd.getLogger(ctx)
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+    pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0) * 4)
+    defer pool.Release()
+    var futures []*conc.Future[struct{}]
     for _, segmentEntry := range entries {
         log := log.With(
             zap.Int64("segmentID", segmentEntry.SegmentID),
             zap.Int64("workerID", nodeID),
         )
+        segmentEntry := segmentEntry
         delRecord, ok := delRecords[segmentEntry.SegmentID]
         if ok {
-            log.Debug("delegator plan to applyDelete via worker")
-            err := retry.Handle(ctx, func() (bool, error) {
-                if sd.Stopped() {
-                    return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
-                }
-                err := worker.Delete(ctx, &querypb.DeleteRequest{
-                    Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
-                    CollectionId: sd.collectionID,
-                    PartitionId:  segmentEntry.PartitionID,
-                    VchannelName: sd.vchannelName,
-                    SegmentId:    segmentEntry.SegmentID,
-                    PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
-                    Timestamps:   delRecord.Timestamps,
-                    Scope:        scope,
-                })
-                if errors.Is(err, merr.ErrNodeNotFound) {
-                    log.Warn("try to delete data on non-exist node")
-                    return false, err
-                } else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
-                    log.Warn("try to delete data of released segment")
-                    return false, nil
-                } else if err != nil {
-                    log.Warn("worker failed to delete on segment", zap.Error(err))
-                    return true, err
-                }
-                return false, nil
-            }, retry.Attempts(10))
-            if err != nil {
-                log.Warn("apply delete for segment failed, marking it offline")
-                offlineSegments = append(offlineSegments, segmentEntry.SegmentID)
-            }
+            future := pool.Submit(func() (struct{}, error) {
+                log.Debug("delegator plan to applyDelete via worker")
+                err := retry.Handle(ctx, func() (bool, error) {
+                    if sd.Stopped() {
+                        return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
+                    }
+                    err := worker.Delete(ctx, &querypb.DeleteRequest{
+                        Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
+                        CollectionId: sd.collectionID,
+                        PartitionId:  segmentEntry.PartitionID,
+                        VchannelName: sd.vchannelName,
+                        SegmentId:    segmentEntry.SegmentID,
+                        PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
+                        Timestamps:   delRecord.Timestamps,
+                        Scope:        scope,
+                    })
+                    if errors.Is(err, merr.ErrNodeNotFound) {
+                        log.Warn("try to delete data on non-exist node")
+                        // cancel other request
+                        cancel()
+                        return false, err
+                    } else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
+                        log.Warn("try to delete data of released segment")
+                        return false, nil
+                    } else if err != nil {
+                        log.Warn("worker failed to delete on segment", zap.Error(err))
+                        return true, err
+                    }
+                    return false, nil
+                }, retry.Attempts(10))
+                if err != nil {
+                    log.Warn("apply delete for segment failed, marking it offline")
+                    offlineSegments.Insert(segmentEntry.SegmentID)
+                }
+                return struct{}{}, err
+            })
+            futures = append(futures, future)
         }
     }
-    return offlineSegments
+    conc.AwaitAll(futures...)
+    return offlineSegments.Collect()
 }
 // markSegmentOffline makes segment go offline and waits for QueryCoord to fix.
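One behavioral detail in the new code: when a worker node cannot be found (`merr.ErrNodeNotFound`), the task cancels the shared context ("cancel other request") so that sibling per-segment tasks stop retrying early. Below is a minimal standalone sketch of that cancel-on-fatal-error pattern, using only the standard library; `retryUntilCanceled`, `attempt`, and `errNodeNotFound` are illustrative stand-ins, not the milvus API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errNodeNotFound stands in for merr.ErrNodeNotFound in this sketch.
var errNodeNotFound = errors.New("node not found")

// retryUntilCanceled keeps retrying one segment's delete until it succeeds,
// hits a fatal error, or a sibling task cancels the shared context.
func retryUntilCanceled(ctx context.Context, cancel context.CancelFunc, segmentID int64, attempt func(int64) error) error {
	for {
		if err := ctx.Err(); err != nil {
			return err // another segment already hit a fatal error
		}
		err := attempt(segmentID)
		if err == nil {
			return nil
		}
		if errors.Is(err, errNodeNotFound) {
			cancel() // fatal: stop the sibling tasks instead of retrying them
			return err
		}
		time.Sleep(10 * time.Millisecond) // transient: back off and retry
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	for _, segmentID := range []int64{1, 2, 3} {
		segmentID := segmentID
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := retryUntilCanceled(ctx, cancel, segmentID, func(id int64) error {
				if id == 2 {
					return errNodeNotFound // this segment's worker node is gone
				}
				return errors.New("transient failure") // others retry until canceled
			})
			fmt.Printf("segment %d stopped: %v\n", segmentID, err)
		}()
	}
	wg.Wait()
}
```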