fix: Skip running scalar index when segment was compacted (#44690)

issue: #44689

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-10-11 12:05:56 +08:00 committed by GitHub
parent 7a93cfe890
commit d8beafd6d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 15 additions and 0 deletions

View File

@ -871,6 +871,7 @@ func (m *indexMeta) UpdateIndexState(buildID UniqueID, state commonpb.IndexState
segIdx, ok := m.segmentBuildInfo.Get(buildID) segIdx, ok := m.segmentBuildInfo.Get(buildID)
if !ok { if !ok {
log.Ctx(m.ctx).Warn("there is no index with buildID", zap.Int64("buildID", buildID))
return fmt.Errorf("there is no index with buildID: %d", buildID) return fmt.Errorf("there is no index with buildID: %d", buildID)
} }
@ -881,6 +882,7 @@ func (m *indexMeta) UpdateIndexState(buildID UniqueID, state commonpb.IndexState
} }
if err := m.updateSegIndexMeta(segIdx, updateFunc); err != nil { if err := m.updateSegIndexMeta(segIdx, updateFunc); err != nil {
log.Ctx(m.ctx).Warn("failed to update index meta", zap.Int64("buildID", buildID), zap.Error(err))
return err return err
} }

View File

@ -210,6 +210,8 @@ func (s *globalTaskScheduler) check() {
defer s.mu.RUnlock(task.GetTaskID()) defer s.mu.RUnlock(task.GetTaskID())
task.QueryTaskOnWorker(s.cluster) task.QueryTaskOnWorker(s.cluster)
switch task.GetTaskState() { switch task.GetTaskState() {
case taskcommon.None:
s.runningTasks.Remove(task.GetTaskID())
case taskcommon.Init, taskcommon.Retry: case taskcommon.Init, taskcommon.Retry:
s.runningTasks.Remove(task.GetTaskID()) s.runningTasks.Remove(task.GetTaskID())
s.pendingTasks.Push(task) s.pendingTasks.Push(task)

View File

@ -369,6 +369,17 @@ func (it *indexBuildTask) prepareOptionalFields(ctx context.Context, collectionI
func (it *indexBuildTask) QueryTaskOnWorker(cluster session.Cluster) { func (it *indexBuildTask) QueryTaskOnWorker(cluster session.Cluster) {
log := log.Ctx(context.TODO()).With(zap.Int64("taskID", it.BuildID), zap.Int64("segmentID", it.SegmentID), zap.Int64("nodeID", it.NodeID)) log := log.Ctx(context.TODO()).With(zap.Int64("taskID", it.BuildID), zap.Int64("segmentID", it.SegmentID), zap.Int64("nodeID", it.NodeID))
// Check if task exists in meta
segIndex, exist := it.meta.indexMeta.GetIndexJob(it.BuildID)
if !exist || segIndex == nil {
log.Info("index task has not exist in meta table, removing task")
if it.tryDropTaskOnWorker(cluster) != nil {
return
}
it.SetState(indexpb.JobState_JobStateNone, "index task has not exist in meta table")
return
}
results, err := cluster.QueryIndex(it.NodeID, &workerpb.QueryJobsRequest{ results, err := cluster.QueryIndex(it.NodeID, &workerpb.QueryJobsRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
TaskIDs: []UniqueID{it.BuildID}, TaskIDs: []UniqueID{it.BuildID},