enhance: [10kcp] Optimize segmentManager segments (#37884)

1. Use vchannel and partition indices for segments.
2. Replace coarse-grained mutex with concurrent map.

issue: https://github.com/milvus-io/milvus/issues/37633,
https://github.com/milvus-io/milvus/issues/37630

pr: https://github.com/milvus-io/milvus/pull/37836

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-11-21 15:10:04 +08:00 committed by GitHub
parent 92ab65ada0
commit 9e1ba0759c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 329 additions and 278 deletions

View File

@ -79,9 +79,9 @@ func (_c *MockManager_AllocSegment_Call) RunAndReturn(run func(context.Context,
return _c
}
// DropSegment provides a mock function with given fields: ctx, segmentID
func (_m *MockManager) DropSegment(ctx context.Context, segmentID int64) {
_m.Called(ctx, segmentID)
// DropSegment provides a mock function with given fields: ctx, channel, partitionID, segmentID
func (_m *MockManager) DropSegment(ctx context.Context, channel string, partitionID int64, segmentID int64) {
_m.Called(ctx, channel, partitionID, segmentID)
}
// MockManager_DropSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegment'
@ -90,15 +90,17 @@ type MockManager_DropSegment_Call struct {
}
// DropSegment is a helper method to define mock.On call
// - ctx context.Context
// - segmentID int64
func (_e *MockManager_Expecter) DropSegment(ctx interface{}, segmentID interface{}) *MockManager_DropSegment_Call {
return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segmentID)}
// - ctx context.Context
// - channel string
// - partitionID int64
// - segmentID int64
func (_e *MockManager_Expecter) DropSegment(ctx interface{}, channel interface{}, partitionID interface{}, segmentID interface{}) *MockManager_DropSegment_Call {
return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, channel, partitionID, segmentID)}
}
func (_c *MockManager_DropSegment_Call) Run(run func(ctx context.Context, segmentID int64)) *MockManager_DropSegment_Call {
func (_c *MockManager_DropSegment_Call) Run(run func(ctx context.Context, channel string, partitionID int64, segmentID int64)) *MockManager_DropSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64))
run(args[0].(context.Context), args[1].(string), args[2].(int64), args[3].(int64))
})
return _c
}
@ -108,7 +110,7 @@ func (_c *MockManager_DropSegment_Call) Return() *MockManager_DropSegment_Call {
return _c
}
func (_c *MockManager_DropSegment_Call) RunAndReturn(run func(context.Context, int64)) *MockManager_DropSegment_Call {
func (_c *MockManager_DropSegment_Call) RunAndReturn(run func(context.Context, string, int64, int64)) *MockManager_DropSegment_Call {
_c.Call.Return(run)
return _c
}
@ -148,17 +150,8 @@ func (_c *MockManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context.
}
// ExpireAllocations provides a mock function with given fields: channel, ts
func (_m *MockManager) ExpireAllocations(channel string, ts uint64) error {
ret := _m.Called(channel, ts)
var r0 error
if rf, ok := ret.Get(0).(func(string, uint64) error); ok {
r0 = rf(channel, ts)
} else {
r0 = ret.Error(0)
}
return r0
func (_m *MockManager) ExpireAllocations(channel string, ts uint64) {
_m.Called(channel, ts)
}
// MockManager_ExpireAllocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExpireAllocations'
@ -180,12 +173,12 @@ func (_c *MockManager_ExpireAllocations_Call) Run(run func(channel string, ts ui
return _c
}
func (_c *MockManager_ExpireAllocations_Call) Return(_a0 error) *MockManager_ExpireAllocations_Call {
_c.Call.Return(_a0)
func (_c *MockManager_ExpireAllocations_Call) Return() *MockManager_ExpireAllocations_Call {
_c.Call.Return()
return _c
}
func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(string, uint64) error) *MockManager_ExpireAllocations_Call {
func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(string, uint64)) *MockManager_ExpireAllocations_Call {
_c.Call.Return(run)
return _c
}
@ -246,25 +239,25 @@ func (_c *MockManager_GetFlushableSegments_Call) RunAndReturn(run func(context.C
return _c
}
// SealAllSegments provides a mock function with given fields: ctx, collectionID, segIDs
func (_m *MockManager) SealAllSegments(ctx context.Context, collectionID int64, segIDs []int64) ([]int64, error) {
ret := _m.Called(ctx, collectionID, segIDs)
// SealAllSegments provides a mock function with given fields: ctx, channels, segIDs
func (_m *MockManager) SealAllSegments(ctx context.Context, channels []string, segIDs []int64) ([]int64, error) {
ret := _m.Called(ctx, channels, segIDs)
var r0 []int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, []int64) ([]int64, error)); ok {
return rf(ctx, collectionID, segIDs)
if rf, ok := ret.Get(0).(func(context.Context, []string, []int64) ([]int64, error)); ok {
return rf(ctx, channels, segIDs)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, []int64) []int64); ok {
r0 = rf(ctx, collectionID, segIDs)
if rf, ok := ret.Get(0).(func(context.Context, []string, []int64) []int64); ok {
r0 = rf(ctx, channels, segIDs)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64, []int64) error); ok {
r1 = rf(ctx, collectionID, segIDs)
if rf, ok := ret.Get(1).(func(context.Context, []string, []int64) error); ok {
r1 = rf(ctx, channels, segIDs)
} else {
r1 = ret.Error(1)
}
@ -278,16 +271,16 @@ type MockManager_SealAllSegments_Call struct {
}
// SealAllSegments is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - segIDs []int64
func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, collectionID interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call {
return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, collectionID, segIDs)}
// - ctx context.Context
// - channels []string
// - segIDs []int64
func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, channels interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call {
return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, channels, segIDs)}
}
func (_c *MockManager_SealAllSegments_Call) Run(run func(ctx context.Context, collectionID int64, segIDs []int64)) *MockManager_SealAllSegments_Call {
func (_c *MockManager_SealAllSegments_Call) Run(run func(ctx context.Context, channels []string, segIDs []int64)) *MockManager_SealAllSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].([]int64))
run(args[0].(context.Context), args[1].([]string), args[2].([]int64))
})
return _c
}
@ -297,7 +290,7 @@ func (_c *MockManager_SealAllSegments_Call) Return(_a0 []int64, _a1 error) *Mock
return _c
}
func (_c *MockManager_SealAllSegments_Call) RunAndReturn(run func(context.Context, int64, []int64) ([]int64, error)) *MockManager_SealAllSegments_Call {
func (_c *MockManager_SealAllSegments_Call) RunAndReturn(run func(context.Context, []string, []int64) ([]int64, error)) *MockManager_SealAllSegments_Call {
_c.Call.Return(run)
return _c
}

View File

@ -30,7 +30,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -75,14 +74,14 @@ type Manager interface {
// AllocSegment allocates rows and record the allocation.
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error)
// DropSegment drops the segment from manager.
DropSegment(ctx context.Context, segmentID UniqueID)
DropSegment(ctx context.Context, channel string, partitionID, segmentID UniqueID)
// SealAllSegments seals all segments of collection with collectionID and return sealed segments.
// If segIDs is not empty, also seals segments in segIDs.
SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error)
SealAllSegments(ctx context.Context, channels []string, segIDs []UniqueID) ([]UniqueID, error)
// GetFlushableSegments returns flushable segment ids
GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
// ExpireAllocations notifies segment status to expire old allocations
ExpireAllocations(channel string, ts Timestamp) error
ExpireAllocations(channel string, ts Timestamp)
// DropSegmentsOfChannel drops all segments in a channel
DropSegmentsOfChannel(ctx context.Context, channel string)
}
@ -102,13 +101,18 @@ func (alloc *Allocation) String() string {
// make sure SegmentManager implements Manager
var _ Manager = (*SegmentManager)(nil)
type (
Channel2Segments = *typeutil.ConcurrentMap[string, Partition2Segments]
Partition2Segments = *typeutil.ConcurrentMap[int64, Segments]
Segments = *typeutil.ConcurrentMap[int64, struct{}]
)
// SegmentManager handles L1 segment related logic
type SegmentManager struct {
meta *meta
mu lock.RWMutex
allocator allocator
helper allocHelper
segments []UniqueID
channel2Segments Channel2Segments
estimatePolicy calUpperLimitPolicy
allocPolicy AllocatePolicy
segmentSealPolicies []SegmentSealPolicy
@ -209,7 +213,7 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*S
meta: meta,
allocator: allocator,
helper: defaultAllocHelper(),
segments: make([]UniqueID, 0),
channel2Segments: typeutil.NewConcurrentMap[string, Partition2Segments](),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAllocatePolicy(),
segmentSealPolicies: defaultSegmentSealPolicy(),
@ -219,49 +223,62 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*S
for _, opt := range opts {
opt.apply(manager)
}
manager.loadSegmentsFromMeta()
if err := manager.maybeResetLastExpireForSegments(); err != nil {
latestTs, err := manager.genLastExpireTsForSegments()
if err != nil {
return nil, err
}
manager.loadSegmentsFromMeta(latestTs)
return manager, nil
}
// loadSegmentsFromMeta generate corresponding segment status for each segment from meta
func (s *SegmentManager) loadSegmentsFromMeta() {
segments := s.meta.GetUnFlushedSegments()
segmentsID := make([]UniqueID, 0, len(segments))
for _, segment := range segments {
if segment.Level != datapb.SegmentLevel_L0 {
segmentsID = append(segmentsID, segment.GetID())
func (s *SegmentManager) loadSegmentsFromMeta(latestTs Timestamp) {
unflushed := s.meta.GetUnFlushedSegments()
unflushed = lo.Filter(unflushed, func(segment *SegmentInfo, _ int) bool {
return segment.Level != datapb.SegmentLevel_L0
})
channel2Segments := lo.GroupBy(unflushed, func(segment *SegmentInfo) string {
return segment.GetInsertChannel()
})
for channel, segments := range channel2Segments {
partition2Segments, _ := s.channel2Segments.GetOrInsert(channel, typeutil.NewConcurrentMap[int64, Segments]())
partition2SegmentsMap := lo.GroupBy(segments, func(segment *SegmentInfo) int64 {
return segment.GetPartitionID()
})
for partitionID, segmentInfos := range partition2SegmentsMap {
segments := typeutil.NewConcurrentMap[int64, struct{}]()
for _, segment := range segmentInfos {
s.maybeResetLastExpireForSegment(segment, latestTs)
segments.Insert(segment.GetID(), struct{}{})
}
partition2Segments.Insert(partitionID, segments)
}
}
s.segments = segmentsID
}
func (s *SegmentManager) maybeResetLastExpireForSegments() error {
// for all sealed and growing segments, need to reset last expire
if len(s.segments) > 0 {
var latestTs uint64
allocateErr := retry.Do(context.Background(), func() error {
ts, tryErr := s.genExpireTs(context.Background())
func (s *SegmentManager) genLastExpireTsForSegments() (Timestamp, error) {
var latestTs uint64
allocateErr := retry.Do(context.Background(), func() error {
ts, tryErr := s.genExpireTs(context.Background())
if tryErr != nil {
log.Warn("failed to get ts from rootCoord for globalLastExpire", zap.Error(tryErr))
if tryErr != nil {
return tryErr
}
latestTs = ts
return nil
}, retry.Attempts(Params.DataCoordCfg.AllocLatestExpireAttempt.GetAsUint()), retry.Sleep(200*time.Millisecond))
if allocateErr != nil {
log.Warn("cannot allocate latest lastExpire from rootCoord", zap.Error(allocateErr))
return errors.New("global max expire ts is unavailable for segment manager")
}
for _, sID := range s.segments {
if segment := s.meta.GetSegment(sID); segment != nil && segment.GetState() == commonpb.SegmentState_Growing {
s.meta.SetLastExpire(sID, latestTs)
}
return tryErr
}
latestTs = ts
return nil
}, retry.Attempts(Params.DataCoordCfg.AllocLatestExpireAttempt.GetAsUint()), retry.Sleep(200*time.Millisecond))
if allocateErr != nil {
log.Warn("cannot allocate latest lastExpire from rootCoord", zap.Error(allocateErr))
return 0, errors.New("global max expire ts is unavailable for segment manager")
}
return latestTs, nil
}
func (s *SegmentManager) maybeResetLastExpireForSegment(segment *SegmentInfo, latestTs Timestamp) {
// for all sealed and growing segments, need to reset last expire
if segment != nil && segment.GetState() == commonpb.SegmentState_Growing {
s.meta.SetLastExpire(segment.GetID(), latestTs)
}
return nil
}
// AllocSegment allocates a segment per the requested collection, partition, channel and rows
@ -275,38 +292,32 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
With(zap.Int64("requestRows", requestRows))
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Alloc-Segment")
defer sp.End()
s.mu.Lock()
defer s.mu.Unlock()
// filter segments
validSegments := make(map[UniqueID]struct{})
invalidSegments := make(map[UniqueID]struct{})
segments := make([]*SegmentInfo, 0)
for _, segmentID := range s.segments {
segmentInfos := make([]*SegmentInfo, 0)
partition2Segments, _ := s.channel2Segments.GetOrInsert(channelName, typeutil.NewConcurrentMap[int64, Segments]())
segments, _ := partition2Segments.GetOrInsert(partitionID, typeutil.NewConcurrentMap[int64, struct{}]())
segments.Range(func(segmentID int64, _ struct{}) bool {
segment := s.meta.GetHealthySegment(segmentID)
if segment == nil {
invalidSegments[segmentID] = struct{}{}
continue
log.Warn("failed to get segment, remove it", zap.String("channel", channelName), zap.Int64("segmentID", segmentID))
segments.Remove(segmentID)
return true
}
validSegments[segmentID] = struct{}{}
if !satisfy(segment, collectionID, partitionID, channelName) || !isGrowing(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 {
continue
if !isGrowing(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 {
return true
}
segments = append(segments, segment)
}
if len(invalidSegments) > 0 {
log.Warn("Failed to get segments infos from meta, clear them", zap.Int64s("segmentIDs", lo.Keys(invalidSegments)))
}
s.segments = lo.Keys(validSegments)
segmentInfos = append(segmentInfos, segment)
return true
})
// Apply allocation policy.
maxCountPerSegment, err := s.estimateMaxNumOfRows(collectionID)
if err != nil {
return nil, err
}
newSegmentAllocations, existedSegmentAllocations := s.allocPolicy(segments,
newSegmentAllocations, existedSegmentAllocations := s.allocPolicy(segmentInfos,
requestRows, int64(maxCountPerSegment), datapb.SegmentLevel_L1)
// create new segments and add allocations
@ -339,11 +350,6 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
return allocations, nil
}
func satisfy(segment *SegmentInfo, collectionID, partitionID UniqueID, channel string) bool {
return segment.GetCollectionID() == collectionID && segment.GetPartitionID() == partitionID &&
segment.GetInsertChannel() == channel
}
func isGrowing(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Growing
}
@ -392,7 +398,9 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
log.Error("failed to add segment to DataCoord", zap.Error(err))
return nil, err
}
s.segments = append(s.segments, id)
partition2Segments, _ := s.channel2Segments.GetOrInsert(channelName, typeutil.NewConcurrentMap[int64, Segments]())
segments, _ := partition2Segments.GetOrInsert(partitionID, typeutil.NewConcurrentMap[int64, struct{}]())
segments.Insert(id, struct{}{})
log.Info("datacoord: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
@ -412,17 +420,16 @@ func (s *SegmentManager) estimateMaxNumOfRows(collectionID UniqueID) (int, error
}
// DropSegment drops the segment from the manager.
func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
func (s *SegmentManager) DropSegment(ctx context.Context, channel string, partitionID, segmentID UniqueID) {
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Drop-Segment")
defer sp.End()
s.mu.Lock()
defer s.mu.Unlock()
for i, id := range s.segments {
if id == segmentID {
s.segments = append(s.segments[:i], s.segments[i+1:]...)
break
if partition2Segments, ok := s.channel2Segments.Get(channel); ok {
if segments, ok := partition2Segments.Get(partitionID); ok {
segments.Remove(segmentID)
}
}
segment := s.meta.GetHealthySegment(segmentID)
if segment == nil {
log.Warn("Failed to get segment", zap.Int64("id", segmentID))
@ -435,23 +442,32 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
}
// SealAllSegments seals all segments of collection with collectionID and return sealed segments
func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error) {
func (s *SegmentManager) SealAllSegments(ctx context.Context, channels []string, segIDs []UniqueID) ([]UniqueID, error) {
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Seal-Segments")
defer sp.End()
s.mu.Lock()
defer s.mu.Unlock()
var ret []UniqueID
segCandidates := s.segments
segCandidates := make([]int64, 0)
for _, channel := range channels {
partition2Segments, ok := s.channel2Segments.Get(channel)
if !ok {
continue
}
partition2Segments.Range(func(partitionID int64, segments Segments) bool {
segCandidates = append(segCandidates, segments.Keys()...)
return true
})
}
if len(segIDs) != 0 {
segCandidates = segIDs
}
sealedSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
})
growingSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
})
ret = append(ret, sealedSegments...)
@ -468,70 +484,95 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel string, t Timestamp) ([]UniqueID, error) {
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Get-Segments")
defer sp.End()
s.mu.Lock()
defer s.mu.Unlock()
// TODO:move tryToSealSegment and dropEmptySealedSegment outside
if err := s.tryToSealSegment(t, channel); err != nil {
return nil, err
}
// TODO: It's too frequent; perhaps each channel could check once per minute instead.
s.cleanupSealedSegment(t, channel)
ret := make([]UniqueID, 0, len(s.segments))
for _, id := range s.segments {
info := s.meta.GetHealthySegment(id)
if info == nil || info.InsertChannel != channel {
continue
}
if s.flushPolicy(info, t) {
ret = append(ret, id)
}
partition2Segments, ok := s.channel2Segments.Get(channel)
if !ok {
return nil, nil
}
ret := make([]UniqueID, 0, partition2Segments.Len())
partition2Segments.Range(func(partitionID int64, segments Segments) bool {
segments.Range(func(segmentID int64, _ struct{}) bool {
info := s.meta.GetHealthySegment(segmentID)
if info == nil {
return true
}
if s.flushPolicy(info, t) {
ret = append(ret, segmentID)
}
return true
})
return true
})
return ret, nil
}
// ExpireAllocations notifies the segment status to expire old allocations
func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
s.mu.Lock()
defer s.mu.Unlock()
for _, id := range s.segments {
segment := s.meta.GetHealthySegment(id)
if segment == nil || segment.InsertChannel != channel {
continue
}
allocations := make([]*Allocation, 0, len(segment.allocations))
for i := 0; i < len(segment.allocations); i++ {
if segment.allocations[i].ExpireTime <= ts {
a := segment.allocations[i]
putAllocation(a)
} else {
allocations = append(allocations, segment.allocations[i])
}
}
s.meta.SetAllocations(segment.GetID(), allocations)
func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) {
partition2Segments, ok := s.channel2Segments.Get(channel)
if !ok {
return
}
return nil
partition2Segments.Range(func(partitionID int64, segments Segments) bool {
segments.Range(func(id int64, _ struct{}) bool {
segment := s.meta.GetHealthySegment(id)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id))
segments.Remove(id)
return true
}
allocations := make([]*Allocation, 0, len(segment.allocations))
for i := 0; i < len(segment.allocations); i++ {
if segment.allocations[i].ExpireTime <= ts {
a := segment.allocations[i]
putAllocation(a)
} else {
allocations = append(allocations, segment.allocations[i])
}
}
s.meta.SetAllocations(segment.GetID(), allocations)
return true
})
return true
})
}
func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) {
valids := make([]int64, 0, len(s.segments))
for _, id := range s.segments {
segment := s.meta.GetHealthySegment(id)
if segment == nil || segment.InsertChannel != channel {
valids = append(valids, id)
continue
}
if isEmptySealedSegment(segment, ts) {
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
s.meta.SetState(id, commonpb.SegmentState_Dropped)
continue
}
valids = append(valids, id)
partition2Segments, ok := s.channel2Segments.Get(channel)
if !ok {
return
}
s.segments = valids
partition2Segments.Range(func(partitionID int64, segments Segments) bool {
segments.Range(func(id int64, _ struct{}) bool {
segment := s.meta.GetHealthySegment(id)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id))
segments.Remove(id)
return true
}
if isEmptySealedSegment(segment, ts) {
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
if err := s.meta.SetState(id, commonpb.SegmentState_Dropped); err != nil {
log.Warn("failed to set segment state to dropped", zap.String("channel", channel),
zap.Int64("segmentID", id), zap.Error(err))
} else {
segments.Remove(id)
}
}
return true
})
return true
})
}
func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool {
@ -540,29 +581,46 @@ func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool {
// tryToSealSegment applies segment & channel seal policies
func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
partition2Segments, ok := s.channel2Segments.Get(channel)
if !ok {
return nil
}
channelInfo := make(map[string][]*SegmentInfo)
sealedSegments := make(map[int64]struct{})
for _, id := range s.segments {
info := s.meta.GetHealthySegment(id)
if info == nil || info.InsertChannel != channel {
continue
}
channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
if info.State != commonpb.SegmentState_Growing {
continue
}
// change shouldSeal to segment seal policy logic
for _, policy := range s.segmentSealPolicies {
if shouldSeal, reason := policy.ShouldSeal(info, ts); shouldSeal {
log.Info("Seal Segment for policy matched", zap.Int64("segmentID", info.GetID()), zap.String("reason", reason))
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return err
}
sealedSegments[id] = struct{}{}
break
var setStateErr error
partition2Segments.Range(func(partitionID int64, segments Segments) bool {
segments.Range(func(id int64, _ struct{}) bool {
info := s.meta.GetHealthySegment(id)
if info == nil {
return true
}
}
channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
if info.State != commonpb.SegmentState_Growing {
return true
}
// change shouldSeal to segment seal policy logic
for _, policy := range s.segmentSealPolicies {
if shouldSeal, reason := policy.ShouldSeal(info, ts); shouldSeal {
log.Info("Seal Segment for policy matched", zap.Int64("segmentID", info.GetID()), zap.String("reason", reason))
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
setStateErr = err
return false
}
sealedSegments[id] = struct{}{}
break
}
}
return true
})
return setStateErr == nil
})
if setStateErr != nil {
return setStateErr
}
for channel, segmentInfos := range channelInfo {
for _, policy := range s.channelSealPolicies {
vs, reason := policy(channel, segmentInfos, ts)
@ -587,24 +645,25 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
// DropSegmentsOfChannel drops all segments in a channel
func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
s.mu.Lock()
defer s.mu.Unlock()
validSegments := make([]int64, 0, len(s.segments))
for _, sid := range s.segments {
segment := s.meta.GetHealthySegment(sid)
if segment == nil {
continue
}
if segment.GetInsertChannel() != channel {
validSegments = append(validSegments, sid)
continue
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
putAllocation(allocation)
}
partition2Segments, ok := s.channel2Segments.Get(channel)
if !ok {
return
}
s.segments = validSegments
partition2Segments.Range(func(partitionID int64, segments Segments) bool {
segments.Range(func(sid int64, _ struct{}) bool {
segment := s.meta.GetHealthySegment(sid)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", sid))
segments.Remove(sid)
return true
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
putAllocation(allocation)
}
return true
})
return true
})
s.channel2Segments.Remove(channel)
}

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestManagerOptions(t *testing.T) {
@ -97,6 +98,14 @@ func TestManagerOptions(t *testing.T) {
})
}
func GetSegmentsByChannelAndPartition(t *testing.T, segmentManager *SegmentManager, channel string, partitionID int64) *typeutil.ConcurrentMap[int64, struct{}] {
partition2Segments, ok := segmentManager.channel2Segments.Get(channel)
assert.True(t, ok)
segments, ok := partition2Segments.Get(partitionID)
assert.True(t, ok)
return segments
}
func TestAllocSegment(t *testing.T) {
ctx := context.Background()
paramtable.Init()
@ -142,19 +151,23 @@ func TestAllocSegment(t *testing.T) {
})
t.Run("alloc clear unhealthy segment", func(t *testing.T) {
allocations1, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
vchannel := "c1"
partitionID := int64(100)
allocations1, err := segmentManager.AllocSegment(ctx, collID, partitionID, vchannel, 100)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations1))
assert.EqualValues(t, 1, len(segmentManager.segments))
segments := GetSegmentsByChannelAndPartition(t, segmentManager, vchannel, partitionID)
assert.EqualValues(t, 1, segments.Len())
err = meta.SetState(allocations1[0].SegmentID, commonpb.SegmentState_Dropped)
assert.NoError(t, err)
allocations2, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
allocations2, err := segmentManager.AllocSegment(ctx, collID, partitionID, vchannel, 100)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations2))
// clear old healthy and alloc new
assert.EqualValues(t, 1, len(segmentManager.segments))
segments = GetSegmentsByChannelAndPartition(t, segmentManager, vchannel, partitionID)
assert.EqualValues(t, 1, segments.Len())
assert.NotEqual(t, allocations1[0].SegmentID, allocations2[0].SegmentID)
})
}
@ -217,7 +230,8 @@ func TestLastExpireReset(t *testing.T) {
meta.SetCurrentRows(segmentID1, bigRows)
meta.SetCurrentRows(segmentID2, bigRows)
meta.SetCurrentRows(segmentID3, smallRows)
segmentManager.tryToSealSegment(expire1, channelName)
err = segmentManager.tryToSealSegment(expire1, channelName)
assert.NoError(t, err)
assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(segmentID1).GetState())
assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(segmentID2).GetState())
assert.Equal(t, commonpb.SegmentState_Growing, meta.GetSegment(segmentID3).GetState())
@ -270,11 +284,14 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
vchannel := "ch0"
partitionID := int64(100)
sealedSegment := &datapb.SegmentInfo{
ID: 1,
CollectionID: collID,
PartitionID: 0,
InsertChannel: "",
PartitionID: partitionID,
InsertChannel: vchannel,
State: commonpb.SegmentState_Sealed,
MaxRowNum: 100,
LastExpireTime: 1000,
@ -282,8 +299,8 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
growingSegment := &datapb.SegmentInfo{
ID: 2,
CollectionID: collID,
PartitionID: 0,
InsertChannel: "",
PartitionID: partitionID,
InsertChannel: vchannel,
State: commonpb.SegmentState_Growing,
MaxRowNum: 100,
LastExpireTime: 1000,
@ -291,8 +308,8 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
flushedSegment := &datapb.SegmentInfo{
ID: 3,
CollectionID: collID,
PartitionID: 0,
InsertChannel: "",
PartitionID: partitionID,
InsertChannel: vchannel,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 100,
LastExpireTime: 1000,
@ -304,9 +321,10 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
err = meta.AddSegment(context.TODO(), NewSegmentInfo(flushedSegment))
assert.NoError(t, err)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
segments := segmentManager.segments
assert.EqualValues(t, 2, len(segments))
segmentManager, err := newSegmentManager(meta, mockAllocator)
assert.NoError(t, err)
segments := GetSegmentsByChannelAndPartition(t, segmentManager, vchannel, partitionID)
assert.EqualValues(t, 2, segments.Len())
}
func TestSaveSegmentsToMeta(t *testing.T) {
@ -323,7 +341,7 @@ func TestSaveSegmentsToMeta(t *testing.T) {
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
_, err = segmentManager.SealAllSegments(context.Background(), collID, nil)
_, err = segmentManager.SealAllSegments(context.Background(), []string{"c1"}, nil)
assert.NoError(t, err)
segment := meta.GetHealthySegment(allocations[0].SegmentID)
assert.NotNil(t, segment)
@ -345,7 +363,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
_, err = segmentManager.SealAllSegments(context.Background(), collID, []int64{allocations[0].SegmentID})
_, err = segmentManager.SealAllSegments(context.Background(), []string{"c1"}, []int64{allocations[0].SegmentID})
assert.NoError(t, err)
segment := meta.GetHealthySegment(allocations[0].SegmentID)
assert.NotNil(t, segment)
@ -364,14 +382,14 @@ func TestDropSegment(t *testing.T) {
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 100, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
segID := allocations[0].SegmentID
segment := meta.GetHealthySegment(segID)
assert.NotNil(t, segment)
segmentManager.DropSegment(context.Background(), segID)
segmentManager.DropSegment(context.Background(), "c1", 100, segID)
segment = meta.GetHealthySegment(segID)
assert.NotNil(t, segment)
}
@ -433,8 +451,7 @@ func TestExpireAllocation(t *testing.T) {
segment := meta.GetHealthySegment(id)
assert.NotNil(t, segment)
assert.EqualValues(t, 100, len(segment.allocations))
err = segmentManager.ExpireAllocations("ch1", maxts)
assert.NoError(t, err)
segmentManager.ExpireAllocations("ch1", maxts)
segment = meta.GetHealthySegment(id)
assert.NotNil(t, segment)
assert.EqualValues(t, 0, len(segment.allocations))
@ -456,7 +473,7 @@ func TestGetFlushableSegments(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
ids, err := segmentManager.SealAllSegments(context.TODO(), collID, nil)
ids, err := segmentManager.SealAllSegments(context.TODO(), []string{"c1"}, nil)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, allocations[0].SegmentID, ids[0])
@ -750,6 +767,7 @@ func TestAllocationPool(t *testing.T) {
}
func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
partitionID := int64(100)
type fields struct {
meta *meta
segments []UniqueID
@ -772,6 +790,7 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
},
@ -779,6 +798,7 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
PartitionID: partitionID,
InsertChannel: "ch2",
State: commonpb.SegmentState_Flushed,
},
@ -802,6 +822,7 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
},
@ -809,6 +830,7 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
PartitionID: partitionID,
InsertChannel: "ch2",
State: commonpb.SegmentState_Growing,
},
@ -827,11 +849,34 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &SegmentManager{
meta: tt.fields.meta,
segments: tt.fields.segments,
meta: tt.fields.meta,
channel2Segments: typeutil.NewConcurrentMap[string, Partition2Segments](),
}
for _, segmentID := range tt.fields.segments {
segmentInfo := tt.fields.meta.GetSegment(segmentID)
partID := partitionID
channel := tt.args.channel
if segmentInfo != nil {
partID = segmentInfo.GetPartitionID()
channel = segmentInfo.GetInsertChannel()
}
partition2Segments, _ := s.channel2Segments.GetOrInsert(channel, typeutil.NewConcurrentMap[int64, Segments]())
segments, _ := partition2Segments.GetOrInsert(partID, typeutil.NewConcurrentMap[int64, struct{}]())
segments.Insert(segmentID, struct{}{})
}
s.DropSegmentsOfChannel(context.TODO(), tt.args.channel)
assert.ElementsMatch(t, tt.want, s.segments)
all := make([]int64, 0)
s.channel2Segments.Range(func(_ string, partition2Segments Partition2Segments) bool {
partition2Segments.Range(func(_ int64, segments Segments) bool {
segments.Range(func(segmentID int64, _ struct{}) bool {
all = append(all, segmentID)
return true
})
return true
})
return true
})
assert.ElementsMatch(t, tt.want, all)
})
}
}

View File

@ -821,52 +821,10 @@ func TestServer_getSystemInfoMetrics(t *testing.T) {
}
}
type spySegmentManager struct {
spyCh chan struct{}
}
// AllocSegment allocates rows and record the allocation.
func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) {
panic("not implemented") // TODO: Implement
}
func (s *spySegmentManager) allocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64, taskID int64) (*Allocation, error) {
panic("not implemented") // TODO: Implement
}
// DropSegment drops the segment from manager.
func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
}
// FlushImportSegments set importing segment state to Flushed.
func (s *spySegmentManager) FlushImportSegments(ctx context.Context, collectionID UniqueID, segmentIDs []UniqueID) error {
panic("not implemented")
}
// SealAllSegments seals all segments of collection with collectionID and return sealed segments
func (s *spySegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error) {
panic("not implemented") // TODO: Implement
}
// GetFlushableSegments returns flushable segment ids
func (s *spySegmentManager) GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error) {
panic("not implemented") // TODO: Implement
}
// ExpireAllocations notifies segment status to expire old allocations
func (s *spySegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
panic("not implemented") // TODO: Implement
}
// DropSegmentsOfChannel drops all segments in a channel
func (s *spySegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
s.spyCh <- struct{}{}
}
func TestDropVirtualChannel(t *testing.T) {
t.Run("normal DropVirtualChannel", func(t *testing.T) {
spyCh := make(chan struct{}, 1)
svr := newTestServer(t, WithSegmentManager(&spySegmentManager{spyCh: spyCh}))
segmentManager := NewMockManager(t)
svr := newTestServer(t, WithSegmentManager(segmentManager))
defer closeTestServer(t, svr)
@ -993,12 +951,11 @@ func TestDropVirtualChannel(t *testing.T) {
}
req.Segments = append(req.Segments, seg2Drop)
}
segmentManager.EXPECT().DropSegmentsOfChannel(mock.Anything, mock.Anything).Return()
resp, err := svr.DropVirtualChannel(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
<-spyCh
// resend
resp, err = svr.DropVirtualChannel(ctx, req)
assert.NoError(t, err)

View File

@ -112,7 +112,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
}
timeOfSeal, _ := tsoutil.ParseTS(ts)
sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs())
sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, coll.VChannelNames, req.GetSegmentIDs())
if err != nil {
return &datapb.FlushResponse{
Status: merr.Status(errors.Wrapf(err, "failed to flush collection %d",
@ -507,10 +507,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
// Set segment state
if req.GetDropped() {
// segmentManager manages growing segments
s.segmentManager.DropSegment(ctx, req.GetSegmentID())
s.segmentManager.DropSegment(ctx, req.GetChannel(), req.GetPartitionID(), req.GetSegmentID())
operators = append(operators, UpdateStatusOperator(req.GetSegmentID(), commonpb.SegmentState_Dropped))
} else if req.GetFlushed() {
s.segmentManager.DropSegment(ctx, req.GetSegmentID())
s.segmentManager.DropSegment(ctx, req.GetChannel(), req.GetPartitionID(), req.GetSegmentID())
// set segment to SegmentState_Flushing
operators = append(operators, UpdateStatusOperator(req.GetSegmentID(), commonpb.SegmentState_Flushing))
}
@ -1446,10 +1446,7 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT
s.updateSegmentStatistics(segmentStats)
if err := s.segmentManager.ExpireAllocations(channel, ts); err != nil {
log.Warn("failed to expire allocations", zap.Error(err))
return err
}
s.segmentManager.ExpireAllocations(channel, ts)
flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, channel, ts)
if err != nil {

View File

@ -773,7 +773,7 @@ func (s *ServerSuite) TestFlush_NormalCase() {
s.testServer.cluster = mockCluster
schema := newTestSchema()
s.testServer.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
s.testServer.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}, VChannelNames: []string{"channel-1"}})
allocations, err := s.testServer.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
s.NoError(err)
s.EqualValues(1, len(allocations))