diff --git a/internal/datacoord/index_inspector.go b/internal/datacoord/index_inspector.go index 2870b96d8e..49335825c9 100644 --- a/internal/datacoord/index_inspector.go +++ b/internal/datacoord/index_inspector.go @@ -179,8 +179,9 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID) indexType := GetIndexType(indexParams) isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) - segSize := segment.getSegmentSize() - taskSlot := calculateIndexTaskSlot(segSize, isVectorIndex) + fieldID := i.meta.indexMeta.GetFieldIDByIndexID(segment.CollectionID, indexID) + fieldSize := segment.getFieldBinlogSize(fieldID) + taskSlot := calculateIndexTaskSlot(fieldSize, isVectorIndex) // rewrite the index type if needed, and this final index type will be persisted in the meta if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() { @@ -219,7 +220,9 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg log.Info("indexInspector create index for segment success", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID), - zap.Int64("segment size", segSize), + zap.Int64("fieldID", fieldID), + zap.Int64("segment size", segment.getSegmentSize()), + zap.Int64("field size", fieldSize), zap.Int64("task slot", taskSlot)) return nil } @@ -237,7 +240,9 @@ func (i *indexInspector) reloadFromMeta() { indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, segIndex.IndexID) indexType := GetIndexType(indexParams) isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) - taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex) + fieldID := i.meta.indexMeta.GetFieldIDByIndexID(segment.CollectionID, segIndex.IndexID) + fieldSize := segment.getFieldBinlogSize(fieldID) + taskSlot := calculateIndexTaskSlot(fieldSize, isVectorIndex) i.scheduler.Enqueue(newIndexBuildTask( model.CloneSegmentIndex(segIndex), diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 516c726fd5..561cb37e8e 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -532,6 +532,29 @@ func (s *SegmentInfo) getSegmentSize() int64 { return s.size.Load() } +func (s *SegmentInfo) getFieldBinlogSize(fieldID int64) int64 { + var size int64 + for _, binlogs := range s.GetBinlogs() { + if binlogs.GetFieldID() == fieldID { + for _, l := range binlogs.GetBinlogs() { + size += l.GetMemorySize() + } + } else { + for _, childFieldID := range binlogs.GetChildFields() { + if childFieldID == fieldID { + for _, l := range binlogs.GetBinlogs() { + size += l.GetMemorySize() + } + } + } + } + } + if size <= 0 { + return s.getSegmentSize() + } + return size +} + // Any edits on deltalogs of flushed segments will reset deltaRowcount to -1 func (s *SegmentInfo) getDeltaCount() int64 { if s.deltaRowcount.Load() < 0 || s.GetState() != commonpb.SegmentState_Flushed { diff --git a/internal/datacoord/segment_info_test.go b/internal/datacoord/segment_info_test.go index cd42ec092e..887bb0c67f 100644 --- a/internal/datacoord/segment_info_test.go +++ b/internal/datacoord/segment_info_test.go @@ -110,6 +110,7 @@ func TestGetSegmentSize(t *testing.T) { SegmentInfo: &datapb.SegmentInfo{ Binlogs: []*datapb.FieldBinlog{ { + FieldID: 1, Binlogs: []*datapb.Binlog{ { LogID: 1, @@ -143,6 +144,9 @@ func TestGetSegmentSize(t *testing.T) { assert.Equal(t, int64(3), segment.getSegmentSize()) assert.Equal(t, int64(3), segment.getSegmentSize()) + assert.Equal(t, int64(1), segment.getFieldBinlogSize(1)) + // field 2 has no binlogs, fallback to getSegmentSize + assert.Equal(t, int64(3), segment.getFieldBinlogSize(2)) } func TestIsDeltaLogExists(t *testing.T) { diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 4a9bf35898..383f711712 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -373,17 +373,17 @@ func getSortStatus(sorted bool) string { return "unsorted" } -func calculateIndexTaskSlot(segmentSize int64, isVectorIndex bool) int64 { +func calculateIndexTaskSlot(fieldSize int64, isVectorIndex bool) int64 { defaultSlots := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64() if !isVectorIndex { defaultSlots = Params.DataCoordCfg.ScalarIndexTaskSlotUsage.GetAsInt64() } - if segmentSize > 512*1024*1024 { - taskSlot := max(segmentSize/512/1024/1024, 1) * defaultSlots + if fieldSize > 512*1024*1024 { + taskSlot := max(fieldSize/512/1024/1024, 1) * defaultSlots return max(taskSlot, 1) - } else if segmentSize > 100*1024*1024 { + } else if fieldSize > 100*1024*1024 { return max(defaultSlots/4, 1) - } else if segmentSize > 10*1024*1024 { + } else if fieldSize > 10*1024*1024 { return max(defaultSlots/16, 1) } return max(defaultSlots/64, 1)