enhance: Execute text indexes for multiple fields concurrently (#46279)

issue: #46274 

## Summary by CodeRabbit

* **Performance Improvements**
  * Field-level text index creation and JSON-key statistics generation
    now run concurrently across fields, reducing overall indexing time
    (see the Go sketch after this list).

* **Observability Enhancements**
  * Per-task and per-field logging expanded with richer context and
    per-phase elapsed-time reporting for improved monitoring and
    diagnostics.

* **Refactor**
  * Node slot handling simplified to compute slot counts on demand
    instead of storing them.
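
The concurrency change in both task types follows the same shape: one goroutine per match-enabled field via `golang.org/x/sync/errgroup`, a `sync.Mutex` guarding the shared result map, and the group context passed into each build so the first failure cancels its siblings. Below is a minimal, self-contained sketch of that pattern; the `Field`, `IndexStats`, and `buildFieldIndex` names are hypothetical stand-ins for illustration, not Milvus APIs:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// Field and IndexStats stand in for the schema field and per-field
// result types used by the real tasks.
type Field struct{ ID int64 }
type IndexStats struct{ Files []string }

// buildFieldIndex stands in for the per-field index build
// (indexcgowrapper.CreateTextIndex in the real code).
func buildFieldIndex(ctx context.Context, f Field) (*IndexStats, error) {
	return &IndexStats{Files: []string{fmt.Sprintf("field-%d.idx", f.ID)}}, nil
}

func buildAll(ctx context.Context, fields []Field) (map[int64]*IndexStats, error) {
	var (
		mu      sync.Mutex // guards results across goroutines
		results = make(map[int64]*IndexStats)
	)
	// errgroup.WithContext cancels egCtx on the first returned error,
	// letting sibling builds bail out early.
	eg, egCtx := errgroup.WithContext(ctx)
	for _, field := range fields {
		field := field // capture the loop variable (pre-Go 1.22 semantics)
		eg.Go(func() error {
			stats, err := buildFieldIndex(egCtx, field)
			if err != nil {
				return err
			}
			mu.Lock()
			results[field.ID] = stats
			mu.Unlock()
			return nil
		})
	}
	// Wait returns the first non-nil error from any goroutine.
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	stats, err := buildAll(context.Background(), []Field{{ID: 100}, {ID: 101}})
	fmt.Println(stats, err)
}
```

One note on the design as the diffs below apply it: each per-field request is built inside its own goroutine and the mutex is held only for the map write, so the expensive cgo index build itself runs fully in parallel.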


---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-12-23 21:05:18 +08:00 committed by GitHub
parent 0943713481
commit 7fca6e759f
5 changed files with 188 additions and 116 deletions

View File

@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"runtime/debug"
+	"sync"
 	"time"
 
 	"github.com/apache/arrow/go/v17/arrow/array"
@@ -27,6 +28,7 @@ import (
 	"github.com/samber/lo"
 	"go.opentelemetry.io/otel"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/allocator"
@@ -422,58 +424,76 @@ func (t *sortCompactionTask) createTextIndex(ctx context.Context,
 		return nil, err
 	}
-	textIndexLogs := make(map[int64]*datapb.TextIndexStats)
+	// Concurrent create text index for all match-enabled fields
+	var (
+		mu            sync.Mutex
+		textIndexLogs = make(map[int64]*datapb.TextIndexStats)
+	)
+	eg, egCtx := errgroup.WithContext(ctx)
 	for _, field := range t.plan.GetSchema().GetFields() {
+		field := field
 		h := typeutil.CreateFieldSchemaHelper(field)
 		if !h.EnableMatch() {
 			continue
 		}
 		log.Info("field enable match, ready to create text index", zap.Int64("field id", field.GetFieldID()))
-		// create text index and upload the text index files.
-		files, err := getInsertFiles(field.GetFieldID())
-		if err != nil {
-			return nil, err
-		}
-		buildIndexParams := &indexcgopb.BuildIndexInfo{
-			BuildID:                   t.GetPlanID(),
-			CollectionID:              collectionID,
-			PartitionID:               partitionID,
-			SegmentID:                 segmentID,
-			IndexVersion:              0, // always zero
-			InsertFiles:               files,
-			FieldSchema:               field,
-			StorageConfig:             newStorageConfig,
-			CurrentScalarIndexVersion: t.plan.GetCurrentScalarIndexVersion(),
-			StorageVersion:            t.storageVersion,
-			Manifest:                  t.manifest,
-		}
-		if t.storageVersion == storage.StorageV2 {
-			buildIndexParams.SegmentInsertFiles = util.GetSegmentInsertFiles(
-				insertBinlogs,
-				t.compactionParams.StorageConfig,
-				collectionID,
-				partitionID,
-				segmentID)
-		}
-		uploaded, err := indexcgowrapper.CreateTextIndex(ctx, buildIndexParams)
-		if err != nil {
-			return nil, err
-		}
-		textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{
-			FieldID: field.GetFieldID(),
-			Version: 0,
-			BuildID: taskID,
-			Files:   lo.Keys(uploaded),
-		}
-		elapse := t.tr.RecordSpan()
-		log.Info("field enable match, create text index done",
-			zap.Int64("segmentID", segmentID),
-			zap.Int64("field id", field.GetFieldID()),
-			zap.Strings("files", lo.Keys(uploaded)),
-			zap.Duration("elapse", elapse),
-		)
+		eg.Go(func() error {
+			files, err := getInsertFiles(field.GetFieldID())
+			if err != nil {
+				return err
+			}
+			buildIndexParams := &indexcgopb.BuildIndexInfo{
+				BuildID:                   t.GetPlanID(),
+				CollectionID:              collectionID,
+				PartitionID:               partitionID,
+				SegmentID:                 segmentID,
+				IndexVersion:              0, // always zero
+				InsertFiles:               files,
+				FieldSchema:               field,
+				StorageConfig:             newStorageConfig,
+				CurrentScalarIndexVersion: t.plan.GetCurrentScalarIndexVersion(),
+				StorageVersion:            t.storageVersion,
+				Manifest:                  t.manifest,
+			}
+			if t.storageVersion == storage.StorageV2 {
+				buildIndexParams.SegmentInsertFiles = util.GetSegmentInsertFiles(
+					insertBinlogs,
+					t.compactionParams.StorageConfig,
+					collectionID,
+					partitionID,
+					segmentID)
+			}
+			uploaded, err := indexcgowrapper.CreateTextIndex(egCtx, buildIndexParams)
+			if err != nil {
+				return err
+			}
+			mu.Lock()
+			textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{
+				FieldID: field.GetFieldID(),
+				Version: 0,
+				BuildID: taskID,
+				Files:   lo.Keys(uploaded),
+			}
+			mu.Unlock()
+			log.Info("field enable match, create text index done",
+				zap.Int64("segmentID", segmentID),
+				zap.Int64("field id", field.GetFieldID()),
+				zap.Strings("files", lo.Keys(uploaded)),
+			)
+			return nil
+		})
 	}
+	if err := eg.Wait(); err != nil {
+		return nil, err
+	}
 	return textIndexLogs, nil
 }

View File

@@ -104,8 +104,6 @@ type DataNode struct {
 	reportImportRetryTimes uint // unitest set this value to 1 to save time, default is 10
 
 	pool *conc.Pool[any]
-
-	totalSlot int64
 
 	metricsRequest *metricsinfo.MetricsRequest
 }
@@ -121,7 +119,6 @@ func NewDataNode(ctx context.Context) *DataNode {
 		compactionExecutor:     compactor.NewExecutor(),
 		reportImportRetryTimes: 10,
 		metricsRequest:         metricsinfo.NewMetricsRequest(),
-		totalSlot:              index.CalculateNodeSlots(),
 	}
 	sc := index.NewTaskScheduler(ctx2)
 	node.storageFactory = NewChunkMgrFactory()

View File

@@ -21,12 +21,14 @@ import (
 	"fmt"
 	"runtime/debug"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/apache/arrow/go/v17/arrow/array"
 	"github.com/samber/lo"
 	"go.opentelemetry.io/otel"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -286,6 +288,7 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
 		int64(numValidRows), insertLogs, statsLogs, bm25StatsLogs)
 	debug.FreeOSMemory()
+	elapse := st.tr.RecordSpan()
 	log.Info("sort segment end",
 		zap.String("clusterID", st.req.GetClusterID()),
 		zap.Int64("taskID", st.req.GetTaskID()),
@@ -295,7 +298,9 @@
 		zap.String("subTaskType", st.req.GetSubJobType().String()),
 		zap.Int64("target segmentID", st.req.GetTargetSegmentID()),
 		zap.Int64("old rows", numRows),
-		zap.Int("valid rows", numValidRows))
+		zap.Int("valid rows", numValidRows),
+		zap.Duration("elapse", elapse),
+	)
 	return insertLogs, nil
 }
@@ -468,40 +473,57 @@ func (st *statsTask) createTextIndex(ctx context.Context,
 		return err
 	}
-	textIndexLogs := make(map[int64]*datapb.TextIndexStats)
+	// Concurrent create text index for all match-enabled fields
+	var (
+		mu            sync.Mutex
+		textIndexLogs = make(map[int64]*datapb.TextIndexStats)
+	)
+	eg, egCtx := errgroup.WithContext(ctx)
 	for _, field := range st.req.GetSchema().GetFields() {
+		field := field
 		h := typeutil.CreateFieldSchemaHelper(field)
 		if !h.EnableMatch() {
 			continue
 		}
 		log.Info("field enable match, ready to create text index", zap.Int64("field id", field.GetFieldID()))
-		// create text index and upload the text index files.
-		files, err := getInsertFiles(field.GetFieldID(), field.GetNullable())
-		if err != nil {
-			return err
-		}
-		req := proto.Clone(st.req).(*workerpb.CreateStatsRequest)
-		req.InsertLogs = insertBinlogs
-		buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, nil)
-		uploaded, err := indexcgowrapper.CreateTextIndex(ctx, buildIndexParams)
-		if err != nil {
-			return err
-		}
-		textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{
-			FieldID: field.GetFieldID(),
-			Version: version,
-			BuildID: taskID,
-			Files:   lo.Keys(uploaded),
-		}
-		elapse := st.tr.RecordSpan()
-		log.Info("field enable match, create text index done",
-			zap.Int64("targetSegmentID", st.req.GetTargetSegmentID()),
-			zap.Int64("field id", field.GetFieldID()),
-			zap.Strings("files", lo.Keys(uploaded)),
-			zap.Duration("elapse", elapse),
-		)
+		eg.Go(func() error {
+			files, err := getInsertFiles(field.GetFieldID(), field.GetNullable())
+			if err != nil {
+				return err
+			}
+			req := proto.Clone(st.req).(*workerpb.CreateStatsRequest)
+			req.InsertLogs = insertBinlogs
+			buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, nil)
+			uploaded, err := indexcgowrapper.CreateTextIndex(egCtx, buildIndexParams)
+			if err != nil {
+				return err
+			}
+			mu.Lock()
+			textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{
+				FieldID: field.GetFieldID(),
+				Version: version,
+				BuildID: taskID,
+				Files:   lo.Keys(uploaded),
+			}
+			mu.Unlock()
+			log.Info("field enable match, create text index done",
+				zap.Int64("targetSegmentID", st.req.GetTargetSegmentID()),
+				zap.Int64("field id", field.GetFieldID()),
+				zap.Strings("files", lo.Keys(uploaded)),
+			)
+			return nil
+		})
 	}
+	if err := eg.Wait(); err != nil {
+		return err
+	}
 	st.manager.StoreStatsTextIndexResult(st.req.GetClusterID(),
@@ -511,6 +533,11 @@ func (st *statsTask) createTextIndex(ctx context.Context,
 		st.req.GetTargetSegmentID(),
 		st.req.GetInsertChannel(),
 		textIndexLogs)
+	totalElapse := st.tr.RecordSpan()
+	log.Info("create text index done",
+		zap.Int64("target segmentID", st.req.GetTargetSegmentID()),
+		zap.Duration("total elapse", totalElapse),
+	)
 	return nil
 }
@@ -571,53 +598,72 @@ func (st *statsTask) createJSONKeyStats(ctx context.Context,
 		return err
 	}
-	jsonKeyIndexStats := make(map[int64]*datapb.JsonKeyStats)
+	// Concurrent create JSON key index for all enabled fields
+	var (
+		mu                sync.Mutex
+		jsonKeyIndexStats = make(map[int64]*datapb.JsonKeyStats)
+	)
+	eg, egCtx := errgroup.WithContext(ctx)
 	for _, field := range st.req.GetSchema().GetFields() {
+		field := field
 		h := typeutil.CreateFieldSchemaHelper(field)
 		if !h.EnableJSONKeyStatsIndex() {
 			continue
 		}
 		log.Info("field enable json key index, ready to create json key index", zap.Int64("field id", field.GetFieldID()))
-		files, err := getInsertFiles(field.GetFieldID())
-		if err != nil {
-			return err
-		}
-		req := proto.Clone(st.req).(*workerpb.CreateStatsRequest)
-		req.InsertLogs = insertBinlogs
-		options := &BuildIndexOptions{
-			JsonStatsMaxShreddingColumns: jsonStatsMaxShreddingColumns,
-			JsonStatsShreddingRatio:      jsonStatsShreddingRatioThreshold,
-			JsonStatsWriteBatchSize:      jsonStatsWriteBatchSize,
-		}
-		buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, options)
-		statsResult, err := indexcgowrapper.CreateJSONKeyStats(ctx, buildIndexParams)
-		if err != nil {
-			return err
-		}
-		// calculate log size (disk size) from file sizes
-		var logSize int64
-		for _, fileSize := range statsResult.Files {
-			logSize += fileSize
-		}
-		jsonKeyIndexStats[field.GetFieldID()] = &datapb.JsonKeyStats{
-			FieldID:                field.GetFieldID(),
-			Version:                version,
-			BuildID:                taskID,
-			Files:                  lo.Keys(statsResult.Files),
-			JsonKeyStatsDataFormat: jsonKeyStatsDataFormat,
-			MemorySize:             statsResult.MemSize,
-			LogSize:                logSize,
-		}
-		log.Info("field enable json key index, create json key index done",
-			zap.Int64("field id", field.GetFieldID()),
-			zap.Strings("files", lo.Keys(statsResult.Files)),
-			zap.Int64("memorySize", statsResult.MemSize),
-			zap.Int64("logSize", logSize),
-		)
+		eg.Go(func() error {
+			files, err := getInsertFiles(field.GetFieldID())
+			if err != nil {
+				return err
+			}
+			req := proto.Clone(st.req).(*workerpb.CreateStatsRequest)
+			req.InsertLogs = insertBinlogs
+			options := &BuildIndexOptions{
+				JsonStatsMaxShreddingColumns: jsonStatsMaxShreddingColumns,
+				JsonStatsShreddingRatio:      jsonStatsShreddingRatioThreshold,
+				JsonStatsWriteBatchSize:      jsonStatsWriteBatchSize,
+			}
+			buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, options)
+			statsResult, err := indexcgowrapper.CreateJSONKeyStats(egCtx, buildIndexParams)
+			if err != nil {
+				return err
+			}
+			// calculate log size (disk size) from file sizes
+			var logSize int64
+			for _, fileSize := range statsResult.Files {
+				logSize += fileSize
+			}
+			mu.Lock()
+			jsonKeyIndexStats[field.GetFieldID()] = &datapb.JsonKeyStats{
+				FieldID:                field.GetFieldID(),
+				Version:                version,
+				BuildID:                taskID,
+				Files:                  lo.Keys(statsResult.Files),
+				JsonKeyStatsDataFormat: jsonKeyStatsDataFormat,
+				MemorySize:             statsResult.MemSize,
+				LogSize:                logSize,
+			}
+			mu.Unlock()
+			log.Info("field enable json key index, create json key index done",
+				zap.Int64("field id", field.GetFieldID()),
+				zap.Strings("files", lo.Keys(statsResult.Files)),
+				zap.Int64("memorySize", statsResult.MemSize),
+				zap.Int64("logSize", logSize),
+			)
+			return nil
+		})
 	}
+	if err := eg.Wait(); err != nil {
+		return err
+	}
+	totalElapse := st.tr.RecordSpan()

View File

@@ -201,7 +201,7 @@ func (node *DataNode) GetJobStats(ctx context.Context, req *workerpb.GetJobStats
 	defer node.lifetime.Done()
 
 	var (
-		totalSlots     = node.totalSlot
+		totalSlots     = index.CalculateNodeSlots()
 		indexStatsUsed = node.taskScheduler.TaskQueue.GetUsingSlot()
 		compactionUsed = node.compactionExecutor.Slots()
 		importUsed     = node.importScheduler.Slots()
@@ -222,7 +222,7 @@ func (node *DataNode) GetJobStats(ctx context.Context, req *workerpb.GetJobStats
 	return &workerpb.GetJobStatsResponse{
 		Status:         merr.Success(),
-		TotalSlots:     node.totalSlot,
+		TotalSlots:     totalSlots,
 		AvailableSlots: availableSlots,
 	}, nil
 }
@@ -262,6 +262,8 @@ func (node *DataNode) CreateJobV2(ctx context.Context, req *workerpb.CreateJobV2
 func (node *DataNode) createIndexTask(ctx context.Context, req *workerpb.CreateJobRequest) (*commonpb.Status, error) {
 	log.Ctx(ctx).Info("DataNode building index ...",
+		zap.String("clusterID", req.GetClusterID()),
+		zap.Int64("taskID", req.GetBuildID()),
 		zap.Int64("collectionID", req.GetCollectionID()),
 		zap.Int64("partitionID", req.GetPartitionID()),
 		zap.Int64("segmentID", req.GetSegmentID()),
@@ -329,7 +331,10 @@
 }
 
 func (node *DataNode) createAnalyzeTask(ctx context.Context, req *workerpb.AnalyzeRequest) (*commonpb.Status, error) {
-	log.Ctx(ctx).Info("receive analyze job", zap.Int64("collectionID", req.GetCollectionID()),
+	log.Ctx(ctx).Info("receive analyze job",
+		zap.String("clusterID", req.GetClusterID()),
+		zap.Int64("taskID", req.GetTaskID()),
+		zap.Int64("collectionID", req.GetCollectionID()),
 		zap.Int64("partitionID", req.GetPartitionID()),
 		zap.Int64("fieldID", req.GetFieldID()),
 		zap.String("fieldName", req.GetFieldName()),
@@ -368,7 +373,10 @@
 }
 
 func (node *DataNode) createStatsTask(ctx context.Context, req *workerpb.CreateStatsRequest) (*commonpb.Status, error) {
-	log.Ctx(ctx).Info("receive stats job", zap.Int64("collectionID", req.GetCollectionID()),
+	log.Ctx(ctx).Info("receive stats job",
+		zap.String("clusterID", req.GetClusterID()),
+		zap.Int64("taskID", req.GetTaskID()),
+		zap.Int64("collectionID", req.GetCollectionID()),
 		zap.Int64("partitionID", req.GetPartitionID()),
 		zap.Int64("segmentID", req.GetSegmentID()),
 		zap.Int64("numRows", req.GetNumRows()),

View File

@@ -31,6 +31,7 @@ import (
 	"github.com/milvus-io/milvus/internal/compaction"
 	"github.com/milvus-io/milvus/internal/datanode/compactor"
 	"github.com/milvus-io/milvus/internal/datanode/importv2"
+	"github.com/milvus-io/milvus/internal/datanode/index"
 	"github.com/milvus-io/milvus/internal/flushcommon/io"
 	"github.com/milvus-io/milvus/internal/util/fileresource"
 	"github.com/milvus-io/milvus/internal/util/hookutil"
@@ -510,7 +511,7 @@ func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReques
 	}
 
 	var (
-		totalSlots     = node.totalSlot
+		totalSlots     = index.CalculateNodeSlots()
 		indexStatsUsed = node.taskScheduler.TaskQueue.GetUsingSlot()
 		compactionUsed = node.compactionExecutor.Slots()
 		importUsed     = node.importScheduler.Slots()