diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index 1f4534c52f..190a8e7655 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -121,7 +121,6 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[st func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[struct{}] { taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp()) mgr.tasks.Insert(taskKey, task) - defer mgr.tasks.Remove(taskKey) key, err := task.CalcTargetSegment() if err != nil { @@ -133,6 +132,7 @@ func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[struct{}] { } func (mgr *syncManager) submit(key int64, task Task) *conc.Future[struct{}] { + taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp()) handler := func(err error) error { if err == nil { return nil @@ -161,7 +161,10 @@ func (mgr *syncManager) submit(key int64, task Task) *conc.Future[struct{}] { return mgr.submit(targetID, task).Err() } log.Info("sync mgr sumbit task with key", zap.Int64("key", key)) - return mgr.Submit(key, task, handler) + return mgr.Submit(key, task, handler, func(err error) error { + mgr.tasks.Remove(taskKey) + return err + }) } func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {