diff --git a/internal/datacoord/stats_inspector.go b/internal/datacoord/stats_inspector.go
index 58558e8587..05a52c2567 100644
--- a/internal/datacoord/stats_inspector.go
+++ b/internal/datacoord/stats_inspector.go
@@ -125,6 +125,9 @@ func (si *statsInspector) triggerStatsTaskLoop() {
 
 	ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second))
 	defer ticker.Stop()
+
+	lastJSONStatsLastTrigger := time.Now().Unix()
+	maxJSONStatsTaskCount := 0
 	for {
 		select {
 		case <-si.ctx.Done():
@@ -134,6 +137,7 @@ func (si *statsInspector) triggerStatsTaskLoop() {
 			si.triggerSortStatsTask()
 			si.triggerTextStatsTask()
 			si.triggerBM25StatsTask()
+			lastJSONStatsLastTrigger, maxJSONStatsTaskCount = si.triggerJsonKeyIndexStatsTask(lastJSONStatsLastTrigger, maxJSONStatsTaskCount)
 
 		case segID := <-getStatsTaskChSingleton():
 			log.Info("receive new segment to trigger stats task", zap.Int64("segmentID", segID))
@@ -204,6 +208,20 @@ func needDoTextIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
 	return false
 }
+
+func needDoJsonKeyIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
+	if !(isFlush(segment) && segment.GetLevel() != datapb.SegmentLevel_L0 &&
+		segment.GetIsSorted()) {
+		return false
+	}
+
+	for _, fieldID := range fieldIDs {
+		if segment.GetJsonKeyStats()[fieldID] == nil {
+			return true
+		}
+	}
+	return false
+}
+
 func needDoBM25(segment *SegmentInfo, fieldIDs []UniqueID) bool {
 	// TODO: docking bm25 stats task
 	return false
@@ -235,6 +253,38 @@ func (si *statsInspector) triggerTextStatsTask() {
 	}
 }
+
+func (si *statsInspector) triggerJsonKeyIndexStatsTask(lastJSONStatsLastTrigger int64, maxJSONStatsTaskCount int) (int64, int) {
+	collections := si.mt.GetCollections()
+	for _, collection := range collections {
+		needTriggerFieldIDs := make([]UniqueID, 0)
+		for _, field := range collection.Schema.GetFields() {
+			h := typeutil.CreateFieldSchemaHelper(field)
+			if h.EnableJSONKeyStatsIndex() && Params.CommonCfg.EnabledJSONKeyStats.GetAsBool() {
+				needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID())
+			}
+		}
+		segments := si.mt.SelectSegments(si.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
+			return needDoJsonKeyIndex(seg, needTriggerFieldIDs)
+		}))
+		if time.Now().Unix()-lastJSONStatsLastTrigger > int64(Params.DataCoordCfg.JSONStatsTriggerInterval.GetAsDuration(time.Minute).Seconds()) {
+			lastJSONStatsLastTrigger = time.Now().Unix()
+			maxJSONStatsTaskCount = 0
+		}
+		for _, segment := range segments {
+			if maxJSONStatsTaskCount >= Params.DataCoordCfg.JSONStatsTriggerCount.GetAsInt() {
+				break
+			}
+			if err := si.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_JsonKeyIndexJob, true); err != nil {
+				log.Warn("create stats task with json key index for segment failed, wait for retry:",
+					zap.Int64("segmentID", segment.GetID()), zap.Error(err))
+				continue
+			}
+			maxJSONStatsTaskCount++
+		}
+	}
+	return lastJSONStatsLastTrigger, maxJSONStatsTaskCount
+}
+
 func (si *statsInspector) triggerBM25StatsTask() {
 	collections := si.mt.GetCollections()
 	for _, collection := range collections {
diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go
index 3c75df7663..8843db0736 100644
--- a/internal/datacoord/task_stats.go
+++ b/internal/datacoord/task_stats.go
@@ -405,6 +405,13 @@ func (st *statsTask) SetJobInfo(ctx context.Context, result *workerpb.StatsResul
 				zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err))
 			return err
 		}
+	case indexpb.StatsSubJob_JsonKeyIndexJob:
+		err := st.meta.UpdateSegment(st.GetSegmentID(), SetJsonKeyIndexLogs(result.GetJsonKeyStatsLogs()))
+		if err != nil {
+			log.Ctx(ctx).Warn("save json key index stats result failed", zap.Int64("taskId", st.GetTaskID()),
+				zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err))
+			return err
+		}
 	case indexpb.StatsSubJob_BM25Job:
 		// bm25 logs are generated during with segment flush.
 	}
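
Note (not part of the patch): triggerJsonKeyIndexStatsTask throttles task creation with a per-interval counter. Each pass resets the counter once JSONStatsTriggerInterval has elapsed since the last reset and stops submitting once JSONStatsTriggerCount tasks have been created in the current window, so a large backlog of segments is indexed gradually rather than all at once. The standalone Go sketch below restates that pattern with illustrative names (throttle, trySubmit, maxPerInterval); the real code reads the interval and cap from Params.DataCoordCfg and submits via SubmitStatsTask.

// Sketch of the per-interval throttling used by triggerJsonKeyIndexStatsTask.
// The names here are illustrative, not taken from the patch.
package main

import (
	"fmt"
	"time"
)

type throttle struct {
	lastReset      int64         // unix seconds of the last counter reset
	submitted      int           // tasks submitted since lastReset
	interval       time.Duration // corresponds to JSONStatsTriggerInterval
	maxPerInterval int           // corresponds to JSONStatsTriggerCount
}

// trySubmit resets the counter when the interval has elapsed and reports
// whether another task may be submitted in the current window.
func (t *throttle) trySubmit(now time.Time) bool {
	if now.Unix()-t.lastReset > int64(t.interval.Seconds()) {
		t.lastReset = now.Unix()
		t.submitted = 0
	}
	if t.submitted >= t.maxPerInterval {
		return false
	}
	t.submitted++
	return true
}

func main() {
	th := &throttle{lastReset: time.Now().Unix(), interval: time.Minute, maxPerInterval: 2}
	for i := 0; i < 4; i++ {
		// The first two submissions in the window are allowed; the rest are
		// deferred until the interval elapses and the counter resets.
		fmt.Printf("segment %d allowed: %v\n", i, th.trySubmit(time.Now()))
	}
}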