enhance: Reduce datanode metacache frequent scan range (#33400)

See also #32165

There were some frequent scans in the metacache:
- List all segments whose start positions not synced
- List compacted segments

Those scans consume a lot of CPU time when the number of flushed segments is
large, even though `Flushed` segments can safely be skipped in both scenarios.

This PR makes the following changes:
- Add a segment-state shortcut index in metacache
- List start positions only for segments in states before `Flushed`
- Set compacted segments' state to `Dropped` and use the `Dropped` state
when scanning for them

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-05-28 14:19:42 +08:00 committed by GitHub
parent 5e39aa9272
commit e71b7c7cc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 151 additions and 87 deletions

View File

@ -433,6 +433,7 @@ func (s *DataSyncServiceSuite) TestStartStop() {
CollectionID: collMeta.ID, CollectionID: collMeta.ID,
PartitionID: 1, PartitionID: 1,
InsertChannel: insertChannelName, InsertChannel: insertChannelName,
State: commonpb.SegmentState_Flushed,
}, },
1: { 1: {
@ -440,6 +441,7 @@ func (s *DataSyncServiceSuite) TestStartStop() {
CollectionID: collMeta.ID, CollectionID: collMeta.ID,
PartitionID: 1, PartitionID: 1,
InsertChannel: insertChannelName, InsertChannel: insertChannelName,
State: commonpb.SegmentState_Flushed,
}, },
} }
return lo.FilterMap(segmentIDs, func(id int64, _ int) (*datapb.SegmentInfo, bool) { return lo.FilterMap(segmentIDs, func(id int64, _ int) (*datapb.SegmentInfo, bool) {

View File

@ -25,40 +25,75 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
type segmentCriterion struct {
ids typeutil.Set[int64]
states typeutil.Set[commonpb.SegmentState]
others []SegmentFilter
}
func (sc *segmentCriterion) Match(segment *SegmentInfo) bool {
for _, filter := range sc.others {
if !filter.Filter(segment) {
return false
}
}
return true
}
type SegmentFilter interface { type SegmentFilter interface {
Filter(info *SegmentInfo) bool Filter(info *SegmentInfo) bool
SegmentIDs() ([]int64, bool) AddFilter(*segmentCriterion)
} }
// SegmentIDFilter segment filter with segment ids.
type SegmentIDFilter struct { type SegmentIDFilter struct {
segmentIDs []int64 ids typeutil.Set[int64]
ids typeutil.Set[int64]
}
func WithSegmentIDs(segmentIDs ...int64) SegmentFilter {
set := typeutil.NewSet(segmentIDs...)
return &SegmentIDFilter{
segmentIDs: segmentIDs,
ids: set,
}
} }
func (f *SegmentIDFilter) Filter(info *SegmentInfo) bool { func (f *SegmentIDFilter) Filter(info *SegmentInfo) bool {
return f.ids.Contain(info.segmentID) return f.ids.Contain(info.segmentID)
} }
func (f *SegmentIDFilter) SegmentIDs() ([]int64, bool) { func (f *SegmentIDFilter) AddFilter(criterion *segmentCriterion) {
return f.segmentIDs, true criterion.ids = f.ids
} }
func WithSegmentIDs(segmentIDs ...int64) SegmentFilter {
set := typeutil.NewSet(segmentIDs...)
return &SegmentIDFilter{
ids: set,
}
}
// SegmentStateFilter segment filter with segment states.
type SegmentStateFilter struct {
states typeutil.Set[commonpb.SegmentState]
}
func (f *SegmentStateFilter) Filter(info *SegmentInfo) bool {
return f.states.Contain(info.State())
}
func (f *SegmentStateFilter) AddFilter(criterion *segmentCriterion) {
criterion.states = f.states
}
func WithSegmentState(states ...commonpb.SegmentState) SegmentFilter {
set := typeutil.NewSet(states...)
return &SegmentStateFilter{
states: set,
}
}
// SegmentFilterFunc implements segment filter with other filters logic.
type SegmentFilterFunc func(info *SegmentInfo) bool type SegmentFilterFunc func(info *SegmentInfo) bool
func (f SegmentFilterFunc) Filter(info *SegmentInfo) bool { func (f SegmentFilterFunc) Filter(info *SegmentInfo) bool {
return f(info) return f(info)
} }
func (f SegmentFilterFunc) SegmentIDs() ([]int64, bool) { func (f SegmentFilterFunc) AddFilter(criterion *segmentCriterion) {
return nil, false criterion.others = append(criterion.others, f)
} }
func WithPartitionID(partitionID int64) SegmentFilter { func WithPartitionID(partitionID int64) SegmentFilter {
@ -67,13 +102,6 @@ func WithPartitionID(partitionID int64) SegmentFilter {
}) })
} }
func WithSegmentState(states ...commonpb.SegmentState) SegmentFilter {
set := typeutil.NewSet(states...)
return SegmentFilterFunc(func(info *SegmentInfo) bool {
return set.Len() > 0 && set.Contain(info.state)
})
}
func WithStartPosNotRecorded() SegmentFilter { func WithStartPosNotRecorded() SegmentFilter {
return SegmentFilterFunc(func(info *SegmentInfo) bool { return SegmentFilterFunc(func(info *SegmentInfo) bool {
return !info.startPosRecorded return !info.startPosRecorded

View File

@ -60,18 +60,32 @@ type PkStatsFactory func(vchannel *datapb.SegmentInfo) *BloomFilterSet
type metaCacheImpl struct { type metaCacheImpl struct {
collectionID int64 collectionID int64
vChannelName string vChannelName string
segmentInfos map[int64]*SegmentInfo
schema *schemapb.CollectionSchema schema *schemapb.CollectionSchema
mu sync.RWMutex
mu sync.RWMutex
segmentInfos map[int64]*SegmentInfo
stateSegments map[commonpb.SegmentState]map[int64]*SegmentInfo
} }
func NewMetaCache(info *datapb.ChannelWatchInfo, factory PkStatsFactory) MetaCache { func NewMetaCache(info *datapb.ChannelWatchInfo, factory PkStatsFactory) MetaCache {
vchannel := info.GetVchan() vchannel := info.GetVchan()
cache := &metaCacheImpl{ cache := &metaCacheImpl{
collectionID: vchannel.GetCollectionID(), collectionID: vchannel.GetCollectionID(),
vChannelName: vchannel.GetChannelName(), vChannelName: vchannel.GetChannelName(),
segmentInfos: make(map[int64]*SegmentInfo), segmentInfos: make(map[int64]*SegmentInfo),
schema: info.GetSchema(), stateSegments: make(map[commonpb.SegmentState]map[int64]*SegmentInfo),
schema: info.GetSchema(),
}
for _, state := range []commonpb.SegmentState{
commonpb.SegmentState_Growing,
commonpb.SegmentState_Sealed,
commonpb.SegmentState_Flushing,
commonpb.SegmentState_Flushed,
commonpb.SegmentState_Dropped,
commonpb.SegmentState_Importing,
} {
cache.stateSegments[state] = make(map[int64]*SegmentInfo)
} }
cache.init(vchannel, factory) cache.init(vchannel, factory)
@ -80,13 +94,13 @@ func NewMetaCache(info *datapb.ChannelWatchInfo, factory PkStatsFactory) MetaCac
func (c *metaCacheImpl) init(vchannel *datapb.VchannelInfo, factory PkStatsFactory) { func (c *metaCacheImpl) init(vchannel *datapb.VchannelInfo, factory PkStatsFactory) {
for _, seg := range vchannel.FlushedSegments { for _, seg := range vchannel.FlushedSegments {
c.segmentInfos[seg.GetID()] = NewSegmentInfo(seg, factory(seg)) c.addSegment(NewSegmentInfo(seg, factory(seg)))
} }
for _, seg := range vchannel.UnflushedSegments { for _, seg := range vchannel.UnflushedSegments {
// segment state could be sealed for growing segment if flush request processed before datanode watch // segment state could be sealed for growing segment if flush request processed before datanode watch
seg.State = commonpb.SegmentState_Growing seg.State = commonpb.SegmentState_Growing
c.segmentInfos[seg.GetID()] = NewSegmentInfo(seg, factory(seg)) c.addSegment(NewSegmentInfo(seg, factory(seg)))
} }
} }
@ -110,7 +124,13 @@ func (c *metaCacheImpl) AddSegment(segInfo *datapb.SegmentInfo, factory PkStatsF
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.segmentInfos[segInfo.GetID()] = segment c.addSegment(segment)
}
func (c *metaCacheImpl) addSegment(segment *SegmentInfo) {
segID := segment.SegmentID()
c.segmentInfos[segID] = segment
c.stateSegments[segment.State()][segID] = segment
} }
func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRows int64, bfs *BloomFilterSet, oldSegmentIDs ...int64) { func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRows int64, bfs *BloomFilterSet, oldSegmentIDs ...int64) {
@ -121,7 +141,7 @@ func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRo
if numOfRows > 0 { if numOfRows > 0 {
compactTo = newSegmentID compactTo = newSegmentID
if _, ok := c.segmentInfos[newSegmentID]; !ok { if _, ok := c.segmentInfos[newSegmentID]; !ok {
c.segmentInfos[newSegmentID] = &SegmentInfo{ c.addSegment(&SegmentInfo{
segmentID: newSegmentID, segmentID: newSegmentID,
partitionID: partitionID, partitionID: partitionID,
state: commonpb.SegmentState_Flushed, state: commonpb.SegmentState_Flushed,
@ -129,7 +149,7 @@ func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRo
flushedRows: numOfRows, flushedRows: numOfRows,
startPosRecorded: true, startPosRecorded: true,
bfs: bfs, bfs: bfs,
} })
} }
log.Info("add compactTo segment info metacache", zap.Int64("segmentID", compactTo)) log.Info("add compactTo segment info metacache", zap.Int64("segmentID", compactTo))
} }
@ -140,7 +160,10 @@ func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRo
oldSet.Contain(segment.compactTo) { oldSet.Contain(segment.compactTo) {
updated := segment.Clone() updated := segment.Clone()
updated.compactTo = compactTo updated.compactTo = compactTo
updated.state = commonpb.SegmentState_Dropped
c.segmentInfos[segment.segmentID] = updated c.segmentInfos[segment.segmentID] = updated
delete(c.stateSegments[commonpb.SegmentState_Flushed], segment.segmentID)
c.stateSegments[commonpb.SegmentState_Dropped][segment.segmentID] = segment
log.Info("update segment compactTo", log.Info("update segment compactTo",
zap.Int64("segmentID", segment.segmentID), zap.Int64("segmentID", segment.segmentID),
zap.Int64("originalCompactTo", segment.compactTo), zap.Int64("originalCompactTo", segment.compactTo),
@ -160,6 +183,7 @@ func (c *metaCacheImpl) RemoveSegments(filters ...SegmentFilter) []int64 {
var result []int64 var result []int64
process := func(id int64, info *SegmentInfo) { process := func(id int64, info *SegmentInfo) {
delete(c.segmentInfos, id) delete(c.segmentInfos, id)
delete(c.stateSegments[info.State()], id)
result = append(result, id) result = append(result, id)
} }
c.rangeWithFilter(process, filters...) c.rangeWithFilter(process, filters...)
@ -207,6 +231,8 @@ func (c *metaCacheImpl) UpdateSegments(action SegmentAction, filters ...SegmentF
nInfo := info.Clone() nInfo := info.Clone()
action(nInfo) action(nInfo)
c.segmentInfos[id] = nInfo c.segmentInfos[id] = nInfo
delete(c.stateSegments[info.State()], info.SegmentID())
c.stateSegments[nInfo.State()][nInfo.SegmentID()] = nInfo
}, filters...) }, filters...)
} }
@ -223,38 +249,38 @@ func (c *metaCacheImpl) PredictSegments(pk storage.PrimaryKey, filters ...Segmen
} }
func (c *metaCacheImpl) rangeWithFilter(fn func(id int64, info *SegmentInfo), filters ...SegmentFilter) { func (c *metaCacheImpl) rangeWithFilter(fn func(id int64, info *SegmentInfo), filters ...SegmentFilter) {
var hasIDs bool criterion := &segmentCriterion{}
set := typeutil.NewSet[int64]()
filtered := make([]SegmentFilter, 0, len(filters))
for _, filter := range filters { for _, filter := range filters {
ids, ok := filter.SegmentIDs() filter.AddFilter(criterion)
if ok {
set.Insert(ids...)
hasIDs = true
} else {
filtered = append(filtered, filter)
}
}
mergedFilter := func(info *SegmentInfo) bool {
for _, filter := range filtered {
if !filter.Filter(info) {
return false
}
}
return true
} }
if hasIDs { var candidates []map[int64]*SegmentInfo
for id := range set { if criterion.states != nil {
info, has := c.segmentInfos[id] candidates = lo.Map(criterion.states.Collect(), func(state commonpb.SegmentState, _ int) map[int64]*SegmentInfo {
if has && mergedFilter(info) { return c.stateSegments[state]
fn(id, info) })
}
}
} else { } else {
for id, info := range c.segmentInfos { candidates = []map[int64]*SegmentInfo{
if mergedFilter(info) { c.segmentInfos,
fn(id, info) }
}
for _, candidate := range candidates {
var segments map[int64]*SegmentInfo
if criterion.ids != nil {
segments = lo.SliceToMap(lo.FilterMap(criterion.ids.Collect(), func(id int64, _ int) (*SegmentInfo, bool) {
segment, ok := candidate[id]
return segment, ok
}), func(segment *SegmentInfo) (int64, *SegmentInfo) {
return segment.SegmentID(), segment
})
} else {
segments = candidate
}
for id, segment := range segments {
if criterion.Match(segment) {
fn(id, segment)
} }
} }
} }

View File

@ -283,6 +283,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
ID: segmentID, ID: segmentID,
CollectionID: 1, CollectionID: 1,
PartitionID: 2, PartitionID: 2,
State: commonpb.SegmentState_Growing,
StartPosition: &msgpb.MsgPosition{}, StartPosition: &msgpb.MsgPosition{},
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }) }, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })

View File

@ -7,6 +7,7 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/broker" "github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
@ -60,7 +61,8 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
Position: pack.checkpoint, Position: pack.checkpoint,
}) })
startPos := lo.Map(pack.metacache.GetSegmentsBy(metacache.WithStartPosNotRecorded()), func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition { startPos := lo.Map(pack.metacache.GetSegmentsBy(metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Sealed, commonpb.SegmentState_Flushing),
metacache.WithStartPosNotRecorded()), func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition {
return &datapb.SegmentStartPosition{ return &datapb.SegmentStartPosition{
SegmentID: info.SegmentID(), SegmentID: info.SegmentID(),
StartPosition: info.StartPosition(), StartPosition: info.StartPosition(),
@ -150,7 +152,8 @@ func (b *brokerMetaWriter) UpdateSyncV2(pack *SyncTaskV2) error {
Position: pack.checkpoint, Position: pack.checkpoint,
}) })
startPos := lo.Map(pack.metacache.GetSegmentsBy(metacache.WithStartPosNotRecorded()), func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition { startPos := lo.Map(pack.metacache.GetSegmentsBy(metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing),
metacache.WithStartPosNotRecorded()), func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition {
return &datapb.SegmentStartPosition{ return &datapb.SegmentStartPosition{
SegmentID: info.SegmentID(), SegmentID: info.SegmentID(),
StartPosition: info.StartPosition(), StartPosition: info.StartPosition(),

View File

@ -39,7 +39,7 @@ func (s *MetaWriterSuite) TestNormalSave() {
bfs := metacache.NewBloomFilterSet() bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
task := NewSyncTask() task := NewSyncTask()
@ -55,7 +55,7 @@ func (s *MetaWriterSuite) TestReturnError() {
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
task := NewSyncTask() task := NewSyncTask()
task.WithMetaCache(s.metacache) task.WithMetaCache(s.metacache)
err := s.writer.UpdateSync(task) err := s.writer.UpdateSync(task)
@ -69,7 +69,7 @@ func (s *MetaWriterSuite) TestNormalSaveV2() {
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
task := NewSyncTaskV2() task := NewSyncTaskV2()
task.WithMetaCache(s.metacache) task.WithMetaCache(s.metacache)
err := s.writer.UpdateSyncV2(task) err := s.writer.UpdateSyncV2(task)
@ -83,7 +83,7 @@ func (s *MetaWriterSuite) TestReturnErrorV2() {
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
task := NewSyncTaskV2() task := NewSyncTaskV2()
task.WithMetaCache(s.metacache) task.WithMetaCache(s.metacache)
err := s.writer.UpdateSyncV2(task) err := s.writer.UpdateSyncV2(task)

View File

@ -57,7 +57,7 @@ type MockSyncManager_GetEarliestPosition_Call struct {
} }
// GetEarliestPosition is a helper method to define mock.On call // GetEarliestPosition is a helper method to define mock.On call
// - channel string // - channel string
func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call { func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call {
return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)} return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)}
} }
@ -101,8 +101,8 @@ type MockSyncManager_SyncData_Call struct {
} }
// SyncData is a helper method to define mock.On call // SyncData is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - task Task // - task Task
func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call { func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call {
return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)} return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)}
} }

View File

@ -155,7 +155,7 @@ func (s *SyncManagerSuite) TestSubmit() {
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
manager, err := NewSyncManager(s.chunkManager, s.allocator) manager, err := NewSyncManager(s.chunkManager, s.allocator)
@ -186,7 +186,7 @@ func (s *SyncManagerSuite) TestCompacted() {
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
metacache.CompactTo(1001)(seg) metacache.CompactTo(1001)(seg)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
manager, err := NewSyncManager(s.chunkManager, s.allocator) manager, err := NewSyncManager(s.chunkManager, s.allocator)

View File

@ -185,7 +185,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
seg.GetBloomFilterSet().Roll() seg.GetBloomFilterSet().Roll()
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.Run("without_data", func() { s.Run("without_data", func() {
@ -268,7 +268,7 @@ func (s *SyncTaskSuite) TestRunL0Segment() {
bfs := metacache.NewBloomFilterSet() bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0}, bfs) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0}, bfs)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.Run("pure_delete_l0_flush", func() { s.Run("pure_delete_l0_flush", func() {
@ -362,7 +362,7 @@ func (s *SyncTaskSuite) TestRunError() {
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, metacache.NewBloomFilterSet()) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, metacache.NewBloomFilterSet())
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.Run("allocate_id_fail", func() { s.Run("allocate_id_fail", func() {
mockAllocator := allocator.NewMockAllocator(s.T()) mockAllocator := allocator.NewMockAllocator(s.T())

View File

@ -216,7 +216,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() {
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg) metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.Run("without_insert_delete", func() { s.Run("without_insert_delete", func() {

View File

@ -98,7 +98,8 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
// update pk oracle // update pk oracle
for _, inData := range groups { for _, inData := range groups {
// segment shall always exists after buffer insert // segment shall always exists after buffer insert
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) segments := wb.metaCache.GetSegmentsBy(
metacache.WithSegmentIDs(inData.segmentID))
for _, segment := range segments { for _, segment := range segments {
for _, fieldData := range inData.pkField { for _, fieldData := range inData.pkField {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)

View File

@ -218,7 +218,7 @@ func (s *BFWriteBufferSuite) TestBufferData() {
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{}) s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64) pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
@ -248,7 +248,7 @@ func (s *BFWriteBufferSuite) TestBufferData() {
s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{}) s.metacacheVarchar.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar) pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewVarCharPrimaryKey(fmt.Sprintf("%v", id)) })) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewVarCharPrimaryKey(fmt.Sprintf("%v", id)) }))
@ -273,7 +273,7 @@ func (s *BFWriteBufferSuite) TestBufferData() {
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{}) s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar) pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
@ -294,7 +294,7 @@ func (s *BFWriteBufferSuite) TestBufferData() {
s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{}) s.metacacheVarchar.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64) pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
@ -325,7 +325,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() {
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once() s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true) s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002}) s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{}) s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
@ -363,7 +363,7 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{}) s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
@ -409,7 +409,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once() s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true) s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002}) s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{1003}) // mocked compacted s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{1003}) // mocked compacted
s.metacacheInt64.EXPECT().RemoveSegments(mock.Anything).Return([]int64{1003}) s.metacacheInt64.EXPECT().RemoveSegments(mock.Anything).Return([]int64{1003})
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()

View File

@ -186,7 +186,7 @@ func (s *L0WriteBufferSuite) TestBufferData() {
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once() s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{}) s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
metrics.DataNodeFlowGraphBufferDataSize.Reset() metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
@ -215,7 +215,7 @@ func (s *L0WriteBufferSuite) TestBufferData() {
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{}) s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
metrics.DataNodeFlowGraphBufferDataSize.Reset() metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})

View File

@ -279,7 +279,10 @@ func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
} }
func (wb *writeBufferBase) cleanupCompactedSegments() { func (wb *writeBufferBase) cleanupCompactedSegments() {
segmentIDs := wb.metaCache.GetSegmentIDsBy(metacache.WithCompacted(), metacache.WithNoSyncingTask()) segmentIDs := wb.metaCache.GetSegmentIDsBy(
metacache.WithSegmentState(commonpb.SegmentState_Dropped),
metacache.WithCompacted(),
metacache.WithNoSyncingTask())
// remove compacted only when there is no writebuffer // remove compacted only when there is no writebuffer
targetIDs := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool { targetIDs := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
_, ok := wb.buffers[segmentID] _, ok := wb.buffers[segmentID]