fix: Re-add json stats trigger (#41967)

issue: #41123

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-05-23 10:02:27 +08:00 committed by GitHub
parent eea6b50fbb
commit 48419da4d2
2 changed files with 57 additions and 0 deletions


@@ -125,6 +125,9 @@ func (si *statsInspector) triggerStatsTaskLoop() {
ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second))
defer ticker.Stop()
lastJSONStatsLastTrigger := time.Now().Unix()
maxJSONStatsTaskCount := 0
for {
select {
case <-si.ctx.Done():
@@ -134,6 +137,7 @@ func (si *statsInspector) triggerStatsTaskLoop() {
si.triggerSortStatsTask()
si.triggerTextStatsTask()
si.triggerBM25StatsTask()
lastJSONStatsLastTrigger, maxJSONStatsTaskCount = si.triggerJsonKeyIndexStatsTask(lastJSONStatsLastTrigger, maxJSONStatsTaskCount)
case segID := <-getStatsTaskChSingleton():
log.Info("receive new segment to trigger stats task", zap.Int64("segmentID", segID))
@@ -204,6 +208,20 @@ func needDoTextIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
return false
}
func needDoJsonKeyIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
if !(isFlush(segment) && segment.GetLevel() != datapb.SegmentLevel_L0 &&
segment.GetIsSorted()) {
return false
}
for _, fieldID := range fieldIDs {
if segment.GetJsonKeyStats()[fieldID] == nil {
return true
}
}
return false
}
func needDoBM25(segment *SegmentInfo, fieldIDs []UniqueID) bool {
// TODO: docking bm25 stats task
return false
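
A hedged illustration of the predicate's intent, using a stand-in segment type rather than the real SegmentInfo: a segment qualifies only if it is flushed, sorted, and not an L0 segment, and at least one of the requested JSON fields has no key stats yet.

package main

import "fmt"

// fakeSegment is a stand-in for SegmentInfo, reduced to the fields the
// predicate inspects.
type fakeSegment struct {
	flushed bool
	sorted  bool
	isL0    bool
	stats   map[int64]struct{} // fieldID -> existing JSON key stats
}

func needJSONKeyIndex(seg fakeSegment, fieldIDs []int64) bool {
	if !(seg.flushed && !seg.isL0 && seg.sorted) {
		return false
	}
	for _, id := range fieldIDs {
		if _, ok := seg.stats[id]; !ok {
			return true // at least one field still lacks stats
		}
	}
	return false
}

func main() {
	seg := fakeSegment{flushed: true, sorted: true, stats: map[int64]struct{}{100: {}}}
	fmt.Println(needJSONKeyIndex(seg, []int64{100}))      // false: stats already exist
	fmt.Println(needJSONKeyIndex(seg, []int64{100, 101})) // true: field 101 has none
}
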
@@ -235,6 +253,38 @@ func (si *statsInspector) triggerTextStatsTask() {
}
}
func (si *statsInspector) triggerJsonKeyIndexStatsTask(lastJSONStatsLastTrigger int64, maxJSONStatsTaskCount int) (int64, int) {
collections := si.mt.GetCollections()
for _, collection := range collections {
needTriggerFieldIDs := make([]UniqueID, 0)
for _, field := range collection.Schema.GetFields() {
h := typeutil.CreateFieldSchemaHelper(field)
if h.EnableJSONKeyStatsIndex() && Params.CommonCfg.EnabledJSONKeyStats.GetAsBool() {
needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID())
}
}
segments := si.mt.SelectSegments(si.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
return needDoJsonKeyIndex(seg, needTriggerFieldIDs)
}))
if time.Now().Unix()-lastJSONStatsLastTrigger > int64(Params.DataCoordCfg.JSONStatsTriggerInterval.GetAsDuration(time.Minute).Seconds()) {
lastJSONStatsLastTrigger = time.Now().Unix()
maxJSONStatsTaskCount = 0
}
for _, segment := range segments {
if maxJSONStatsTaskCount >= Params.DataCoordCfg.JSONStatsTriggerCount.GetAsInt() {
break
}
if err := si.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_JsonKeyIndexJob, true); err != nil {
log.Warn("create stats task with json key index for segment failed, wait for retry:",
zap.Int64("segmentID", segment.GetID()), zap.Error(err))
continue
}
maxJSONStatsTaskCount++
}
}
return lastJSONStatsLastTrigger, maxJSONStatsTaskCount
}
func (si *statsInspector) triggerBM25StatsTask() {
collections := si.mt.GetCollections()
for _, collection := range collections {
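
The segment selection above follows a filter-function pattern: the inspector asks the meta table for all segments in a collection that satisfy needDoJsonKeyIndex. A compact standalone sketch of that pattern, with stand-in types rather than the real meta-table API:

package main

import "fmt"

type segment struct {
	id     int64
	sorted bool
}

// segmentFilter mirrors the SegmentFilterFunc idea: selection logic is passed
// in as a closure instead of being baked into the store.
type segmentFilter func(seg segment) bool

type segmentStore struct {
	segments []segment
}

func (s *segmentStore) Select(filter segmentFilter) []segment {
	out := make([]segment, 0)
	for _, seg := range s.segments {
		if filter(seg) {
			out = append(out, seg)
		}
	}
	return out
}

func main() {
	store := &segmentStore{segments: []segment{{id: 1, sorted: true}, {id: 2, sorted: false}}}
	// The closure can capture per-collection state, much as needTriggerFieldIDs
	// is captured in the diff above.
	picked := store.Select(func(seg segment) bool { return seg.sorted })
	fmt.Println(picked) // [{1 true}]
}
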


@@ -405,6 +405,13 @@ func (st *statsTask) SetJobInfo(ctx context.Context, result *workerpb.StatsResul
zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err))
return err
}
case indexpb.StatsSubJob_JsonKeyIndexJob:
err := st.meta.UpdateSegment(st.GetSegmentID(), SetJsonKeyIndexLogs(result.GetJsonKeyStatsLogs()))
if err != nil {
log.Ctx(ctx).Warn("save json key index stats result failed", zap.Int64("taskId", st.GetTaskID()),
zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err))
return err
}
case indexpb.StatsSubJob_BM25Job:
// bm25 logs are generated during segment flush.
}
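
The SetJobInfo change extends the existing pattern: each stats sub-job type persists its own result into segment meta. A reduced, hedged sketch of that dispatch, with stand-in types and a stand-in update helper rather than the real meta API:

package main

import (
	"errors"
	"fmt"
)

type subJob int

const (
	textIndexJob subJob = iota
	jsonKeyIndexJob
	bm25Job
)

type result struct {
	jsonKeyStatsLogs []string
}

// updateSegment is a stand-in for the real meta update; it just records which
// kind of logs were saved.
func updateSegment(segmentID int64, kind string, logs []string) error {
	if logs == nil {
		return errors.New("nothing to save")
	}
	fmt.Printf("segment %d: saved %d %s logs\n", segmentID, len(logs), kind)
	return nil
}

func setJobInfo(segmentID int64, job subJob, res result) error {
	switch job {
	case jsonKeyIndexJob:
		// Mirrors the new case in the diff: persist the JSON key stats logs
		// and surface the error if the meta update fails.
		if err := updateSegment(segmentID, "json key stats", res.jsonKeyStatsLogs); err != nil {
			return err
		}
	case bm25Job:
		// bm25 logs are generated during segment flush; nothing to persist here.
	}
	return nil
}

func main() {
	fmt.Println(setJobInfo(42, jsonKeyIndexJob, result{jsonKeyStatsLogs: []string{"log-1"}}))
}
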