fix: Set task version for stats task (#40035)

issue: #40034

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-02-27 17:49:59 +08:00 committed by GitHub
parent eb04686348
commit a74580c1ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 16 additions and 6 deletions

View File

@@ -863,8 +863,8 @@ func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID()))
// clear low version task
for i := int64(1); i < fieldStats.GetVersion(); i++ {
prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath,
seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID(), i)
prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath,
fieldStats.GetBuildID(), i, seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID())
futures := make([]*conc.Future[struct{}], 0)
err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(files *storage.ChunkObjectInfo) bool {

View File

@@ -157,7 +157,7 @@ func (jm *statsJobManager) triggerTextStatsTask() {
needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID())
}
segments := jm.mt.SelectSegments(jm.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
return needDoTextIndex(seg, needTriggerFieldIDs)
return seg.GetIsSorted() && needDoTextIndex(seg, needTriggerFieldIDs)
}))
for _, segment := range segments {
@@ -181,7 +181,7 @@ func (jm *statsJobManager) triggerBM25StatsTask() {
}
}
segments := jm.mt.SelectSegments(jm.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
return needDoBM25(seg, needTriggerFieldIDs)
return seg.GetIsSorted() && needDoBM25(seg, needTriggerFieldIDs)
}))
for _, segment := range segments {

View File

@@ -160,8 +160,15 @@ func (st *statsTask) UpdateMetaBuildingState(meta *meta) error {
}
func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
// set segment compacting
log := log.Ctx(ctx).With(zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID))
statsMeta := dependency.meta.statsTaskMeta.GetStatsTaskBySegmentID(st.segmentID, st.subJobType)
if statsMeta == nil {
log.Warn("stats task meta is null, skip it")
st.SetState(indexpb.JobState_JobStateNone, "stats task meta is null")
return false
}
// set segment compacting
segment := dependency.meta.GetHealthySegment(ctx, st.segmentID)
if segment == nil {
log.Warn("segment is node healthy, skip stats")
@@ -215,7 +222,9 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
NumRows: segment.GetNumOfRows(),
CollectionTtl: collTtl.Nanoseconds(),
CurrentTs: tsoutil.GetCurrentTime(),
BinlogMaxSize: Params.DataNodeCfg.BinLogMaxSize.GetAsUint64(),
// update version after check
TaskVersion: statsMeta.GetVersion() + 1,
BinlogMaxSize: Params.DataNodeCfg.BinLogMaxSize.GetAsUint64(),
}
return true

View File

@@ -119,6 +119,7 @@ func (s *statsTaskSuite) SetupSuite() {
SegmentID: s.segID,
InsertChannel: "ch1",
TaskID: s.taskID,
SubJobType: indexpb.StatsSubJob_Sort,
Version: 0,
NodeID: 0,
State: indexpb.JobState_JobStateInit,