milvus/internal/datacoord/index_inspector.go
cai.zhang 141547d8a8
enhance: Add log with segment size for tasks (#46118)
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
2025-12-05 16:45:11 +08:00

253 lines
8.6 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/task"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
type indexInspector struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
notifyIndexChan chan int64
meta *meta
scheduler task.GlobalScheduler
allocator allocator.Allocator
handler Handler
storageCli storage.ChunkManager
indexEngineVersionManager IndexEngineVersionManager
}
func newIndexInspector(
ctx context.Context,
notifyIndexChan chan int64,
meta *meta,
scheduler task.GlobalScheduler,
allocator allocator.Allocator,
handler Handler,
storageCli storage.ChunkManager,
indexEngineVersionManager IndexEngineVersionManager,
) *indexInspector {
ctx, cancel := context.WithCancel(ctx)
return &indexInspector{
ctx: ctx,
cancel: cancel,
meta: meta,
notifyIndexChan: notifyIndexChan,
scheduler: scheduler,
allocator: allocator,
handler: handler,
storageCli: storageCli,
indexEngineVersionManager: indexEngineVersionManager,
}
}
func (i *indexInspector) Start() {
i.reloadFromMeta()
i.wg.Add(1)
go i.createIndexForSegmentLoop(i.ctx)
}
func (i *indexInspector) Stop() {
i.cancel()
i.wg.Wait()
}
func (i *indexInspector) createIndexForSegmentLoop(ctx context.Context) {
log := log.Ctx(ctx)
log.Info("start create index for segment loop...",
zap.Int64("TaskCheckInterval", Params.DataCoordCfg.TaskCheckInterval.GetAsInt64()))
defer i.wg.Done()
ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Warn("DataCoord context done, exit...")
return
case <-ticker.C:
segments := i.getUnIndexTaskSegments(ctx)
for _, segment := range segments {
if err := i.createIndexesForSegment(ctx, segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
continue
}
}
case collectionID := <-i.notifyIndexChan:
log.Info("receive create index notify", zap.Int64("collectionID", collectionID))
segments := i.meta.SelectSegments(ctx, WithCollection(collectionID), SegmentFilterFunc(func(info *SegmentInfo) bool {
return isFlush(info) && (!enableSortCompaction() || info.GetIsSorted())
}))
for _, segment := range segments {
if err := i.createIndexesForSegment(ctx, segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
continue
}
}
case segID := <-getBuildIndexChSingleton():
log.Info("receive new flushed segment", zap.Int64("segmentID", segID))
segment := i.meta.GetSegment(ctx, segID)
if segment == nil {
log.Warn("segment is not exist, no need to build index", zap.Int64("segmentID", segID))
continue
}
if err := i.createIndexesForSegment(ctx, segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
continue
}
}
}
}
func (i *indexInspector) getUnIndexTaskSegments(ctx context.Context) []*SegmentInfo {
flushedSegments := i.meta.SelectSegments(ctx, SegmentFilterFunc(func(seg *SegmentInfo) bool {
return isFlush(seg)
}))
unindexedSegments := make([]*SegmentInfo, 0)
for _, segment := range flushedSegments {
if i.meta.indexMeta.IsUnIndexedSegment(segment.CollectionID, segment.GetID()) {
unindexedSegments = append(unindexedSegments, segment)
}
}
return unindexedSegments
}
func (i *indexInspector) createIndexesForSegment(ctx context.Context, segment *SegmentInfo) error {
if enableSortCompaction() && !segment.GetIsSorted() {
log.Ctx(ctx).Debug("segment is not sorted by pk, skip create indexes", zap.Int64("segmentID", segment.GetID()))
return nil
}
if segment.GetLevel() == datapb.SegmentLevel_L0 {
log.Ctx(ctx).Debug("segment is level zero, skip create indexes", zap.Int64("segmentID", segment.GetID()))
return nil
}
indexes := i.meta.indexMeta.GetIndexesForCollection(segment.CollectionID, "")
indexIDToSegIndexes := i.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID)
for _, index := range indexes {
if _, ok := indexIDToSegIndexes[index.IndexID]; !ok {
if err := i.createIndexForSegment(ctx, segment, index.IndexID); err != nil {
log.Ctx(ctx).Warn("create index for segment fail", zap.Int64("segmentID", segment.ID),
zap.Int64("indexID", index.IndexID))
return err
}
}
}
return nil
}
func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *SegmentInfo, indexID UniqueID) error {
log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID))
buildID, err := i.allocator.AllocID(context.Background())
if err != nil {
return err
}
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID)
indexType := GetIndexType(indexParams)
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
segSize := segment.getSegmentSize()
taskSlot := calculateIndexTaskSlot(segSize, isVectorIndex)
// rewrite the index type if needed, and this final index type will be persisted in the meta
if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() {
var err error
indexParams, err = Params.KnowhereConfig.UpdateIndexParams(indexType, paramtable.BuildStage, indexParams)
if err != nil {
return err
}
}
newIndexType := GetIndexType(indexParams)
if newIndexType != "" && newIndexType != indexType {
log.Info("override index type", zap.String("indexType", indexType), zap.String("newIndexType", newIndexType))
indexType = newIndexType
}
segIndex := &model.SegmentIndex{
SegmentID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
NumRows: segment.NumOfRows,
IndexID: indexID,
BuildID: buildID,
CreatedUTCTime: uint64(time.Now().Unix()),
WriteHandoff: false,
IndexType: indexType,
}
if err = i.meta.indexMeta.AddSegmentIndex(ctx, segIndex); err != nil {
return err
}
i.scheduler.Enqueue(newIndexBuildTask(model.CloneSegmentIndex(segIndex),
taskSlot,
i.meta,
i.handler,
i.storageCli,
i.indexEngineVersionManager))
log.Info("indexInspector create index for segment success",
zap.Int64("segmentID", segment.ID),
zap.Int64("indexID", indexID),
zap.Int64("segment size", segSize),
zap.Int64("task slot", taskSlot))
return nil
}
func (i *indexInspector) reloadFromMeta() {
segments := i.meta.GetAllSegmentsUnsafe()
for _, segment := range segments {
for _, segIndex := range i.meta.indexMeta.GetSegmentIndexes(segment.GetCollectionID(), segment.ID) {
if segIndex.IsDeleted || (segIndex.IndexState != commonpb.IndexState_Unissued &&
segIndex.IndexState != commonpb.IndexState_Retry &&
segIndex.IndexState != commonpb.IndexState_InProgress) {
continue
}
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, segIndex.IndexID)
indexType := GetIndexType(indexParams)
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
i.scheduler.Enqueue(newIndexBuildTask(
model.CloneSegmentIndex(segIndex),
taskSlot,
i.meta,
i.handler,
i.storageCli,
i.indexEngineVersionManager,
))
}
}
}