fix: [2.4] Remove task from syncmgr after task done (#33303)

Cherry-pick from master
pr: #33302
See also #33247
Introduced in PR #32865

Remove task after task done to keep checkpoint sound and safe

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-05-23 14:51:39 +08:00 committed by GitHub
parent ad4c1975bd
commit 2f3b377479
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -121,7 +121,6 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[st
func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[struct{}] {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
mgr.tasks.Insert(taskKey, task)
defer mgr.tasks.Remove(taskKey)
key, err := task.CalcTargetSegment()
if err != nil {
@ -133,6 +132,7 @@ func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[struct{}] {
}
func (mgr *syncManager) submit(key int64, task Task) *conc.Future[struct{}] {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
handler := func(err error) error {
if err == nil {
return nil
@ -161,7 +161,10 @@ func (mgr *syncManager) submit(key int64, task Task) *conc.Future[struct{}] {
return mgr.submit(targetID, task).Err()
}
log.Info("sync mgr sumbit task with key", zap.Int64("key", key))
return mgr.Submit(key, task, handler)
return mgr.Submit(key, task, handler, func(err error) error {
mgr.tasks.Remove(taskKey)
return err
})
}
func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {