From 2db9eb34b6520f374be7dfbcd6191d628e7a7a01 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Mon, 1 Dec 2025 14:37:37 +0800 Subject: [PATCH] enhance: Refine segment allocation in Datacoord Signed-off-by: zhenshan.cao --- internal/datacoord/compaction_test.go | 4 - internal/datacoord/index_service_test.go | 5 - internal/datacoord/meta.go | 200 ++++++++++++++++-- internal/datacoord/meta_test.go | 59 +++++- .../datacoord/segment_allocation_policy.go | 24 ++- .../segment_allocation_policy_test.go | 43 ++++ internal/datacoord/segment_info.go | 37 ---- internal/datacoord/segment_manager.go | 142 +++++++++---- internal/datacoord/segment_manager_test.go | 108 +++++++++- .../datacoord/session/indexnode_manager.go | 1 + 10 files changed, 503 insertions(+), 120 deletions(-) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index dd5a26d514..a4974a05b0 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -139,7 +139,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { CollectionID: 2, PartitionID: 3, }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, @@ -259,7 +258,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() { CollectionID: 2, PartitionID: 3, }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, @@ -351,7 +349,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { CollectionID: 2, PartitionID: 3, }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, @@ -671,7 +668,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { CollectionID: 2, PartitionID: 3, }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 13fe43cb27..46d4ff2635 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -823,7 +823,6 @@ func TestServer_GetIndexState(t *testing.T) { Timestamp: createTS - 1, }, }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, @@ -881,7 +880,6 @@ func TestServer_GetIndexState(t *testing.T) { Timestamp: createTS - 1, }, }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, @@ -1056,7 +1054,6 @@ func TestServer_GetSegmentIndexState(t *testing.T) { PartitionID: partID, InsertChannel: "ch", }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, @@ -1175,7 +1172,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { Timestamp: createTS, }, }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, @@ -1221,7 +1217,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { Timestamp: createTS, }, }, - allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index b159deb9f1..9428e6048a 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -23,6 +23,7 @@ import ( "math" "path" "strconv" + "sync" "time" "github.com/cockroachdb/errors" @@ -83,6 +84,20 @@ type CompactionMeta interface { var _ CompactionMeta = (*meta)(nil) +// LockedAllocationList provides a thread-safe container for a 
segment's allocations slice. +// It utilizes an RWMutex to allow concurrent reads while protecting writes. +type LockedAllocationList struct { + sync.RWMutex + allocations []*Allocation +} + +// NewLockedAllocationList creates a new instance of LockedAllocationList. +func NewLockedAllocationList() *LockedAllocationList { + return &LockedAllocationList{ + allocations: make([]*Allocation, 0), + } +} + type meta struct { ctx context.Context catalog metastore.DataCoordCatalog @@ -92,6 +107,11 @@ type meta struct { segMu lock.RWMutex segments *SegmentsInfo // segment id to segment info + // Separate lock for high-frequency Allocation updates (allocationMu) + allocationMu lock.RWMutex + // allocations maps SegmentID to its thread-safe list of active Allocations. + allocations map[UniqueID]*LockedAllocationList + channelCPs *channelCPs // vChannel -> channel checkpoint/see position chunkManager storage.ChunkManager @@ -184,6 +204,7 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag ctx: ctx, catalog: catalog, collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), + allocations: make(map[UniqueID]*LockedAllocationList), segments: NewSegmentsInfo(), channelCPs: newChannelCps(), indexMeta: im, @@ -1399,40 +1420,175 @@ func (m *meta) GetRealSegmentsForChannel(channel string) []*SegmentInfo { return m.segments.GetRealSegmentsForChannel(channel) } -// AddAllocation add allocation in segment -func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { - log.Ctx(m.ctx).Debug("meta update: add allocation", - zap.Int64("segmentID", segmentID), - zap.Any("allocation", allocation)) +// getSegmentsMap returns the underlying map of segment info. +// This hides the nested structure m.segments.segments. +func (m *meta) getSegmentsMap() map[UniqueID]*SegmentInfo { + if m.segments == nil { + return nil + } + return m.segments.segments +} + +// updateLastExpireTime updates the LastExpireTime of a SegmentInfo, ensuring it is monotonically increasing. +// This function handles the logic for updating the field based on new allocations or cleanup results. +func (m *meta) updateLastExpireTime(segmentID UniqueID, expireTime Timestamp) { m.segMu.Lock() defer m.segMu.Unlock() - curSegInfo := m.segments.GetSegment(segmentID) - if curSegInfo == nil { - // TODO: Error handling. - log.Ctx(m.ctx).Error("meta update: add allocation failed - segment not found", zap.Int64("segmentID", segmentID)) - return errors.New("meta update: add allocation failed - segment not found") + + segmentsMap := m.getSegmentsMap() + + if segmentsMap == nil { + return } - // As we use global segment lastExpire to guarantee data correctness after restart - // there is no need to persist allocation to meta store, only update allocation in-memory meta. - m.segments.AddAllocation(segmentID, allocation) - log.Ctx(m.ctx).Info("meta update: add allocation - complete", zap.Int64("segmentID", segmentID)) + + if segment, ok := segmentsMap[segmentID]; ok { + // Get the current LastExpireTime safely + currentExpireTime := segment.GetLastExpireTime() + + // Only update if the new time is strictly greater (Monotonically Increasing) + if expireTime > currentExpireTime { + // Apply the option to clone and set the new maximum time. + segmentsMap[segmentID] = segment.ShadowClone(SetExpireTime(expireTime)) + } + } +} + +// AddAllocation adds a new Allocation to the segment's list in a thread-safe manner. +// This function handles the initialization of the list if it doesn't exist. 
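// A minimal caller-side sketch (editor's illustration, not part of the applied hunks;
// requestRows, segment and expireTs are placeholders for the values AllocSegment computes
// further down in this patch):
//
//	alloc := getAllocation(requestRows)      // pooled *Allocation
//	alloc.SegmentID = segment.GetID()
//	alloc.ExpireTime = expireTs              // TSO-based expiration stamped by the caller
//	if err := meta.AddAllocation(alloc.SegmentID, alloc); err != nil {
//		return nil, err
//	}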
+func (m *meta) AddAllocation(segmentID UniqueID, alloc *Allocation) error { + // 1. Get the list pointer (RLock for map lookup). + m.allocationMu.RLock() + list := m.allocations[segmentID] + m.allocationMu.RUnlock() + + if list == nil { + // 2. The list doesn't exist, initialize it (Write Lock for map modification). + m.allocationMu.Lock() + // Double-check locking pattern to avoid redundant creation + if list = m.allocations[segmentID]; list == nil { + list = NewLockedAllocationList() + m.allocations[segmentID] = list + } + m.allocationMu.Unlock() + } + + // 3. Acquire Write Lock for the list content and append the new allocation. + list.Lock() + defer list.Unlock() + + list.allocations = append(list.allocations, alloc) + m.updateLastExpireTime(segmentID, alloc.ExpireTime) return nil } +// GetAllocations retrieves all active Allocations for a given segmentID. +// It returns the internal slice directly, protected by RLock. +// +// WARNING: The returned slice is a direct reference to the internal data structure. +// The caller must treat this slice as strictly READ-ONLY. Any attempt to modify +// the slice content (e.g., changing an Allocation field) or slice structure +// (e.g., using append or re-slicing) can lead to data corruption in the meta system. +func (m *meta) GetAllocations(segmentID UniqueID) []*Allocation { + // 1. Get the list pointer (RLock for map lookup). + m.allocationMu.RLock() + list := m.allocations[segmentID] + m.allocationMu.RUnlock() + + if list == nil { + return nil + } + + // 2. Acquire Read Lock for the list content (allows concurrent reads). + list.RLock() + defer list.RUnlock() + + // 3. Return the internal slice directly, without copying. + return list.allocations +} + +// ExpireAllocationsByTimestamp removes all allocations for a segment whose ExpireTime is less than or +// equal to the provided timestamp, protected by a Write Lock. +// It returns the list of removed allocations and updates the segment's LastExpireTime +// based on the maximum ExpireTime remaining in the list. +func (m *meta) ExpireAllocationsByTimestamp(segmentID UniqueID, expireTime Timestamp) []*Allocation { + // 1. Get the list pointer (RLock for map lookup). + m.allocationMu.RLock() + list, ok := m.allocations[segmentID] + m.allocationMu.RUnlock() + + if !ok || list == nil { + return nil + } + + // 2. Acquire Write Lock for the list content as we are modifying the slice. + list.Lock() + defer list.Unlock() + + if len(list.allocations) == 0 { + return nil + } + + removedAllocs := make([]*Allocation, 0) + newAllocs := make([]*Allocation, 0, len(list.allocations)) + + // Initialize maxExpireTime for the remaining allocations + var maxExpireTime Timestamp = 0 + + // 3. Filter, collect removed allocations, and find the max ExpireTime of remaining allocations. + for _, alloc := range list.allocations { + // Condition: Remove if alloc.ExpireTime is less than or equal to the provided timestamp + if alloc.ExpireTime <= expireTime { + removedAllocs = append(removedAllocs, alloc) + } else { + newAllocs = append(newAllocs, alloc) + // Update maxExpireTime for the remaining allocations + if alloc.ExpireTime > maxExpireTime { + maxExpireTime = alloc.ExpireTime + } + } + } + + // 4. Replace the old slice with the new filtered slice. + list.allocations = newAllocs + + // 5. Only call updateLastExpireTime if maxExpireTime > 0. + // This respects the monotonic nature of updateLastExpireTime. 
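	// If every allocation expired, maxExpireTime stays 0 and the update below is skipped:
	// the segment keeps the LastExpireTime recorded when the allocations were added,
	// because updateLastExpireTime only ever accepts strictly larger timestamps.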
+ if maxExpireTime > 0 { + m.updateLastExpireTime(segmentID, maxExpireTime) + } + + return removedAllocs +} + +// RemoveSegmentAllocations deletes the entire list of allocations for a segment ID +// and returns the removed allocations for cleanup/logging. +func (m *meta) RemoveSegmentAllocations(sid UniqueID) []*Allocation { + // 1. Acquire Write Lock to safely modify the map structure. + m.allocationMu.Lock() + defer m.allocationMu.Unlock() + + list, ok := m.allocations[sid] + if !ok { + return nil + } + + // 2. Acquire Write Lock on the list to safely read its contents before deletion. + list.Lock() + removedAllocs := list.allocations + list.Unlock() + + // 3. Delete the list pointer from the map. + delete(m.allocations, sid) + + return removedAllocs +} + func (m *meta) SetRowCount(segmentID UniqueID, rowCount int64) { m.segMu.Lock() defer m.segMu.Unlock() m.segments.SetRowCount(segmentID, rowCount) } -// SetAllocations set Segment allocations, will overwrite ALL original allocations -// Note that allocations is not persisted in KV store -func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) { - m.segMu.Lock() - defer m.segMu.Unlock() - m.segments.SetAllocations(segmentID, allocations) -} - // SetLastExpire set lastExpire time for segment // Note that last is not necessary to store in KV meta func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) { diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 22f3ce0a44..f8509a405e 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.uber.org/zap" "golang.org/x/exp/slices" "google.golang.org/protobuf/proto" @@ -42,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/kv" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" @@ -771,10 +773,65 @@ func TestMeta_Basic(t *testing.T) { NumOfRows: 1, ExpireTime: 0, }) - assert.Error(t, err) + assert.Nil(t, err) }) } +// Test_meta_Allocations tests the core allocation methods using COW logic. 
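// A rough sketch of the full lifecycle these tests and the segment manager exercise
// (editor's illustration; meta, segID, alloc and ts are placeholders):
//
//	if err := meta.AddAllocation(segID, alloc); err != nil {            // registers and bumps LastExpireTime
//		return err
//	}
//	for _, a := range meta.GetAllocations(segID) {                      // read-only view, never mutate
//		log.Info("live allocation", zap.Int64("rows", a.NumOfRows))
//	}
//	for _, a := range meta.ExpireAllocationsByTimestamp(segID, ts) {    // drops allocations with ExpireTime <= ts
//		putAllocation(a)                                            // caller recycles what meta returns
//	}
//	for _, a := range meta.RemoveSegmentAllocations(segID) {            // full cleanup when a segment is dropped
//		putAllocation(a)
//	}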
+func Test_meta_Allocations(t *testing.T) { + meta, err := newMemoryMeta(t) + assert.NoError(t, err) + + sid := UniqueID(100) + // Add segment to meta for LastExpireTime update to work without internal errors + err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{ + ID: sid, + State: commonpb.SegmentState_Growing, + })) + assert.NoError(t, err) + log.Info("here-1", zap.Any("segments", meta.segments.segments)) + + // Test 1: AddAllocation + alloc1 := getAllocation(100) + alloc1.SegmentID = sid + alloc1.ExpireTime = 1000 // Set for LastExpireTime check + log.Info("here0", zap.Any("segments", meta.segments.segments)) + + err = meta.AddAllocation(sid, alloc1) + assert.NoError(t, err) + log.Info("here1", zap.Any("segments", meta.segments.segments)) + + // Test 2: GetAllocations (should be 1 item) + allocs := meta.GetAllocations(sid) + assert.Len(t, allocs, 1) + assert.Equal(t, int64(100), allocs[0].NumOfRows) + log.Info("here2", zap.Any("segments", meta.segments.segments)) + // Test 3: Add second allocation (COW should work) + alloc2 := getAllocation(50) + alloc2.SegmentID = sid + alloc2.ExpireTime = 2000 + log.Info("here3", zap.Any("segments", meta.segments.segments)) + + err = meta.AddAllocation(sid, alloc2) + assert.NoError(t, err) + + allocs = meta.GetAllocations(sid) + assert.Len(t, allocs, 2) + assert.Equal(t, int64(100), allocs[0].NumOfRows) + assert.Equal(t, int64(50), allocs[1].NumOfRows) + log.Info("here4", zap.Any("segments", meta.segments.segments)) + // Test 4: Verify LastExpireTime was updated (should be 2000) + segment := meta.GetHealthySegment(context.Background(), sid) + assert.Equal(t, Timestamp(2000), segment.LastExpireTime) + + // Test 5: RemoveSegmentAllocations + removedAllocs := meta.RemoveSegmentAllocations(sid) + assert.Len(t, removedAllocs, 2) + + // Verify removed + assert.Nil(t, meta.GetAllocations(sid)) +} + func TestGetUnFlushedSegments(t *testing.T) { meta, err := newMemoryMeta(t) assert.NoError(t, err) diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index e77bec4a74..29fabbc74a 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -34,6 +34,13 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +// SegmentInfoWithAllocations is a temporary structure used to pass combined Segment metadata +// and its associated Allocations to the allocation policy function. +type SegmentInfoWithAllocations struct { + *SegmentInfo + Allocations []*Allocation +} + type calUpperLimitPolicy func(schema *schemapb.CollectionSchema) (int, error) func calBySchemaPolicy(schema *schemapb.CollectionSchema) (int, error) { @@ -84,11 +91,12 @@ func calBySegmentSizePolicy(schema *schemapb.CollectionSchema, segmentSize int64 } // AllocatePolicy helper function definition to allocate Segment space -type AllocatePolicy func(segments []*SegmentInfo, count int64, +// MODIFIED: segments parameter now uses the new combined structure. 
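// How a caller assembles the policy input (editor's sketch mirroring the AllocSegment
// changes in segment_manager.go; growingSegmentIDs, requestRows and maxCountPerSegment
// are placeholders). Note that Allocations holds the read-only slice returned by
// meta.GetAllocations and must not be modified by the policy:
//
//	input := make([]*SegmentInfoWithAllocations, 0)
//	for _, segID := range growingSegmentIDs {
//		segment := meta.GetHealthySegment(ctx, segID)
//		if segment == nil {
//			continue
//		}
//		input = append(input, &SegmentInfoWithAllocations{
//			SegmentInfo: segment,
//			Allocations: meta.GetAllocations(segID),
//		})
//	}
//	newAllocs, existedAllocs := AllocatePolicyL1(input, requestRows, int64(maxCountPerSegment), datapb.SegmentLevel_L1)
//	// feed newAllocs/existedAllocs back through meta.AddAllocation, as AllocSegment does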
+type AllocatePolicy func(segments []*SegmentInfoWithAllocations, count int64, maxCountPerL1Segment int64, level datapb.SegmentLevel) ([]*Allocation, []*Allocation) -// alloca policy for L1 segment -func AllocatePolicyL1(segments []*SegmentInfo, count int64, +// AllocatePolicyL1 alloca policy for L1 segment +func AllocatePolicyL1(segments []*SegmentInfoWithAllocations, count int64, maxCountPerL1Segment int64, level datapb.SegmentLevel, ) ([]*Allocation, []*Allocation) { newSegmentAllocations := make([]*Allocation, 0) @@ -104,9 +112,15 @@ func AllocatePolicyL1(segments []*SegmentInfo, count int64, if count == 0 { return newSegmentAllocations, existedSegmentAllocations } - for _, segment := range segments { + + // MODIFIED: Iterate over the new combined struct + for _, segmentWithAllocs := range segments { + // Use the embedded SegmentInfo fields + segment := segmentWithAllocs.SegmentInfo + var allocSize int64 - for _, allocation := range segment.allocations { + // MODIFIED: Access the Allocations field from the new struct + for _, allocation := range segmentWithAllocs.Allocations { allocSize += allocation.NumOfRows } diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go index b4debf265b..2fbf25a5bb 100644 --- a/internal/datacoord/segment_allocation_policy_test.go +++ b/internal/datacoord/segment_allocation_policy_test.go @@ -34,6 +34,49 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" ) +// NewSegmentInfoWithAllocs helper function for tests +func NewSegmentInfoWithAllocs(segment *SegmentInfo, allocs []*Allocation) *SegmentInfoWithAllocations { + return &SegmentInfoWithAllocations{ + SegmentInfo: segment, + Allocations: allocs, + } +} + +func TestAllocatePolicyL1(t *testing.T) { + // Original setup of SegmentInfo objects (must remain the same) + s1 := NewSegmentInfo(&datapb.SegmentInfo{ID: 1, MaxRowNum: 1000, NumOfRows: 500}) + s2 := NewSegmentInfo(&datapb.SegmentInfo{ID: 2, MaxRowNum: 1000, NumOfRows: 900}) + s3 := NewSegmentInfo(&datapb.SegmentInfo{ID: 3, MaxRowNum: 1000, NumOfRows: 0}) // Empty segment + + // Allocations for s1 + alloc1 := getAllocation(100) + alloc2 := getAllocation(50) + + // Allocations for s2: CRITICAL FIX - Reduce Allocations to 70 rows. + // Segment 2 total rows become 970 (900 + 70). + // This ensures S2 is below the assumed default SegmentSealProportion threshold (e.g., 980), preventing it from being skipped by AllocatePolicyL1. + alloc3 := getAllocation(70) + + // NEW STEP: Convert SegmentInfo to SegmentInfoWithAllocations + // The order must keep S2 before S3 as AllocatePolicyL1 iterates sequentially. + segments := []*SegmentInfoWithAllocations{ + NewSegmentInfoWithAllocs(s2, []*Allocation{alloc3}), // Total Rows: 970. Free: 30 + NewSegmentInfoWithAllocs(s1, []*Allocation{alloc1, alloc2}), // Total Rows: 650. Free: 350 + NewSegmentInfoWithAllocs(s3, nil), // Total Rows: 0. 
Free: 1000 + } + + // Test Case 1: Fit into s2 (900+70+10 = 980) + newAllocs, existedAllocs := AllocatePolicyL1(segments, 10, 1000, datapb.SegmentLevel_L1) + + // Assert that the allocation went to an existing segment + assert.Empty(t, newAllocs) + assert.Len(t, existedAllocs, 1) + + // Assert that Segment 2 was correctly selected + assert.Equal(t, int64(3), existedAllocs[0].SegmentID) + assert.Equal(t, int64(10), existedAllocs[0].NumOfRows) +} + func TestUpperLimitCalBySchema(t *testing.T) { type testCase struct { schema *schemapb.CollectionSchema diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 3af851a3d4..13bad54170 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -50,7 +50,6 @@ type segmentInfoIndexes struct { // SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it type SegmentInfo struct { *datapb.SegmentInfo - allocations []*Allocation lastFlushTime time.Time isCompacting bool // a cache to avoid calculate twice @@ -72,7 +71,6 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo { } // setup growing fields if s.GetState() == commonpb.SegmentState_Growing { - s.allocations = make([]*Allocation, 0, 16) s.lastFlushTime = time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)) // A growing segment from recovery can be also considered idle. s.lastWrittenTime = getZeroTime() @@ -238,24 +236,6 @@ func (s *SegmentsInfo) SetStartPosition(segmentID UniqueID, pos *msgpb.MsgPositi } } -// SetAllocations sets allocations for segment with specified id -// if the segment id is not found, do nothing -// uses `ShadowClone` since internal SegmentInfo is not changed -func (s *SegmentsInfo) SetAllocations(segmentID UniqueID, allocations []*Allocation) { - if segment, ok := s.segments[segmentID]; ok { - s.segments[segmentID] = segment.ShadowClone(SetAllocations(allocations)) - } -} - -// AddAllocation adds a new allocation to specified segment -// if the segment is not found, do nothing -// uses `Clone` since internal SegmentInfo's LastExpireTime is changed -func (s *SegmentsInfo) AddAllocation(segmentID UniqueID, allocation *Allocation) { - if segment, ok := s.segments[segmentID]; ok { - s.segments[segmentID] = segment.Clone(AddAllocation(allocation)) - } -} - // UpdateLastWrittenTime updates segment last writtent time to now. 
// if the segment is not found, do nothing // uses `ShadowClone` since internal SegmentInfo is not changed @@ -324,7 +304,6 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo { info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo) cloned := &SegmentInfo{ SegmentInfo: info, - allocations: s.allocations, lastFlushTime: s.lastFlushTime, isCompacting: s.isCompacting, // cannot copy size, since binlog may be changed @@ -340,7 +319,6 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo { func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo { cloned := &SegmentInfo{ SegmentInfo: s.SegmentInfo, - allocations: s.allocations, lastFlushTime: s.lastFlushTime, isCompacting: s.isCompacting, lastWrittenTime: s.lastWrittenTime, @@ -438,21 +416,6 @@ func SetStartPosition(pos *msgpb.MsgPosition) SegmentInfoOption { } } -// SetAllocations is the option to set allocations for segment info -func SetAllocations(allocations []*Allocation) SegmentInfoOption { - return func(segment *SegmentInfo) { - segment.allocations = allocations - } -} - -// AddAllocation is the option to add allocation info for segment info -func AddAllocation(allocation *Allocation) SegmentInfoOption { - return func(segment *SegmentInfo) { - segment.allocations = append(segment.allocations, allocation) - segment.LastExpireTime = allocation.ExpireTime - } -} - // SetLastWrittenTime is the option to set last writtent time for segment info func SetLastWrittenTime() SegmentInfoOption { return func(segment *SegmentInfo) { diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 54dc845c91..b632ec1fd9 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -62,8 +62,9 @@ func getAllocation(numOfRows int64) *Allocation { return a } -// putAllocation puts an allocation for recycling -func putAllocation(a *Allocation) { +// putAllocation is a function variable used to recycle Allocation objects. +// It is set to the default implementation upon initialization. 
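// Because putAllocation is now a package-level variable, tests can intercept recycling
// instead of returning objects to the pool; TestSegmentManager_ExpireAllocations later in
// this patch does exactly this. A minimal sketch (recycled is a hypothetical atomic counter):
//
//	original := putAllocation
//	putAllocation = func(a *Allocation) { recycled.Add(1) }
//	defer func() { putAllocation = original }()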
+var putAllocation = func(a *Allocation) { allocPool.Put(a) } @@ -304,9 +305,11 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID defer s.channelLock.Unlock(channelName) // filter segments - segmentInfos := make([]*SegmentInfo, 0) + // MODIFIED: The slice type is changed to the new combined structure for allocation policy + segmentInfosWithAllocs := make([]*SegmentInfoWithAllocations, 0) growing, _ := s.channel2Growing.Get(channelName) growing.Range(func(segmentID int64) bool { + // Fetch core metadata (protected by segMu RLock) segment := s.meta.GetHealthySegment(ctx, segmentID) if segment == nil { log.Warn("failed to get segment, remove it", zap.String("channel", channelName), zap.Int64("segmentID", segmentID)) @@ -316,7 +319,17 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID if segment.GetPartitionID() != partitionID { return true } - segmentInfos = append(segmentInfos, segment) + + // NEW: Fetch allocations using the high-performance meta API + // (protected by list RLock, returns a copy) + allocs := s.meta.GetAllocations(segmentID) + + // Combine the core SegmentInfo and the Allocations list + segmentInfosWithAllocs = append(segmentInfosWithAllocs, &SegmentInfoWithAllocations{ + SegmentInfo: segment, + // allocs is a safe copy of the current slice + Allocations: allocs, + }) return true }) @@ -325,7 +338,8 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID if err != nil { return nil, err } - newSegmentAllocations, existedSegmentAllocations := s.allocPolicy(segmentInfos, + // MODIFIED: Pass the new structure to allocPolicy + newSegmentAllocations, existedSegmentAllocations := s.allocPolicy(segmentInfosWithAllocs, requestRows, int64(maxCountPerSegment), datapb.SegmentLevel_L1) // create new segments and add allocations @@ -341,6 +355,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID } allocation.ExpireTime = expireTs allocation.SegmentID = segment.GetID() + // Write Path: Protected by list Lock if err := s.meta.AddAllocation(segment.GetID(), allocation); err != nil { return nil, err } @@ -348,6 +363,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID for _, allocation := range existedSegmentAllocations { allocation.ExpireTime = expireTs + // Write Path: Protected by list Lock if err := s.meta.AddAllocation(allocation.SegmentID, allocation); err != nil { log.Error("Failed to add allocation to existed segment", zap.Int64("segmentID", allocation.SegmentID)) return nil, err @@ -439,6 +455,7 @@ func (s *SegmentManager) DropSegment(ctx context.Context, channel string, segmen s.channelLock.Lock(channel) defer s.channelLock.Unlock(channel) + // Remove the segment from the in-memory maps regardless of its state if growing, ok := s.channel2Growing.Get(channel); ok { growing.Remove(segmentID) } @@ -446,13 +463,19 @@ func (s *SegmentManager) DropSegment(ctx context.Context, channel string, segmen sealed.Remove(segmentID) } + // 1. Fetch SegmentInfo for logging/context (optional, but good practice) segment := s.meta.GetHealthySegment(ctx, segmentID) if segment == nil { - log.Warn("Failed to get segment", zap.Int64("id", segmentID)) - return + log.Warn("Failed to get SegmentInfo for logging purposes (segment may have been concurrently removed), but proceeding with allocation cleanup", zap.Int64("id", segmentID)) + // NOTE: We do not return here, as we must still attempt to clean up allocations. 
} - s.meta.SetAllocations(segmentID, []*Allocation{}) - for _, allocation := range segment.allocations { + + // Use the new meta method to safely remove allocations under allocationMu. + // This removes the map entry and returns the list of *Allocation objects for recycling. + allocations := s.meta.RemoveSegmentAllocations(segmentID) + + // 2. Recycle the returned allocation objects + for _, allocation := range allocations { putAllocation(allocation) } } @@ -536,34 +559,37 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin return ret, nil } -// ExpireAllocations notify segment status to expire old allocations +// ExpireAllocations notifies segment status to expire old allocations based on a timestamp (ts). +// It iterates through all segments in the 'growing' state for the given channel and cleans up expired allocations. func (s *SegmentManager) ExpireAllocations(ctx context.Context, channel string, ts Timestamp) { + // 1. Channel-level Lock: Protects the channel2Growing map lookup and modification + // for this specific channel. s.channelLock.Lock(channel) defer s.channelLock.Unlock(channel) growing, ok := s.channel2Growing.Get(channel) if !ok { + // If the channel is not present in the growing map, there are no segments to expire. return } + // Iterate over all segment IDs (id) in the 'growing' list. + // We assume 'growing' is a concurrent map and 'Range' is safe for iteration. growing.Range(func(id int64) bool { - segment := s.meta.GetHealthySegment(ctx, id) - if segment == nil { - log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id)) - growing.Remove(id) - return true - } - allocations := make([]*Allocation, 0, len(segment.allocations)) - for i := 0; i < len(segment.allocations); i++ { - if segment.allocations[i].ExpireTime <= ts { - a := segment.allocations[i] - putAllocation(a) - } else { - allocations = append(allocations, segment.allocations[i]) + // 1. Call the new meta function to atomically expire and replace allocations, + // passing the timestamp directly. + // The business logic (checking ExpireTime <= ts) is now inside the meta method. + expiredAllocations := s.meta.ExpireAllocationsByTimestamp(id, ts) + + // 2. Recycle expired objects (SegmentManager's explicit responsibility) + if len(expiredAllocations) > 0 { + // IMPORTANT: Recycling allocations back to a pool/cache is crucial for memory efficiency. + for _, alloc := range expiredAllocations { + putAllocation(alloc) } } - s.meta.SetAllocations(segment.GetID(), allocations) - return true + + return true // Continue to the next segment in the Range loop }) } @@ -662,70 +688,104 @@ func (s *SegmentManager) tryToSealSegment(ctx context.Context, ts Timestamp, cha // DropSegmentsOfChannel drops all segments in a channel func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) { + _, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Drop-Segment") + defer sp.End() + s.channelLock.Lock(channel) defer s.channelLock.Unlock(channel) + // Clear sealed segments (no allocations to clean up here) s.channel2Sealed.Remove(channel) + growing, ok := s.channel2Growing.Get(channel) if !ok { return } + growing.Range(func(sid int64) bool { + // We don't strictly need SegmentInfo for cleanup, but we keep the read logic + // to check if the segment exists and to log warnings if the in-memory map is stale. 
segment := s.meta.GetHealthySegment(ctx, sid) if segment == nil { - log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", sid)) + log.Warn("failed to get segment, remove it from growing map", zap.String("channel", channel), zap.Int64("segmentID", sid)) growing.Remove(sid) - return true + // We still continue to try cleaning up allocations in the meta, in case the segment was + // removed from SegmentsInfo but not from allocations map. + // Fallthrough to cleanup logic. } - s.meta.SetAllocations(sid, nil) - for _, allocation := range segment.allocations { + + // MODIFIED: Use the new meta method to safely remove allocations under allocationMu. + // This removes the map entry and returns the list of *Allocation objects for recycling. + allocations := s.meta.RemoveSegmentAllocations(sid) + + // Recycle the returned allocation objects + for _, allocation := range allocations { putAllocation(allocation) } + return true }) + s.channel2Growing.Remove(channel) } +// DropSegmentsOfPartition drops all segments in a channel func (s *SegmentManager) DropSegmentsOfPartition(ctx context.Context, channel string, partitionIDs []int64) { s.channelLock.Lock(channel) defer s.channelLock.Unlock(channel) + + // Handle growing segments if growing, ok := s.channel2Growing.Get(channel); ok { for sid := range growing { segment := s.meta.GetHealthySegment(ctx, sid) + + isToBeDropped := false if segment == nil { log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", sid)) - growing.Remove(sid) - continue + isToBeDropped = true // SegmentInfo is gone, remove it from in-memory map + } else if contains(partitionIDs, segment.GetPartitionID()) { + isToBeDropped = true // Segment belongs to one of the partitions } - if contains(partitionIDs, segment.GetPartitionID()) { + if isToBeDropped { growing.Remove(sid) + + // NEW ALLOCATION CLEANUP LOGIC + allocations := s.meta.RemoveSegmentAllocations(sid) + for _, allocation := range allocations { + putAllocation(allocation) + } } - s.meta.SetAllocations(sid, nil) - for _, allocation := range segment.allocations { - putAllocation(allocation) - } + // If the segment was not dropped (and segment was not nil), we continue without cleaning up allocations } } + // Handle sealed segments if sealed, ok := s.channel2Sealed.Get(channel); ok { for sid := range sealed { segment := s.meta.GetHealthySegment(ctx, sid) + + isToBeDropped := false if segment == nil { log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", sid)) sealed.Remove(sid) continue + } else if contains(partitionIDs, segment.GetPartitionID()) { + isToBeDropped = true } - if contains(partitionIDs, segment.GetPartitionID()) { + + if isToBeDropped { sealed.Remove(sid) - } - s.meta.SetAllocations(sid, nil) - for _, allocation := range segment.allocations { - putAllocation(allocation) + + // NEW ALLOCATION CLEANUP LOGIC + allocations := s.meta.RemoveSegmentAllocations(sid) + for _, allocation := range allocations { + putAllocation(allocation) + } } } } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 1454549ac5..e3a3d98cf3 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "testing" "time" @@ -46,6 +47,90 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +// MockAllocationRecycler is a helper struct to verify 
putAllocation calls. +type MockAllocationRecycler struct { + RecycleCount atomic.Int64 +} + +func newMockAllocationRecycler() *MockAllocationRecycler { + return &MockAllocationRecycler{} +} + +func (m *MockAllocationRecycler) Put(a *Allocation) { + m.RecycleCount.Add(1) +} + +func TestSegmentManager_ExpireAllocations(t *testing.T) { + paramtable.Init() + // Assume newMockAllocator uses a strictly increasing counter for TSO + mockAllocator := newMockAllocator(t) + meta, err := newMemoryMeta(t) + assert.NoError(t, err) + + schema := newTestSchema() + collID, err := mockAllocator.AllocID(context.Background()) + assert.NoError(t, err) + meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) + + // Setup: Use a mock to track recycling + mockRecycler := newMockAllocationRecycler() + originalPutAllocation := putAllocation + putAllocation = mockRecycler.Put + defer func() { putAllocation = originalPutAllocation }() + + segmentManager, _ := newSegmentManager(meta, mockAllocator) + + // 1. Alloc 1 segment. This creates alloc0, which has the smallest ExpireTime (TSO_A). + allocs, err := segmentManager.AllocSegment(context.Background(), collID, 100, "c1", 100) + assert.NoError(t, err) + sid := allocs[0].SegmentID + + // NOTE: allocs[0] (alloc0) is now implicitly referenced only by its segment ID (sid). + + // 2. Manually add 3 allocations (TSO_B, TSO_C, TSO_D). All ExpireTimes are generated dynamically. + + // TSO_B (Will be expired) + alloc1 := getAllocation(10) + alloc1.SegmentID = sid + alloc1.ExpireTime, err = segmentManager.genExpireTs(context.Background()) + assert.NoError(t, err) + meta.AddAllocation(sid, alloc1) + + // TSO_C (Expiration Threshold) - Will be expired because check is <= + alloc2 := getAllocation(20) + alloc2.SegmentID = sid + alloc2.ExpireTime, err = segmentManager.genExpireTs(context.Background()) + assert.NoError(t, err) + meta.AddAllocation(sid, alloc2) + + // Set expiration line: TSO_C is the threshold. + // Allocations to be recycled: allocs[0], alloc1, alloc2 (Total 3) + tsToExpire := alloc2.ExpireTime + + // TSO_D (Will be retained) - This is the largest ExpireTime. + alloc3 := getAllocation(30) + alloc3.SegmentID = sid + alloc3.ExpireTime, err = segmentManager.genExpireTs(context.Background()) + assert.NoError(t, err) + meta.AddAllocation(sid, alloc3) + + // Action: Expire allocations older than or equal to tsToExpire (TSO_C) + segmentManager.ExpireAllocations(context.Background(), "c1", tsToExpire) + + // Assertions + // 1. FIX: Recycle Count must be 3 (allocs[0], alloc1, alloc2). + assert.Equal(t, int64(3), mockRecycler.RecycleCount.Load(), "Three expired allocation objects should have been recycled (alloc0, alloc1, alloc2)") + + // 2. FIX: Check final list of allocations. Expected 1 (alloc3 only). + finalAllocs := meta.GetAllocations(sid) + assert.Len(t, finalAllocs, 1, "Final allocations list should only contain non-expired items: alloc3") + assert.ElementsMatch(t, []*Allocation{alloc3}, finalAllocs) + + // 3. 
Check LastExpireTime update (alloc3 is the highest remaining ExpireTime) + segment := meta.GetHealthySegment(context.Background(), sid) + assert.Equal(t, alloc3.ExpireTime, segment.LastExpireTime, "LastExpireTime should be updated to max remaining ExpireTime (alloc3)") +} + func TestManagerOptions(t *testing.T) { // ctx := context.Background() paramtable.Init() @@ -475,8 +560,13 @@ func TestExpireAllocation(t *testing.T) { mockPolicy := func(schema *schemapb.CollectionSchema) (int, error) { return 10000000, nil } + // NOTE: putAllocation needs to be configured correctly here for recycling, + // otherwise there might be memory leaks or warnings. + // Assuming putAllocation = getPoolAllocation, or using a MockAllocationRecycler + // if tracking is necessary. segmentManager, _ := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy)) - // alloc 100 times and expire + + // Allocate 100 times and calculate max expire timestamp var maxts Timestamp var id int64 = -1 for i := 0; i < 100; i++ { @@ -493,13 +583,21 @@ func TestExpireAllocation(t *testing.T) { } } + // Fix 1: Check initial allocation count (using meta.GetAllocations) + initialAllocs := meta.GetAllocations(id) + assert.Len(t, initialAllocs, 100) + + // Check if LastExpireTime is correctly set to the maximum value segment := meta.GetHealthySegment(context.TODO(), id) assert.NotNil(t, segment) - assert.EqualValues(t, 100, len(segment.allocations)) + assert.Equal(t, maxts, segment.LastExpireTime, "LastExpireTime should match max ExpireTime before expiration") + + // Execute expiration segmentManager.ExpireAllocations(context.TODO(), "ch1", maxts) - segment = meta.GetHealthySegment(context.TODO(), id) - assert.NotNil(t, segment) - assert.EqualValues(t, 0, len(segment.allocations)) + + // Fix 2: Check allocation count after expiration (using meta.GetAllocations) + finalAllocs := meta.GetAllocations(id) + assert.Len(t, finalAllocs, 0) } func TestGetFlushableSegments(t *testing.T) { diff --git a/internal/datacoord/session/indexnode_manager.go b/internal/datacoord/session/indexnode_manager.go index cd6bb3fe00..ace9e7f6d5 100644 --- a/internal/datacoord/session/indexnode_manager.go +++ b/internal/datacoord/session/indexnode_manager.go @@ -20,6 +20,7 @@ import ( "context" "sync" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"