enhance: Refine segment allocation in Datacoord

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
zhenshan.cao 2025-12-01 14:37:37 +08:00
parent 1b0495f10a
commit 2db9eb34b6
10 changed files with 503 additions and 120 deletions

View File

@ -139,7 +139,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
CollectionID: 2,
PartitionID: 3,
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
@ -259,7 +258,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
CollectionID: 2,
PartitionID: 3,
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
@ -351,7 +349,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
CollectionID: 2,
PartitionID: 3,
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
@ -671,7 +668,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
CollectionID: 2,
PartitionID: 3,
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},

View File

@ -823,7 +823,6 @@ func TestServer_GetIndexState(t *testing.T) {
Timestamp: createTS - 1,
},
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
@ -881,7 +880,6 @@ func TestServer_GetIndexState(t *testing.T) {
Timestamp: createTS - 1,
},
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
@ -1056,7 +1054,6 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
PartitionID: partID,
InsertChannel: "ch",
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
@ -1175,7 +1172,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
Timestamp: createTS,
},
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
@ -1221,7 +1217,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
Timestamp: createTS,
},
},
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},

View File

@ -23,6 +23,7 @@ import (
"math"
"path"
"strconv"
"sync"
"time"
"github.com/cockroachdb/errors"
@ -83,6 +84,20 @@ type CompactionMeta interface {
var _ CompactionMeta = (*meta)(nil)
// LockedAllocationList provides a thread-safe container for a segment's allocations slice.
// It utilizes an RWMutex to allow concurrent reads while protecting writes.
type LockedAllocationList struct {
sync.RWMutex
allocations []*Allocation
}
// NewLockedAllocationList creates a new instance of LockedAllocationList.
func NewLockedAllocationList() *LockedAllocationList {
return &LockedAllocationList{
allocations: make([]*Allocation, 0),
}
}
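A minimal usage sketch (editorial illustration, not part of this diff) of the locking discipline the container expects: readers take RLock and must treat the slice as read-only, writers take Lock before mutating it. The helper names below are hypothetical.

func countAllocations(list *LockedAllocationList) int { // hypothetical helper
	list.RLock()
	defer list.RUnlock()
	return len(list.allocations)
}

func appendAllocation(list *LockedAllocationList, a *Allocation) { // hypothetical helper
	list.Lock()
	defer list.Unlock()
	list.allocations = append(list.allocations, a)
}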
type meta struct {
ctx context.Context
catalog metastore.DataCoordCatalog
@ -92,6 +107,11 @@ type meta struct {
segMu lock.RWMutex
segments *SegmentsInfo // segment id to segment info
// Separate lock for high-frequency Allocation updates (allocationMu)
allocationMu lock.RWMutex
// allocations maps SegmentID to its thread-safe list of active Allocations.
allocations map[UniqueID]*LockedAllocationList
channelCPs *channelCPs // vChannel -> channel checkpoint/seek position
chunkManager storage.ChunkManager
@ -184,6 +204,7 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag
ctx: ctx,
catalog: catalog,
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
allocations: make(map[UniqueID]*LockedAllocationList),
segments: NewSegmentsInfo(),
channelCPs: newChannelCps(),
indexMeta: im,
@ -1399,40 +1420,175 @@ func (m *meta) GetRealSegmentsForChannel(channel string) []*SegmentInfo {
return m.segments.GetRealSegmentsForChannel(channel)
}
// AddAllocation add allocation in segment
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
log.Ctx(m.ctx).Debug("meta update: add allocation",
zap.Int64("segmentID", segmentID),
zap.Any("allocation", allocation))
// getSegmentsMap returns the underlying map of segment info.
// This hides the nested structure m.segments.segments.
func (m *meta) getSegmentsMap() map[UniqueID]*SegmentInfo {
if m.segments == nil {
return nil
}
return m.segments.segments
}
// updateLastExpireTime updates the LastExpireTime of a SegmentInfo, ensuring it is monotonically increasing.
// This function handles the logic for updating the field based on new allocations or cleanup results.
func (m *meta) updateLastExpireTime(segmentID UniqueID, expireTime Timestamp) {
m.segMu.Lock()
defer m.segMu.Unlock()
curSegInfo := m.segments.GetSegment(segmentID)
if curSegInfo == nil {
// TODO: Error handling.
log.Ctx(m.ctx).Error("meta update: add allocation failed - segment not found", zap.Int64("segmentID", segmentID))
return errors.New("meta update: add allocation failed - segment not found")
segmentsMap := m.getSegmentsMap()
if segmentsMap == nil {
return
}
// As we use global segment lastExpire to guarantee data correctness after restart
// there is no need to persist allocation to meta store, only update allocation in-memory meta.
m.segments.AddAllocation(segmentID, allocation)
log.Ctx(m.ctx).Info("meta update: add allocation - complete", zap.Int64("segmentID", segmentID))
if segment, ok := segmentsMap[segmentID]; ok {
// Get the current LastExpireTime safely
currentExpireTime := segment.GetLastExpireTime()
// Only update if the new time is strictly greater (Monotonically Increasing)
if expireTime > currentExpireTime {
// Apply the option to clone and set the new maximum time.
segmentsMap[segmentID] = segment.ShadowClone(SetExpireTime(expireTime))
}
}
}
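A worked illustration of the monotonic rule above (editorial sketch, not part of this diff): the stored LastExpireTime only moves forward. The helper below is hypothetical and mirrors the comparison in updateLastExpireTime.

// monotonicMax returns the candidate only if it is strictly greater than the
// current value, which is exactly the update rule applied above.
func monotonicMax(current, candidate Timestamp) Timestamp {
	if candidate > current {
		return candidate
	}
	return current
}

// monotonicMax(1500, 1000) == 1500 (no change); monotonicMax(1500, 2000) == 2000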
// AddAllocation adds a new Allocation to the segment's list in a thread-safe manner.
// This function handles the initialization of the list if it doesn't exist.
func (m *meta) AddAllocation(segmentID UniqueID, alloc *Allocation) error {
// 1. Get the list pointer (RLock for map lookup).
m.allocationMu.RLock()
list := m.allocations[segmentID]
m.allocationMu.RUnlock()
if list == nil {
// 2. The list doesn't exist, initialize it (Write Lock for map modification).
m.allocationMu.Lock()
// Double-check locking pattern to avoid redundant creation
if list = m.allocations[segmentID]; list == nil {
list = NewLockedAllocationList()
m.allocations[segmentID] = list
}
m.allocationMu.Unlock()
}
// 3. Acquire Write Lock for the list content and append the new allocation.
list.Lock()
defer list.Unlock()
list.allocations = append(list.allocations, alloc)
m.updateLastExpireTime(segmentID, alloc.ExpireTime)
return nil
}
// GetAllocations retrieves all active Allocations for a given segmentID.
// It returns the internal slice directly, protected by RLock.
//
// WARNING: The returned slice is a direct reference to the internal data structure.
// The caller must treat this slice as strictly READ-ONLY. Any attempt to modify
// the slice content (e.g., changing an Allocation field) or slice structure
// (e.g., using append or re-slicing) can lead to data corruption in the meta system.
func (m *meta) GetAllocations(segmentID UniqueID) []*Allocation {
// 1. Get the list pointer (RLock for map lookup).
m.allocationMu.RLock()
list := m.allocations[segmentID]
m.allocationMu.RUnlock()
if list == nil {
return nil
}
// 2. Acquire Read Lock for the list content (allows concurrent reads).
list.RLock()
defer list.RUnlock()
// 3. Return the internal slice directly, without copying.
return list.allocations
}
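Because the returned slice is a view of internal state, a caller that needs to sort or otherwise mutate the result should copy it first. A hedged sketch with a hypothetical helper name, not part of this diff:

func copyAllocations(m *meta, segmentID UniqueID) []*Allocation {
	view := m.GetAllocations(segmentID) // read-only reference to internal data
	out := make([]*Allocation, len(view))
	copy(out, view)
	return out // safe for the caller to sort or modify
}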
// ExpireAllocationsByTimestamp removes all allocations for a segment whose ExpireTime is less than or
// equal to the provided timestamp, protected by a Write Lock.
// It returns the list of removed allocations and updates the segment's LastExpireTime
// based on the maximum ExpireTime remaining in the list.
func (m *meta) ExpireAllocationsByTimestamp(segmentID UniqueID, expireTime Timestamp) []*Allocation {
// 1. Get the list pointer (RLock for map lookup).
m.allocationMu.RLock()
list, ok := m.allocations[segmentID]
m.allocationMu.RUnlock()
if !ok || list == nil {
return nil
}
// 2. Acquire Write Lock for the list content as we are modifying the slice.
list.Lock()
defer list.Unlock()
if len(list.allocations) == 0 {
return nil
}
removedAllocs := make([]*Allocation, 0)
newAllocs := make([]*Allocation, 0, len(list.allocations))
// Initialize maxExpireTime for the remaining allocations
var maxExpireTime Timestamp = 0
// 3. Filter, collect removed allocations, and find the max ExpireTime of remaining allocations.
for _, alloc := range list.allocations {
// Condition: Remove if alloc.ExpireTime is less than or equal to the provided timestamp
if alloc.ExpireTime <= expireTime {
removedAllocs = append(removedAllocs, alloc)
} else {
newAllocs = append(newAllocs, alloc)
// Update maxExpireTime for the remaining allocations
if alloc.ExpireTime > maxExpireTime {
maxExpireTime = alloc.ExpireTime
}
}
}
// 4. Replace the old slice with the new filtered slice.
list.allocations = newAllocs
// 5. Only call updateLastExpireTime if maxExpireTime > 0.
// This respects the monotonic nature of updateLastExpireTime.
if maxExpireTime > 0 {
m.updateLastExpireTime(segmentID, maxExpireTime)
}
return removedAllocs
}
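A sketch of the intended caller-side pattern (it mirrors SegmentManager.ExpireAllocations later in this diff): expire by timestamp, then recycle the removed objects. The helper name is hypothetical.

func expireAndRecycle(m *meta, segmentID UniqueID, ts Timestamp) int {
	expired := m.ExpireAllocationsByTimestamp(segmentID, ts)
	for _, alloc := range expired {
		putAllocation(alloc) // return the object to the allocation pool
	}
	return len(expired)
}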
// RemoveSegmentAllocations deletes the entire list of allocations for a segment ID
// and returns the removed allocations for cleanup/logging.
func (m *meta) RemoveSegmentAllocations(sid UniqueID) []*Allocation {
// 1. Acquire Write Lock to safely modify the map structure.
m.allocationMu.Lock()
defer m.allocationMu.Unlock()
list, ok := m.allocations[sid]
if !ok {
return nil
}
// 2. Acquire Write Lock on the list to safely read its contents before deletion.
list.Lock()
removedAllocs := list.allocations
list.Unlock()
// 3. Delete the list pointer from the map.
delete(m.allocations, sid)
return removedAllocs
}
func (m *meta) SetRowCount(segmentID UniqueID, rowCount int64) {
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetRowCount(segmentID, rowCount)
}
// SetAllocations set Segment allocations, will overwrite ALL original allocations
// Note that allocations is not persisted in KV store
func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetAllocations(segmentID, allocations)
}
// SetLastExpire set lastExpire time for segment
// Note that last is not necessary to store in KV meta
func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) {

View File

@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"
@ -42,6 +43,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
@ -771,10 +773,65 @@ func TestMeta_Basic(t *testing.T) {
NumOfRows: 1,
ExpireTime: 0,
})
assert.Error(t, err)
assert.Nil(t, err)
})
}
// Test_meta_Allocations tests the core allocation methods using COW logic.
func Test_meta_Allocations(t *testing.T) {
meta, err := newMemoryMeta(t)
assert.NoError(t, err)
sid := UniqueID(100)
// Add segment to meta for LastExpireTime update to work without internal errors
err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{
ID: sid,
State: commonpb.SegmentState_Growing,
}))
assert.NoError(t, err)
log.Info("here-1", zap.Any("segments", meta.segments.segments))
// Test 1: AddAllocation
alloc1 := getAllocation(100)
alloc1.SegmentID = sid
alloc1.ExpireTime = 1000 // Set for LastExpireTime check
log.Info("here0", zap.Any("segments", meta.segments.segments))
err = meta.AddAllocation(sid, alloc1)
assert.NoError(t, err)
log.Info("here1", zap.Any("segments", meta.segments.segments))
// Test 2: GetAllocations (should be 1 item)
allocs := meta.GetAllocations(sid)
assert.Len(t, allocs, 1)
assert.Equal(t, int64(100), allocs[0].NumOfRows)
log.Info("here2", zap.Any("segments", meta.segments.segments))
// Test 3: Add second allocation (COW should work)
alloc2 := getAllocation(50)
alloc2.SegmentID = sid
alloc2.ExpireTime = 2000
log.Info("here3", zap.Any("segments", meta.segments.segments))
err = meta.AddAllocation(sid, alloc2)
assert.NoError(t, err)
allocs = meta.GetAllocations(sid)
assert.Len(t, allocs, 2)
assert.Equal(t, int64(100), allocs[0].NumOfRows)
assert.Equal(t, int64(50), allocs[1].NumOfRows)
log.Info("here4", zap.Any("segments", meta.segments.segments))
// Test 4: Verify LastExpireTime was updated (should be 2000)
segment := meta.GetHealthySegment(context.Background(), sid)
assert.Equal(t, Timestamp(2000), segment.LastExpireTime)
// Test 5: RemoveSegmentAllocations
removedAllocs := meta.RemoveSegmentAllocations(sid)
assert.Len(t, removedAllocs, 2)
// Verify removed
assert.Nil(t, meta.GetAllocations(sid))
}
func TestGetUnFlushedSegments(t *testing.T) {
meta, err := newMemoryMeta(t)
assert.NoError(t, err)

View File

@ -34,6 +34,13 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// SegmentInfoWithAllocations is a temporary structure used to pass combined Segment metadata
// and its associated Allocations to the allocation policy function.
type SegmentInfoWithAllocations struct {
*SegmentInfo
Allocations []*Allocation
}
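A sketch of how the combined structure is assembled from meta (this mirrors what SegmentManager.AllocSegment does later in this diff); the helper name is hypothetical.

func withAllocations(m *meta, segments []*SegmentInfo) []*SegmentInfoWithAllocations {
	out := make([]*SegmentInfoWithAllocations, 0, len(segments))
	for _, seg := range segments {
		out = append(out, &SegmentInfoWithAllocations{
			SegmentInfo: seg,
			Allocations: m.GetAllocations(seg.GetID()), // read-only view
		})
	}
	return out
}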
type calUpperLimitPolicy func(schema *schemapb.CollectionSchema) (int, error)
func calBySchemaPolicy(schema *schemapb.CollectionSchema) (int, error) {
@ -84,11 +91,12 @@ func calBySegmentSizePolicy(schema *schemapb.CollectionSchema, segmentSize int64
}
// AllocatePolicy helper function definition to allocate Segment space
type AllocatePolicy func(segments []*SegmentInfo, count int64,
// MODIFIED: segments parameter now uses the new combined structure.
type AllocatePolicy func(segments []*SegmentInfoWithAllocations, count int64,
maxCountPerL1Segment int64, level datapb.SegmentLevel) ([]*Allocation, []*Allocation)
// alloca policy for L1 segment
func AllocatePolicyL1(segments []*SegmentInfo, count int64,
// AllocatePolicyL1 is the allocation policy for L1 segments
func AllocatePolicyL1(segments []*SegmentInfoWithAllocations, count int64,
maxCountPerL1Segment int64, level datapb.SegmentLevel,
) ([]*Allocation, []*Allocation) {
newSegmentAllocations := make([]*Allocation, 0)
@ -104,9 +112,15 @@ func AllocatePolicyL1(segments []*SegmentInfo, count int64,
if count == 0 {
return newSegmentAllocations, existedSegmentAllocations
}
for _, segment := range segments {
// MODIFIED: Iterate over the new combined struct
for _, segmentWithAllocs := range segments {
// Use the embedded SegmentInfo fields
segment := segmentWithAllocs.SegmentInfo
var allocSize int64
for _, allocation := range segment.allocations {
// MODIFIED: Access the Allocations field from the new struct
for _, allocation := range segmentWithAllocs.Allocations {
allocSize += allocation.NumOfRows
}

View File

@ -34,6 +34,49 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
// NewSegmentInfoWithAllocs helper function for tests
func NewSegmentInfoWithAllocs(segment *SegmentInfo, allocs []*Allocation) *SegmentInfoWithAllocations {
return &SegmentInfoWithAllocations{
SegmentInfo: segment,
Allocations: allocs,
}
}
func TestAllocatePolicyL1(t *testing.T) {
// Original setup of SegmentInfo objects (must remain the same)
s1 := NewSegmentInfo(&datapb.SegmentInfo{ID: 1, MaxRowNum: 1000, NumOfRows: 500})
s2 := NewSegmentInfo(&datapb.SegmentInfo{ID: 2, MaxRowNum: 1000, NumOfRows: 900})
s3 := NewSegmentInfo(&datapb.SegmentInfo{ID: 3, MaxRowNum: 1000, NumOfRows: 0}) // Empty segment
// Allocations for s1
alloc1 := getAllocation(100)
alloc2 := getAllocation(50)
// Allocations for s2: keep the pending allocation at 70 rows so s2 still has
// 30 free rows (MaxRowNum 1000 - NumOfRows 900 - allocated 70), enough for the
// 10-row request below; a larger allocation would leave no free space and
// AllocatePolicyL1 would skip s2.
alloc3 := getAllocation(70)
// NEW STEP: Convert SegmentInfo to SegmentInfoWithAllocations.
// s2 must come first: AllocatePolicyL1 picks the first segment with enough free space,
// and s1 and s3 would also qualify.
segments := []*SegmentInfoWithAllocations{
NewSegmentInfoWithAllocs(s2, []*Allocation{alloc3}), // Total Rows: 970. Free: 30
NewSegmentInfoWithAllocs(s1, []*Allocation{alloc1, alloc2}), // Total Rows: 650. Free: 350
NewSegmentInfoWithAllocs(s3, nil), // Total Rows: 0. Free: 1000
}
// Test Case 1: Fit into s2 (900+70+10 = 980)
newAllocs, existedAllocs := AllocatePolicyL1(segments, 10, 1000, datapb.SegmentLevel_L1)
// Assert that the allocation went to an existing segment
assert.Empty(t, newAllocs)
assert.Len(t, existedAllocs, 1)
// Assert that segment 2 (the first segment with enough free space) was selected
assert.Equal(t, int64(2), existedAllocs[0].SegmentID)
assert.Equal(t, int64(10), existedAllocs[0].NumOfRows)
}
func TestUpperLimitCalBySchema(t *testing.T) {
type testCase struct {
schema *schemapb.CollectionSchema

View File

@ -50,7 +50,6 @@ type segmentInfoIndexes struct {
// SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
type SegmentInfo struct {
*datapb.SegmentInfo
allocations []*Allocation
lastFlushTime time.Time
isCompacting bool
// a cache to avoid calculate twice
@ -72,7 +71,6 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
}
// setup growing fields
if s.GetState() == commonpb.SegmentState_Growing {
s.allocations = make([]*Allocation, 0, 16)
s.lastFlushTime = time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second))
// A growing segment from recovery can be also considered idle.
s.lastWrittenTime = getZeroTime()
@ -238,24 +236,6 @@ func (s *SegmentsInfo) SetStartPosition(segmentID UniqueID, pos *msgpb.MsgPositi
}
}
// SetAllocations sets allocations for segment with specified id
// if the segment id is not found, do nothing
// uses `ShadowClone` since internal SegmentInfo is not changed
func (s *SegmentsInfo) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = segment.ShadowClone(SetAllocations(allocations))
}
}
// AddAllocation adds a new allocation to specified segment
// if the segment is not found, do nothing
// uses `Clone` since internal SegmentInfo's LastExpireTime is changed
func (s *SegmentsInfo) AddAllocation(segmentID UniqueID, allocation *Allocation) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = segment.Clone(AddAllocation(allocation))
}
}
// UpdateLastWrittenTime updates segment last written time to now.
// if the segment is not found, do nothing
// uses `ShadowClone` since internal SegmentInfo is not changed
@ -324,7 +304,6 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo)
cloned := &SegmentInfo{
SegmentInfo: info,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
// cannot copy size, since binlog may be changed
@ -340,7 +319,6 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
cloned := &SegmentInfo{
SegmentInfo: s.SegmentInfo,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
lastWrittenTime: s.lastWrittenTime,
@ -438,21 +416,6 @@ func SetStartPosition(pos *msgpb.MsgPosition) SegmentInfoOption {
}
}
// SetAllocations is the option to set allocations for segment info
func SetAllocations(allocations []*Allocation) SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.allocations = allocations
}
}
// AddAllocation is the option to add allocation info for segment info
func AddAllocation(allocation *Allocation) SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.allocations = append(segment.allocations, allocation)
segment.LastExpireTime = allocation.ExpireTime
}
}
// SetLastWrittenTime is the option to set last written time for segment info
func SetLastWrittenTime() SegmentInfoOption {
return func(segment *SegmentInfo) {

View File

@ -62,8 +62,9 @@ func getAllocation(numOfRows int64) *Allocation {
return a
}
// putAllocation puts an allocation for recycling
func putAllocation(a *Allocation) {
// putAllocation is a function variable used to recycle Allocation objects.
// It is set to the default implementation upon initialization.
var putAllocation = func(a *Allocation) {
allocPool.Put(a)
}
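Making putAllocation a package-level variable lets tests intercept recycling. A sketch of the save/override/restore pattern (the same pattern TestSegmentManager_ExpireAllocations uses later in this diff); the test name is hypothetical.

func TestRecycleTracking(t *testing.T) { // hypothetical test
	original := putAllocation
	recycled := 0
	putAllocation = func(a *Allocation) { recycled++ }
	defer func() { putAllocation = original }()

	// ... exercise code paths that call putAllocation, then assert on recycled ...
}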
@ -304,9 +305,11 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
defer s.channelLock.Unlock(channelName)
// filter segments
segmentInfos := make([]*SegmentInfo, 0)
// MODIFIED: The slice type is changed to the new combined structure for allocation policy
segmentInfosWithAllocs := make([]*SegmentInfoWithAllocations, 0)
growing, _ := s.channel2Growing.Get(channelName)
growing.Range(func(segmentID int64) bool {
// Fetch core metadata (protected by segMu RLock)
segment := s.meta.GetHealthySegment(ctx, segmentID)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channelName), zap.Int64("segmentID", segmentID))
@ -316,7 +319,17 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
if segment.GetPartitionID() != partitionID {
return true
}
segmentInfos = append(segmentInfos, segment)
// NEW: Fetch allocations via the meta API
// (protected by the list's RLock; the returned slice is a read-only view, not a copy)
allocs := s.meta.GetAllocations(segmentID)
// Combine the core SegmentInfo and the Allocations list
segmentInfosWithAllocs = append(segmentInfosWithAllocs, &SegmentInfoWithAllocations{
SegmentInfo: segment,
// allocs is a read-only view of the current slice; the policy only reads it
Allocations: allocs,
})
return true
})
@ -325,7 +338,8 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
if err != nil {
return nil, err
}
newSegmentAllocations, existedSegmentAllocations := s.allocPolicy(segmentInfos,
// MODIFIED: Pass the new structure to allocPolicy
newSegmentAllocations, existedSegmentAllocations := s.allocPolicy(segmentInfosWithAllocs,
requestRows, int64(maxCountPerSegment), datapb.SegmentLevel_L1)
// create new segments and add allocations
@ -341,6 +355,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
}
allocation.ExpireTime = expireTs
allocation.SegmentID = segment.GetID()
// Write Path: Protected by list Lock
if err := s.meta.AddAllocation(segment.GetID(), allocation); err != nil {
return nil, err
}
@ -348,6 +363,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
for _, allocation := range existedSegmentAllocations {
allocation.ExpireTime = expireTs
// Write Path: Protected by list Lock
if err := s.meta.AddAllocation(allocation.SegmentID, allocation); err != nil {
log.Error("Failed to add allocation to existed segment", zap.Int64("segmentID", allocation.SegmentID))
return nil, err
@ -439,6 +455,7 @@ func (s *SegmentManager) DropSegment(ctx context.Context, channel string, segmen
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
// Remove the segment from the in-memory maps regardless of its state
if growing, ok := s.channel2Growing.Get(channel); ok {
growing.Remove(segmentID)
}
@ -446,13 +463,19 @@ func (s *SegmentManager) DropSegment(ctx context.Context, channel string, segmen
sealed.Remove(segmentID)
}
// 1. Fetch SegmentInfo for logging/context (optional, but good practice)
segment := s.meta.GetHealthySegment(ctx, segmentID)
if segment == nil {
log.Warn("Failed to get segment", zap.Int64("id", segmentID))
return
log.Warn("Failed to get SegmentInfo for logging purposes (segment may have been concurrently removed), but proceeding with allocation cleanup", zap.Int64("id", segmentID))
// NOTE: We do not return here, as we must still attempt to clean up allocations.
}
s.meta.SetAllocations(segmentID, []*Allocation{})
for _, allocation := range segment.allocations {
// Use the new meta method to safely remove allocations under allocationMu.
// This removes the map entry and returns the list of *Allocation objects for recycling.
allocations := s.meta.RemoveSegmentAllocations(segmentID)
// 2. Recycle the returned allocation objects
for _, allocation := range allocations {
putAllocation(allocation)
}
}
@ -536,34 +559,37 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
return ret, nil
}
// ExpireAllocations notify segment status to expire old allocations
// ExpireAllocations notifies segment status to expire old allocations based on a timestamp (ts).
// It iterates through all segments in the 'growing' state for the given channel and cleans up expired allocations.
func (s *SegmentManager) ExpireAllocations(ctx context.Context, channel string, ts Timestamp) {
// 1. Channel-level Lock: Protects the channel2Growing map lookup and modification
// for this specific channel.
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
growing, ok := s.channel2Growing.Get(channel)
if !ok {
// If the channel is not present in the growing map, there are no segments to expire.
return
}
// Iterate over all segment IDs in the growing set (guarded by the channel-level lock held above).
growing.Range(func(id int64) bool {
segment := s.meta.GetHealthySegment(ctx, id)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id))
growing.Remove(id)
return true
}
allocations := make([]*Allocation, 0, len(segment.allocations))
for i := 0; i < len(segment.allocations); i++ {
if segment.allocations[i].ExpireTime <= ts {
a := segment.allocations[i]
putAllocation(a)
} else {
allocations = append(allocations, segment.allocations[i])
// 1. Call the new meta function to atomically expire and replace allocations,
// passing the timestamp directly.
// The business logic (checking ExpireTime <= ts) is now inside the meta method.
expiredAllocations := s.meta.ExpireAllocationsByTimestamp(id, ts)
// 2. Recycle expired objects (SegmentManager's explicit responsibility)
if len(expiredAllocations) > 0 {
// IMPORTANT: Recycling allocations back to a pool/cache is crucial for memory efficiency.
for _, alloc := range expiredAllocations {
putAllocation(alloc)
}
}
s.meta.SetAllocations(segment.GetID(), allocations)
return true
return true // Continue to the next segment in the Range loop
})
}
@ -662,70 +688,104 @@ func (s *SegmentManager) tryToSealSegment(ctx context.Context, ts Timestamp, cha
// DropSegmentsOfChannel drops all segments in a channel
func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Drop-Segment")
defer sp.End()
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
// Clear sealed segments (no allocations to clean up here)
s.channel2Sealed.Remove(channel)
growing, ok := s.channel2Growing.Get(channel)
if !ok {
return
}
growing.Range(func(sid int64) bool {
// We don't strictly need SegmentInfo for cleanup, but we keep the read logic
// to check if the segment exists and to log warnings if the in-memory map is stale.
segment := s.meta.GetHealthySegment(ctx, sid)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", sid))
log.Warn("failed to get segment, remove it from growing map", zap.String("channel", channel), zap.Int64("segmentID", sid))
growing.Remove(sid)
return true
// We still continue to try cleaning up allocations in the meta, in case the segment was
// removed from SegmentsInfo but not from allocations map.
// Fallthrough to cleanup logic.
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
// MODIFIED: Use the new meta method to safely remove allocations under allocationMu.
// This removes the map entry and returns the list of *Allocation objects for recycling.
allocations := s.meta.RemoveSegmentAllocations(sid)
// Recycle the returned allocation objects
for _, allocation := range allocations {
putAllocation(allocation)
}
return true
})
s.channel2Growing.Remove(channel)
}
// DropSegmentsOfPartition drops all segments of the given partitions in a channel
func (s *SegmentManager) DropSegmentsOfPartition(ctx context.Context, channel string, partitionIDs []int64) {
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
// Handle growing segments
if growing, ok := s.channel2Growing.Get(channel); ok {
for sid := range growing {
segment := s.meta.GetHealthySegment(ctx, sid)
isToBeDropped := false
if segment == nil {
log.Warn("failed to get segment, remove it",
zap.String("channel", channel),
zap.Int64("segmentID", sid))
growing.Remove(sid)
continue
isToBeDropped = true // SegmentInfo is gone, remove it from in-memory map
} else if contains(partitionIDs, segment.GetPartitionID()) {
isToBeDropped = true // Segment belongs to one of the partitions
}
if contains(partitionIDs, segment.GetPartitionID()) {
if isToBeDropped {
growing.Remove(sid)
// NEW ALLOCATION CLEANUP LOGIC
allocations := s.meta.RemoveSegmentAllocations(sid)
for _, allocation := range allocations {
putAllocation(allocation)
}
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
putAllocation(allocation)
}
// If the segment was not dropped (and segment was not nil), we continue without cleaning up allocations
}
}
// Handle sealed segments
if sealed, ok := s.channel2Sealed.Get(channel); ok {
for sid := range sealed {
segment := s.meta.GetHealthySegment(ctx, sid)
isToBeDropped := false
if segment == nil {
log.Warn("failed to get segment, remove it",
zap.String("channel", channel),
zap.Int64("segmentID", sid))
sealed.Remove(sid)
continue
} else if contains(partitionIDs, segment.GetPartitionID()) {
isToBeDropped = true
}
if contains(partitionIDs, segment.GetPartitionID()) {
if isToBeDropped {
sealed.Remove(sid)
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
putAllocation(allocation)
// NEW ALLOCATION CLEANUP LOGIC
allocations := s.meta.RemoveSegmentAllocations(sid)
for _, allocation := range allocations {
putAllocation(allocation)
}
}
}
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"testing"
"time"
@ -46,6 +47,90 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// MockAllocationRecycler is a helper struct to verify putAllocation calls.
type MockAllocationRecycler struct {
RecycleCount atomic.Int64
}
func newMockAllocationRecycler() *MockAllocationRecycler {
return &MockAllocationRecycler{}
}
func (m *MockAllocationRecycler) Put(a *Allocation) {
m.RecycleCount.Add(1)
}
func TestSegmentManager_ExpireAllocations(t *testing.T) {
paramtable.Init()
// Assume newMockAllocator uses a strictly increasing counter for TSO
mockAllocator := newMockAllocator(t)
meta, err := newMemoryMeta(t)
assert.NoError(t, err)
schema := newTestSchema()
collID, err := mockAllocator.AllocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
// Setup: Use a mock to track recycling
mockRecycler := newMockAllocationRecycler()
originalPutAllocation := putAllocation
putAllocation = mockRecycler.Put
defer func() { putAllocation = originalPutAllocation }()
segmentManager, _ := newSegmentManager(meta, mockAllocator)
// 1. Alloc 1 segment. This creates alloc0, which has the smallest ExpireTime (TSO_A).
allocs, err := segmentManager.AllocSegment(context.Background(), collID, 100, "c1", 100)
assert.NoError(t, err)
sid := allocs[0].SegmentID
// NOTE: allocs[0] (alloc0) is now implicitly referenced only by its segment ID (sid).
// 2. Manually add 3 allocations (TSO_B, TSO_C, TSO_D). All ExpireTimes are generated dynamically.
// TSO_B (Will be expired)
alloc1 := getAllocation(10)
alloc1.SegmentID = sid
alloc1.ExpireTime, err = segmentManager.genExpireTs(context.Background())
assert.NoError(t, err)
meta.AddAllocation(sid, alloc1)
// TSO_C (Expiration Threshold) - Will be expired because check is <=
alloc2 := getAllocation(20)
alloc2.SegmentID = sid
alloc2.ExpireTime, err = segmentManager.genExpireTs(context.Background())
assert.NoError(t, err)
meta.AddAllocation(sid, alloc2)
// Set expiration line: TSO_C is the threshold.
// Allocations to be recycled: allocs[0], alloc1, alloc2 (Total 3)
tsToExpire := alloc2.ExpireTime
// TSO_D (Will be retained) - This is the largest ExpireTime.
alloc3 := getAllocation(30)
alloc3.SegmentID = sid
alloc3.ExpireTime, err = segmentManager.genExpireTs(context.Background())
assert.NoError(t, err)
meta.AddAllocation(sid, alloc3)
// Action: Expire allocations older than or equal to tsToExpire (TSO_C)
segmentManager.ExpireAllocations(context.Background(), "c1", tsToExpire)
// Assertions
// 1. Recycle count must be 3 (allocs[0], alloc1, alloc2).
assert.Equal(t, int64(3), mockRecycler.RecycleCount.Load(), "Three expired allocation objects should have been recycled (alloc0, alloc1, alloc2)")
// 2. Check the final list of allocations. Expected 1 (alloc3 only).
finalAllocs := meta.GetAllocations(sid)
assert.Len(t, finalAllocs, 1, "Final allocations list should only contain non-expired items: alloc3")
assert.ElementsMatch(t, []*Allocation{alloc3}, finalAllocs)
// 3. Check LastExpireTime update (alloc3 is the highest remaining ExpireTime)
segment := meta.GetHealthySegment(context.Background(), sid)
assert.Equal(t, alloc3.ExpireTime, segment.LastExpireTime, "LastExpireTime should be updated to max remaining ExpireTime (alloc3)")
}
func TestManagerOptions(t *testing.T) {
// ctx := context.Background()
paramtable.Init()
@ -475,8 +560,13 @@ func TestExpireAllocation(t *testing.T) {
mockPolicy := func(schema *schemapb.CollectionSchema) (int, error) {
return 10000000, nil
}
// NOTE: this test relies on the default putAllocation implementation for recycling;
// swap in a MockAllocationRecycler (as TestSegmentManager_ExpireAllocations does)
// if recycle tracking is needed.
segmentManager, _ := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy))
// alloc 100 times and expire
// Allocate 100 times and calculate max expire timestamp
var maxts Timestamp
var id int64 = -1
for i := 0; i < 100; i++ {
@ -493,13 +583,21 @@ func TestExpireAllocation(t *testing.T) {
}
}
// Check the initial allocation count (using meta.GetAllocations)
initialAllocs := meta.GetAllocations(id)
assert.Len(t, initialAllocs, 100)
// Check if LastExpireTime is correctly set to the maximum value
segment := meta.GetHealthySegment(context.TODO(), id)
assert.NotNil(t, segment)
assert.EqualValues(t, 100, len(segment.allocations))
assert.Equal(t, maxts, segment.LastExpireTime, "LastExpireTime should match max ExpireTime before expiration")
// Execute expiration
segmentManager.ExpireAllocations(context.TODO(), "ch1", maxts)
segment = meta.GetHealthySegment(context.TODO(), id)
assert.NotNil(t, segment)
assert.EqualValues(t, 0, len(segment.allocations))
// Check the allocation count after expiration (using meta.GetAllocations)
finalAllocs := meta.GetAllocations(id)
assert.Len(t, finalAllocs, 0)
}
func TestGetFlushableSegments(t *testing.T) {

View File

@ -20,6 +20,7 @@ import (
"context"
"sync"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"