From 7fca6e759feea3a2c51356671486c9dad158959e Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 23 Dec 2025 21:05:18 +0800 Subject: [PATCH] enhance: Execute text indexes for multiple fields concurrently (#46279) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit issue: #46274 ## Summary by CodeRabbit * **Performance Improvements** * Field-level text index creation and JSON-key statistics now run concurrently, reducing overall indexing time and speeding task completion. * **Observability Enhancements** * Per-task and per-field logging expanded with richer context and per-phase elapsed-time reporting for improved monitoring and diagnostics. * **Refactor** * Node slot handling simplified to compute slot counts on demand instead of storing them. ✏️ Tip: You can customize this high-level summary in your review settings. --------- Signed-off-by: Cai Zhang --- .../datanode/compactor/sort_compaction.go | 108 ++++++----- internal/datanode/data_node.go | 3 - internal/datanode/index/task_stats.go | 174 +++++++++++------- internal/datanode/index_services.go | 16 +- internal/datanode/services.go | 3 +- 5 files changed, 188 insertions(+), 116 deletions(-) diff --git a/internal/datanode/compactor/sort_compaction.go b/internal/datanode/compactor/sort_compaction.go index 260462bde2..69774a58a6 100644 --- a/internal/datanode/compactor/sort_compaction.go +++ b/internal/datanode/compactor/sort_compaction.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "runtime/debug" + "sync" "time" "github.com/apache/arrow/go/v17/arrow/array" @@ -27,6 +28,7 @@ import ( "github.com/samber/lo" "go.opentelemetry.io/otel" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" @@ -422,58 +424,76 @@ func (t *sortCompactionTask) createTextIndex(ctx context.Context, return nil, err } - textIndexLogs := make(map[int64]*datapb.TextIndexStats) + // Concurrent create text index for all match-enabled fields + var ( + mu sync.Mutex + textIndexLogs = make(map[int64]*datapb.TextIndexStats) + ) + + eg, egCtx := errgroup.WithContext(ctx) + for _, field := range t.plan.GetSchema().GetFields() { + field := field h := typeutil.CreateFieldSchemaHelper(field) if !h.EnableMatch() { continue } log.Info("field enable match, ready to create text index", zap.Int64("field id", field.GetFieldID())) - // create text index and upload the text index files. 
- files, err := getInsertFiles(field.GetFieldID()) - if err != nil { - return nil, err - } - buildIndexParams := &indexcgopb.BuildIndexInfo{ - BuildID: t.GetPlanID(), - CollectionID: collectionID, - PartitionID: partitionID, - SegmentID: segmentID, - IndexVersion: 0, // always zero - InsertFiles: files, - FieldSchema: field, - StorageConfig: newStorageConfig, - CurrentScalarIndexVersion: t.plan.GetCurrentScalarIndexVersion(), - StorageVersion: t.storageVersion, - Manifest: t.manifest, - } + eg.Go(func() error { + files, err := getInsertFiles(field.GetFieldID()) + if err != nil { + return err + } - if t.storageVersion == storage.StorageV2 { - buildIndexParams.SegmentInsertFiles = util.GetSegmentInsertFiles( - insertBinlogs, - t.compactionParams.StorageConfig, - collectionID, - partitionID, - segmentID) - } - uploaded, err := indexcgowrapper.CreateTextIndex(ctx, buildIndexParams) - if err != nil { - return nil, err - } - textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{ - FieldID: field.GetFieldID(), - Version: 0, - BuildID: taskID, - Files: lo.Keys(uploaded), - } - elapse := t.tr.RecordSpan() - log.Info("field enable match, create text index done", - zap.Int64("segmentID", segmentID), - zap.Int64("field id", field.GetFieldID()), - zap.Strings("files", lo.Keys(uploaded)), - zap.Duration("elapse", elapse), - ) + buildIndexParams := &indexcgopb.BuildIndexInfo{ + BuildID: t.GetPlanID(), + CollectionID: collectionID, + PartitionID: partitionID, + SegmentID: segmentID, + IndexVersion: 0, // always zero + InsertFiles: files, + FieldSchema: field, + StorageConfig: newStorageConfig, + CurrentScalarIndexVersion: t.plan.GetCurrentScalarIndexVersion(), + StorageVersion: t.storageVersion, + Manifest: t.manifest, + } + + if t.storageVersion == storage.StorageV2 { + buildIndexParams.SegmentInsertFiles = util.GetSegmentInsertFiles( + insertBinlogs, + t.compactionParams.StorageConfig, + collectionID, + partitionID, + segmentID) + } + uploaded, err := indexcgowrapper.CreateTextIndex(egCtx, buildIndexParams) + if err != nil { + return err + } + + mu.Lock() + textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{ + FieldID: field.GetFieldID(), + Version: 0, + BuildID: taskID, + Files: lo.Keys(uploaded), + } + mu.Unlock() + + log.Info("field enable match, create text index done", + zap.Int64("segmentID", segmentID), + zap.Int64("field id", field.GetFieldID()), + zap.Strings("files", lo.Keys(uploaded)), + ) + return nil + }) } + + if err := eg.Wait(); err != nil { + return nil, err + } + return textIndexLogs, nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index cdcea68569..09ee0b6a6d 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -104,8 +104,6 @@ type DataNode struct { reportImportRetryTimes uint // unitest set this value to 1 to save time, default is 10 pool *conc.Pool[any] - totalSlot int64 - metricsRequest *metricsinfo.MetricsRequest } @@ -121,7 +119,6 @@ func NewDataNode(ctx context.Context) *DataNode { compactionExecutor: compactor.NewExecutor(), reportImportRetryTimes: 10, metricsRequest: metricsinfo.NewMetricsRequest(), - totalSlot: index.CalculateNodeSlots(), } sc := index.NewTaskScheduler(ctx2) node.storageFactory = NewChunkMgrFactory() diff --git a/internal/datanode/index/task_stats.go b/internal/datanode/index/task_stats.go index 7be0c9e2c3..94dbca895e 100644 --- a/internal/datanode/index/task_stats.go +++ b/internal/datanode/index/task_stats.go @@ -21,12 +21,14 @@ import ( "fmt" "runtime/debug" "strconv" + "sync" 
"time" "github.com/apache/arrow/go/v17/arrow/array" "github.com/samber/lo" "go.opentelemetry.io/otel" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -286,6 +288,7 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) { int64(numValidRows), insertLogs, statsLogs, bm25StatsLogs) debug.FreeOSMemory() + elapse := st.tr.RecordSpan() log.Info("sort segment end", zap.String("clusterID", st.req.GetClusterID()), zap.Int64("taskID", st.req.GetTaskID()), @@ -295,7 +298,9 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) { zap.String("subTaskType", st.req.GetSubJobType().String()), zap.Int64("target segmentID", st.req.GetTargetSegmentID()), zap.Int64("old rows", numRows), - zap.Int("valid rows", numValidRows)) + zap.Int("valid rows", numValidRows), + zap.Duration("elapse", elapse), + ) return insertLogs, nil } @@ -468,40 +473,57 @@ func (st *statsTask) createTextIndex(ctx context.Context, return err } - textIndexLogs := make(map[int64]*datapb.TextIndexStats) + // Concurrent create text index for all match-enabled fields + var ( + mu sync.Mutex + textIndexLogs = make(map[int64]*datapb.TextIndexStats) + ) + + eg, egCtx := errgroup.WithContext(ctx) + for _, field := range st.req.GetSchema().GetFields() { + field := field h := typeutil.CreateFieldSchemaHelper(field) if !h.EnableMatch() { continue } log.Info("field enable match, ready to create text index", zap.Int64("field id", field.GetFieldID())) - // create text index and upload the text index files. - files, err := getInsertFiles(field.GetFieldID(), field.GetNullable()) - if err != nil { - return err - } - req := proto.Clone(st.req).(*workerpb.CreateStatsRequest) - req.InsertLogs = insertBinlogs - buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, nil) + eg.Go(func() error { + files, err := getInsertFiles(field.GetFieldID(), field.GetNullable()) + if err != nil { + return err + } - uploaded, err := indexcgowrapper.CreateTextIndex(ctx, buildIndexParams) - if err != nil { - return err - } - textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{ - FieldID: field.GetFieldID(), - Version: version, - BuildID: taskID, - Files: lo.Keys(uploaded), - } - elapse := st.tr.RecordSpan() - log.Info("field enable match, create text index done", - zap.Int64("targetSegmentID", st.req.GetTargetSegmentID()), - zap.Int64("field id", field.GetFieldID()), - zap.Strings("files", lo.Keys(uploaded)), - zap.Duration("elapse", elapse), - ) + req := proto.Clone(st.req).(*workerpb.CreateStatsRequest) + req.InsertLogs = insertBinlogs + buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, nil) + + uploaded, err := indexcgowrapper.CreateTextIndex(egCtx, buildIndexParams) + if err != nil { + return err + } + + mu.Lock() + textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{ + FieldID: field.GetFieldID(), + Version: version, + BuildID: taskID, + Files: lo.Keys(uploaded), + } + mu.Unlock() + + log.Info("field enable match, create text index done", + zap.Int64("targetSegmentID", st.req.GetTargetSegmentID()), + zap.Int64("field id", field.GetFieldID()), + zap.Strings("files", lo.Keys(uploaded)), + ) + return nil + }) + } + + if err := eg.Wait(); err != nil { + return err } st.manager.StoreStatsTextIndexResult(st.req.GetClusterID(), @@ -511,6 +533,11 @@ func (st *statsTask) createTextIndex(ctx context.Context, st.req.GetTargetSegmentID(), st.req.GetInsertChannel(), textIndexLogs) + 
totalElapse := st.tr.RecordSpan() + log.Info("create text index done", + zap.Int64("target segmentID", st.req.GetTargetSegmentID()), + zap.Duration("total elapse", totalElapse), + ) return nil } @@ -571,53 +598,72 @@ func (st *statsTask) createJSONKeyStats(ctx context.Context, return err } - jsonKeyIndexStats := make(map[int64]*datapb.JsonKeyStats) + // Concurrent create JSON key index for all enabled fields + var ( + mu sync.Mutex + jsonKeyIndexStats = make(map[int64]*datapb.JsonKeyStats) + ) + + eg, egCtx := errgroup.WithContext(ctx) + for _, field := range st.req.GetSchema().GetFields() { + field := field h := typeutil.CreateFieldSchemaHelper(field) if !h.EnableJSONKeyStatsIndex() { continue } log.Info("field enable json key index, ready to create json key index", zap.Int64("field id", field.GetFieldID())) - files, err := getInsertFiles(field.GetFieldID()) - if err != nil { - return err - } - req := proto.Clone(st.req).(*workerpb.CreateStatsRequest) - req.InsertLogs = insertBinlogs - options := &BuildIndexOptions{ - JsonStatsMaxShreddingColumns: jsonStatsMaxShreddingColumns, - JsonStatsShreddingRatio: jsonStatsShreddingRatioThreshold, - JsonStatsWriteBatchSize: jsonStatsWriteBatchSize, - } - buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, options) + eg.Go(func() error { + files, err := getInsertFiles(field.GetFieldID()) + if err != nil { + return err + } - statsResult, err := indexcgowrapper.CreateJSONKeyStats(ctx, buildIndexParams) - if err != nil { - return err - } + req := proto.Clone(st.req).(*workerpb.CreateStatsRequest) + req.InsertLogs = insertBinlogs + options := &BuildIndexOptions{ + JsonStatsMaxShreddingColumns: jsonStatsMaxShreddingColumns, + JsonStatsShreddingRatio: jsonStatsShreddingRatioThreshold, + JsonStatsWriteBatchSize: jsonStatsWriteBatchSize, + } + buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, options) - // calculate log size (disk size) from file sizes - var logSize int64 - for _, fileSize := range statsResult.Files { - logSize += fileSize - } + statsResult, err := indexcgowrapper.CreateJSONKeyStats(egCtx, buildIndexParams) + if err != nil { + return err + } - jsonKeyIndexStats[field.GetFieldID()] = &datapb.JsonKeyStats{ - FieldID: field.GetFieldID(), - Version: version, - BuildID: taskID, - Files: lo.Keys(statsResult.Files), - JsonKeyStatsDataFormat: jsonKeyStatsDataFormat, - MemorySize: statsResult.MemSize, - LogSize: logSize, - } - log.Info("field enable json key index, create json key index done", - zap.Int64("field id", field.GetFieldID()), - zap.Strings("files", lo.Keys(statsResult.Files)), - zap.Int64("memorySize", statsResult.MemSize), - zap.Int64("logSize", logSize), - ) + // calculate log size (disk size) from file sizes + var logSize int64 + for _, fileSize := range statsResult.Files { + logSize += fileSize + } + + mu.Lock() + jsonKeyIndexStats[field.GetFieldID()] = &datapb.JsonKeyStats{ + FieldID: field.GetFieldID(), + Version: version, + BuildID: taskID, + Files: lo.Keys(statsResult.Files), + JsonKeyStatsDataFormat: jsonKeyStatsDataFormat, + MemorySize: statsResult.MemSize, + LogSize: logSize, + } + mu.Unlock() + + log.Info("field enable json key index, create json key index done", + zap.Int64("field id", field.GetFieldID()), + zap.Strings("files", lo.Keys(statsResult.Files)), + zap.Int64("memorySize", statsResult.MemSize), + zap.Int64("logSize", logSize), + ) + return nil + }) + } + + if err := eg.Wait(); err != nil { + return err } totalElapse := st.tr.RecordSpan() diff --git 
a/internal/datanode/index_services.go b/internal/datanode/index_services.go index 1a36e7ead9..f294ccfcb4 100644 --- a/internal/datanode/index_services.go +++ b/internal/datanode/index_services.go @@ -201,7 +201,7 @@ func (node *DataNode) GetJobStats(ctx context.Context, req *workerpb.GetJobStats defer node.lifetime.Done() var ( - totalSlots = node.totalSlot + totalSlots = index.CalculateNodeSlots() indexStatsUsed = node.taskScheduler.TaskQueue.GetUsingSlot() compactionUsed = node.compactionExecutor.Slots() importUsed = node.importScheduler.Slots() @@ -222,7 +222,7 @@ func (node *DataNode) GetJobStats(ctx context.Context, req *workerpb.GetJobStats return &workerpb.GetJobStatsResponse{ Status: merr.Success(), - TotalSlots: node.totalSlot, + TotalSlots: totalSlots, AvailableSlots: availableSlots, }, nil } @@ -262,6 +262,8 @@ func (node *DataNode) CreateJobV2(ctx context.Context, req *workerpb.CreateJobV2 func (node *DataNode) createIndexTask(ctx context.Context, req *workerpb.CreateJobRequest) (*commonpb.Status, error) { log.Ctx(ctx).Info("DataNode building index ...", + zap.String("clusterID", req.GetClusterID()), + zap.Int64("taskID", req.GetBuildID()), zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("partitionID", req.GetPartitionID()), zap.Int64("segmentID", req.GetSegmentID()), @@ -329,7 +331,10 @@ func (node *DataNode) createIndexTask(ctx context.Context, req *workerpb.CreateJ } func (node *DataNode) createAnalyzeTask(ctx context.Context, req *workerpb.AnalyzeRequest) (*commonpb.Status, error) { - log.Ctx(ctx).Info("receive analyze job", zap.Int64("collectionID", req.GetCollectionID()), + log.Ctx(ctx).Info("receive analyze job", + zap.String("clusterID", req.GetClusterID()), + zap.Int64("taskID", req.GetTaskID()), + zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("partitionID", req.GetPartitionID()), zap.Int64("fieldID", req.GetFieldID()), zap.String("fieldName", req.GetFieldName()), @@ -368,7 +373,10 @@ func (node *DataNode) createAnalyzeTask(ctx context.Context, req *workerpb.Analy } func (node *DataNode) createStatsTask(ctx context.Context, req *workerpb.CreateStatsRequest) (*commonpb.Status, error) { - log.Ctx(ctx).Info("receive stats job", zap.Int64("collectionID", req.GetCollectionID()), + log.Ctx(ctx).Info("receive stats job", + zap.String("clusterID", req.GetClusterID()), + zap.Int64("taskID", req.GetTaskID()), + zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("partitionID", req.GetPartitionID()), zap.Int64("segmentID", req.GetSegmentID()), zap.Int64("numRows", req.GetNumRows()), diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 2bdacc32c0..ff4b0e030e 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/compaction" "github.com/milvus-io/milvus/internal/datanode/compactor" "github.com/milvus-io/milvus/internal/datanode/importv2" + "github.com/milvus-io/milvus/internal/datanode/index" "github.com/milvus-io/milvus/internal/flushcommon/io" "github.com/milvus-io/milvus/internal/util/fileresource" "github.com/milvus-io/milvus/internal/util/hookutil" @@ -510,7 +511,7 @@ func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReques } var ( - totalSlots = node.totalSlot + totalSlots = index.CalculateNodeSlots() indexStatsUsed = node.taskScheduler.TaskQueue.GetUsingSlot() compactionUsed = node.compactionExecutor.Slots() importUsed = node.importScheduler.Slots()
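
A minimal, self-contained sketch of the fan-out pattern this patch applies in `createTextIndex` and `createJSONKeyStats` (both in sort compaction and in the stats task): one goroutine per eligible field via `errgroup`, a mutex around the shared result map, and fail-fast cancellation through the errgroup context. This is not the Milvus code itself; `buildFieldIndex`, `FieldResult`, and the field IDs below are hypothetical stand-ins.

```go
// Sketch only: illustrates the errgroup + mutex pattern from the patch,
// with hypothetical names in place of the real Milvus types and calls.
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// FieldResult stands in for the per-field index stats the real code records.
type FieldResult struct {
	FieldID int64
	Files   []string
}

// buildFieldIndex stands in for indexcgowrapper.CreateTextIndex plus the
// surrounding parameter assembly; it should respect ctx cancellation.
func buildFieldIndex(ctx context.Context, fieldID int64) (FieldResult, error) {
	select {
	case <-ctx.Done():
		return FieldResult{}, ctx.Err()
	default:
	}
	return FieldResult{FieldID: fieldID, Files: []string{fmt.Sprintf("index/%d/0", fieldID)}}, nil
}

func buildAllFieldIndexes(ctx context.Context, fieldIDs []int64) (map[int64]FieldResult, error) {
	var (
		mu      sync.Mutex
		results = make(map[int64]FieldResult, len(fieldIDs))
	)

	// The derived context is cancelled as soon as any goroutine returns an
	// error, so the remaining per-field builds can stop early.
	eg, egCtx := errgroup.WithContext(ctx)
	// Optionally bound the fan-out; the patch itself starts all fields at once.
	// eg.SetLimit(4)

	for _, fieldID := range fieldIDs {
		fieldID := fieldID // capture the loop variable for the closure (pre-Go 1.22 semantics)
		eg.Go(func() error {
			res, err := buildFieldIndex(egCtx, fieldID)
			if err != nil {
				return err
			}
			// The map is shared across goroutines, so guard writes with the mutex.
			mu.Lock()
			results[fieldID] = res
			mu.Unlock()
			return nil
		})
	}

	// Wait returns the first non-nil error reported by any goroutine.
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	out, err := buildAllFieldIndexes(context.Background(), []int64{101, 102, 103})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because `errgroup.WithContext` cancels the derived context on the first error, the other field builds abort instead of running to completion; if per-field builds turn out to be memory-heavy, capping the fan-out with `eg.SetLimit` (shown commented out above) is the usual mitigation.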
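
The slot-handling change follows the same idea in both `GetJobStats` and `QuerySlot`: instead of reading a `totalSlot` field cached when the node was constructed, each call recomputes capacity via `index.CalculateNodeSlots()` and subtracts the slots the schedulers currently report as in use. A minimal sketch of that on-demand computation, assuming hypothetical stand-ins (`calculateNodeSlots`, the used-slot arguments):

```go
// Sketch only: on-demand slot accounting, not the Milvus implementation.
package main

import "fmt"

func calculateNodeSlots() int64 { return 16 } // stand-in for index.CalculateNodeSlots()

func availableSlots(used ...int64) int64 {
	total := calculateNodeSlots() // computed per call, never cached on the node
	for _, u := range used {
		total -= u
	}
	if total < 0 {
		return 0
	}
	return total
}

func main() {
	// e.g. index/stats, compaction, and import usage reported by the schedulers
	fmt.Println(availableSlots(3, 2, 1))
}
```

The trade-off is a small amount of recomputation per slot query in exchange for values that always reflect the current configuration and one less field to keep in sync on the node.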