From d8beafd6d0609b7d0a690fbc87552ef4d45a7666 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Sat, 11 Oct 2025 12:05:56 +0800 Subject: [PATCH] fix: Skip running scalar index when segment was compacted (#44690) issue: #44689 Signed-off-by: Cai Zhang --- internal/datacoord/index_meta.go | 2 ++ internal/datacoord/task/global_scheduler.go | 2 ++ internal/datacoord/task_index.go | 11 +++++++++++ 3 files changed, 15 insertions(+) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 7beeb97a4e..20996229ec 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -871,6 +871,7 @@ func (m *indexMeta) UpdateIndexState(buildID UniqueID, state commonpb.IndexState segIdx, ok := m.segmentBuildInfo.Get(buildID) if !ok { + log.Ctx(m.ctx).Warn("there is no index with buildID", zap.Int64("buildID", buildID)) return fmt.Errorf("there is no index with buildID: %d", buildID) } @@ -881,6 +882,7 @@ func (m *indexMeta) UpdateIndexState(buildID UniqueID, state commonpb.IndexState } if err := m.updateSegIndexMeta(segIdx, updateFunc); err != nil { + log.Ctx(m.ctx).Warn("failed to update index meta", zap.Int64("buildID", buildID), zap.Error(err)) return err } diff --git a/internal/datacoord/task/global_scheduler.go b/internal/datacoord/task/global_scheduler.go index 708204418b..3749d0bdb1 100644 --- a/internal/datacoord/task/global_scheduler.go +++ b/internal/datacoord/task/global_scheduler.go @@ -210,6 +210,8 @@ func (s *globalTaskScheduler) check() { defer s.mu.RUnlock(task.GetTaskID()) task.QueryTaskOnWorker(s.cluster) switch task.GetTaskState() { + case taskcommon.None: + s.runningTasks.Remove(task.GetTaskID()) case taskcommon.Init, taskcommon.Retry: s.runningTasks.Remove(task.GetTaskID()) s.pendingTasks.Push(task) diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index 1541e353ca..ae03cca514 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -369,6 +369,17 @@ func (it *indexBuildTask) prepareOptionalFields(ctx context.Context, collectionI func (it *indexBuildTask) QueryTaskOnWorker(cluster session.Cluster) { log := log.Ctx(context.TODO()).With(zap.Int64("taskID", it.BuildID), zap.Int64("segmentID", it.SegmentID), zap.Int64("nodeID", it.NodeID)) + // Check if task exists in meta + segIndex, exist := it.meta.indexMeta.GetIndexJob(it.BuildID) + if !exist || segIndex == nil { + log.Info("index task has not exist in meta table, removing task") + if it.tryDropTaskOnWorker(cluster) != nil { + return + } + it.SetState(indexpb.JobState_JobStateNone, "index task has not exist in meta table") + return + } + results, err := cluster.QueryIndex(it.NodeID, &workerpb.QueryJobsRequest{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), TaskIDs: []UniqueID{it.BuildID},