enhance: avoid maintaining checkpoint info in sync manager (#33413)

relate: https://github.com/milvus-io/milvus/issues/32915

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2024-06-05 10:05:50 +08:00 committed by GitHub
parent 8858fcb40a
commit 387b7cd7f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 127 additions and 144 deletions

View File

@ -357,7 +357,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
@ -418,7 +418,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, errors.New("mock err")
})
@ -494,7 +494,7 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
}
func (s *SchedulerSuite) TestScheduler_ImportFile() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})

View File

@ -8,8 +8,6 @@ import (
conc "github.com/milvus-io/milvus/pkg/util/conc"
mock "github.com/stretchr/testify/mock"
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
)
// MockSyncManager is an autogenerated mock type for the SyncManager type
@ -25,67 +23,20 @@ func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter {
return &MockSyncManager_Expecter{mock: &_m.Mock}
}
// GetEarliestPosition provides a mock function with given fields: channel
func (_m *MockSyncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
ret := _m.Called(channel)
var r0 int64
var r1 *msgpb.MsgPosition
if rf, ok := ret.Get(0).(func(string) (int64, *msgpb.MsgPosition)); ok {
return rf(channel)
// SyncData provides a mock function with given fields: ctx, task, callbacks
func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
_va := make([]interface{}, len(callbacks))
for _i := range callbacks {
_va[_i] = callbacks[_i]
}
if rf, ok := ret.Get(0).(func(string) int64); ok {
r0 = rf(channel)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(string) *msgpb.MsgPosition); ok {
r1 = rf(channel)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*msgpb.MsgPosition)
}
}
return r0, r1
}
// MockSyncManager_GetEarliestPosition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetEarliestPosition'
type MockSyncManager_GetEarliestPosition_Call struct {
*mock.Call
}
// GetEarliestPosition is a helper method to define mock.On call
// - channel string
func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call {
return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)}
}
func (_c *MockSyncManager_GetEarliestPosition_Call) Run(run func(channel string)) *MockSyncManager_GetEarliestPosition_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockSyncManager_GetEarliestPosition_Call) Return(_a0 int64, _a1 *msgpb.MsgPosition) *MockSyncManager_GetEarliestPosition_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockSyncManager_GetEarliestPosition_Call) RunAndReturn(run func(string) (int64, *msgpb.MsgPosition)) *MockSyncManager_GetEarliestPosition_Call {
_c.Call.Return(run)
return _c
}
// SyncData provides a mock function with given fields: ctx, task
func (_m *MockSyncManager) SyncData(ctx context.Context, task Task) *conc.Future[struct{}] {
ret := _m.Called(ctx, task)
var _ca []interface{}
_ca = append(_ca, ctx, task)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *conc.Future[struct{}]
if rf, ok := ret.Get(0).(func(context.Context, Task) *conc.Future[struct{}]); ok {
r0 = rf(ctx, task)
if rf, ok := ret.Get(0).(func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]); ok {
r0 = rf(ctx, task, callbacks...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*conc.Future[struct{}])
@ -103,13 +54,21 @@ type MockSyncManager_SyncData_Call struct {
// SyncData is a helper method to define mock.On call
// - ctx context.Context
// - task Task
func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call {
return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)}
// - callbacks ...func(error) error
func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}, callbacks ...interface{}) *MockSyncManager_SyncData_Call {
return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData",
append([]interface{}{ctx, task}, callbacks...)...)}
}
func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task Task)) *MockSyncManager_SyncData_Call {
func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task Task, callbacks ...func(error) error)) *MockSyncManager_SyncData_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(Task))
variadicArgs := make([]func(error) error, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(func(error) error)
}
}
run(args[0].(context.Context), args[1].(Task), variadicArgs...)
})
return _c
}
@ -119,7 +78,7 @@ func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[struct{}]) *Moc
return _c
}
func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task) *conc.Future[struct{}]) *MockSyncManager_SyncData_Call {
func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]) *MockSyncManager_SyncData_Call {
_c.Call.Return(run)
return _c
}

View File

@ -46,9 +46,7 @@ type SyncMeta struct {
//go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage
type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task) *conc.Future[struct{}]
// GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel.
GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition)
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}]
}
type syncManager struct {
@ -100,7 +98,7 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
}
}
func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[struct{}] {
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
switch t := task.(type) {
case *SyncTask:
t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager)
@ -108,13 +106,13 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[st
t.WithAllocator(mgr.allocator)
}
return mgr.safeSubmitTask(task)
return mgr.safeSubmitTask(task, callbacks...)
}
// safeSubmitTask submits the task with an optimistic target-segment check:
// when the task returns an errTargetSegmentNotMatch error,
// the target is refetched and the task is retried.
func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[struct{}] {
func (mgr *syncManager) safeSubmitTask(task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
mgr.tasks.Insert(taskKey, task)
@ -124,11 +122,10 @@ func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[struct{}] {
return conc.Go(func() (struct{}, error) { return struct{}{}, err })
}
return mgr.submit(key, task)
return mgr.submit(key, task, callbacks...)
}
func (mgr *syncManager) submit(key int64, task Task) *conc.Future[struct{}] {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
func (mgr *syncManager) submit(key int64, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
handler := func(err error) error {
if err == nil {
return nil
@ -156,11 +153,9 @@ func (mgr *syncManager) submit(key int64, task Task) *conc.Future[struct{}] {
)
return mgr.submit(targetID, task).Err()
}
callbacks = append([]func(error) error{handler}, callbacks...)
log.Info("sync mgr sumbit task with key", zap.Int64("key", key))
return mgr.Submit(key, task, handler, func(err error) error {
mgr.tasks.Remove(taskKey)
return err
})
return mgr.Submit(key, task, callbacks...)
}
func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {

View File

@ -329,7 +329,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() {
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(nil)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
@ -414,7 +414,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(nil)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))

View File

@ -13,7 +13,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -57,6 +56,48 @@ type WriteBuffer interface {
Close(drop bool)
}
// checkpointCandidate is one position that may be selected as the channel checkpoint.
// NOTE: it is constructed with positional literals, so the field order must not change.
type checkpointCandidate struct {
// segmentID is the segment the position is associated with.
segmentID int64
// position is the message position offered as a checkpoint.
position *msgpb.MsgPosition
// source labels where the candidate came from (e.g. "segment buffer", "syncing task");
// it is reported as "cpSource" when the evaluated checkpoint is logged.
source string
}
// checkpointCandidates is a lock-protected collection of checkpoint candidates.
// Entries are keyed by the string "<segmentID>-<position timestamp>".
type checkpointCandidates struct {
// candidates maps the "<segmentID>-<timestamp>" key to the registered candidate.
candidates map[string]*checkpointCandidate
// mu guards candidates.
mu sync.RWMutex
}
// newCheckpointCandiates returns an empty checkpointCandidates whose
// candidate map is already allocated, so Add can be called right away.
func newCheckpointCandiates() *checkpointCandidates {
	set := new(checkpointCandidates)
	set.candidates = make(map[string]*checkpointCandidate)
	return set
}
// Remove drops the candidate registered under "<segmentID>-<timestamp>".
// It is a no-op when no such candidate exists.
func (c *checkpointCandidates) Remove(segmentID int64, timestamp uint64) {
	// Build the key before taking the lock; it depends only on the arguments.
	key := fmt.Sprintf("%d-%d", segmentID, timestamp)

	c.mu.Lock()
	delete(c.candidates, key)
	c.mu.Unlock()
}
// Add registers (or overwrites) the candidate for the given segment and position.
// The entry is keyed by "<segmentID>-<position timestamp>", so a later Remove
// must be called with the same segment ID and timestamp to delete it.
func (c *checkpointCandidates) Add(segmentID int64, position *msgpb.MsgPosition, source string) {
	entry := &checkpointCandidate{
		segmentID: segmentID,
		position:  position,
		source:    source,
	}
	key := fmt.Sprintf("%d-%d", segmentID, position.GetTimestamp())

	c.mu.Lock()
	defer c.mu.Unlock()
	c.candidates[key] = entry
}
// GetEarliestWithDefault returns the candidate with the smallest position
// timestamp among def and all registered candidates.
//
// def wins ties: a registered candidate replaces the current pick only when its
// timestamp is strictly smaller. If def is nil, the earliest registered
// candidate is returned, or nil when nothing is registered.
func (c *checkpointCandidates) GetEarliestWithDefault(def *checkpointCandidate) *checkpointCandidate {
	c.mu.RLock()
	defer c.mu.RUnlock()

	earliest := def
	for _, cand := range c.candidates {
		// Keep the current pick unless this candidate is strictly earlier.
		if earliest != nil && cand.position.GetTimestamp() >= earliest.position.GetTimestamp() {
			continue
		}
		earliest = cand
	}
	return earliest
}
func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
option := defaultWBOption(metacache)
for _, opt := range opts {
@ -86,13 +127,14 @@ type writeBufferBase struct {
pkField *schemapb.FieldSchema
estSizePerRecord int
metaCache metacache.MetaCache
syncMgr syncmgr.SyncManager
broker broker.Broker
serializer syncmgr.Serializer
buffers map[int64]*segmentBuffer // segmentID => segmentBuffer
syncPolicies []SyncPolicy
syncCheckpoint *checkpointCandidates
syncMgr syncmgr.SyncManager
serializer syncmgr.Serializer
checkpoint *msgpb.MsgPosition
flushTimestamp *atomic.Uint64
@ -152,6 +194,7 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncCheckpoint: newCheckpointCandiates(),
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
storagev2Cache: storageV2Cache,
@ -223,59 +266,28 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
wb.mut.RLock()
defer wb.mut.RUnlock()
// syncCandidate from sync manager
syncSegmentID, syncCandidate := wb.syncMgr.GetEarliestPosition(wb.channelName)
type checkpointCandidate struct {
segmentID int64
position *msgpb.MsgPosition
}
var bufferCandidate *checkpointCandidate
candidates := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *checkpointCandidate {
return &checkpointCandidate{buf.segmentID, buf.EarliestPosition()}
return &checkpointCandidate{buf.segmentID, buf.EarliestPosition(), "segment buffer"}
})
candidates = lo.Filter(candidates, func(candidate *checkpointCandidate, _ int) bool {
return candidate.position != nil
})
if len(candidates) > 0 {
bufferCandidate = lo.MinBy(candidates, func(a, b *checkpointCandidate) bool {
checkpoint := wb.syncCheckpoint.GetEarliestWithDefault(lo.MinBy(candidates, func(a, b *checkpointCandidate) bool {
return a.position.GetTimestamp() < b.position.GetTimestamp()
})
}
}))
var checkpoint *msgpb.MsgPosition
var segmentID int64
var cpSource string
switch {
case bufferCandidate == nil && syncCandidate == nil:
if checkpoint == nil {
// all buffer are empty
log.RatedDebug(60, "checkpoint from latest consumed msg")
log.RatedDebug(60, "checkpoint from latest consumed msg", zap.Uint64("cpTimestamp", wb.checkpoint.GetTimestamp()))
return wb.checkpoint
case bufferCandidate == nil && syncCandidate != nil:
checkpoint = syncCandidate
segmentID = syncSegmentID
cpSource = "syncManager"
case syncCandidate == nil && bufferCandidate != nil:
checkpoint = bufferCandidate.position
segmentID = bufferCandidate.segmentID
cpSource = "segmentBuffer"
case syncCandidate.GetTimestamp() >= bufferCandidate.position.GetTimestamp():
checkpoint = bufferCandidate.position
segmentID = bufferCandidate.segmentID
cpSource = "segmentBuffer"
case syncCandidate.GetTimestamp() < bufferCandidate.position.GetTimestamp():
checkpoint = syncCandidate
segmentID = syncSegmentID
cpSource = "syncManager"
}
log.RatedDebug(20, "checkpoint evaluated",
zap.String("cpSource", cpSource),
zap.Int64("segmentID", segmentID),
zap.Uint64("cpTimestamp", checkpoint.GetTimestamp()))
return checkpoint
zap.String("cpSource", checkpoint.source),
zap.Int64("segmentID", checkpoint.segmentID),
zap.Uint64("cpTimestamp", checkpoint.position.GetTimestamp()))
return checkpoint.position
}
func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
@ -330,7 +342,16 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
}
}
result = append(result, wb.syncMgr.SyncData(ctx, syncTask))
result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if err != nil {
return err
}
if syncTask.StartPosition() != nil {
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
}
return nil
}))
}
return result
}
@ -535,6 +556,10 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy
tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax
}
if startPos != nil {
wb.syncCheckpoint.Add(segmentID, startPos, "syncing task")
}
actions := []metacache.SegmentAction{}
if insert != nil {
batchSize = int64(insert.GetRowNum())
@ -599,7 +624,15 @@ func (wb *writeBufferBase) Close(drop bool) {
t.WithDrop()
}
f := wb.syncMgr.SyncData(context.Background(), syncTask)
f := wb.syncMgr.SyncData(context.Background(), syncTask, func(err error) error {
if err != nil {
return err
}
if syncTask.StartPosition() != nil {
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
}
return nil
})
futures = append(futures, f)
}

View File

@ -126,20 +126,17 @@ func (s *WriteBufferSuite) TestGetCheckpoint() {
Timestamp: 1000,
}
s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once()
checkpoint := s.wb.GetCheckpoint()
s.EqualValues(1000, checkpoint.GetTimestamp())
})
s.Run("use_sync_mgr_cp", func() {
s.Run("use_syncing_segment_cp", func() {
s.wb.checkpoint = &msgpb.MsgPosition{
Timestamp: 1000,
}
s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
Timestamp: 500,
}).Once()
s.wb.syncCheckpoint.Add(1, &msgpb.MsgPosition{Timestamp: 500}, "syncing segments")
defer s.wb.syncCheckpoint.Remove(1, 500)
checkpoint := s.wb.GetCheckpoint()
s.EqualValues(500, checkpoint.GetTimestamp())
@ -150,7 +147,8 @@ func (s *WriteBufferSuite) TestGetCheckpoint() {
Timestamp: 1000,
}
s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once()
s.wb.syncCheckpoint.Add(1, &msgpb.MsgPosition{Timestamp: 500}, "syncing segments")
defer s.wb.syncCheckpoint.Remove(1, 500)
buf1, err := newSegmentBuffer(2, s.collSchema)
s.Require().NoError(err)
@ -189,9 +187,8 @@ func (s *WriteBufferSuite) TestGetCheckpoint() {
Timestamp: 1000,
}
s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
Timestamp: 300,
}).Once()
s.wb.syncCheckpoint.Add(1, &msgpb.MsgPosition{Timestamp: 300}, "syncing segments")
defer s.wb.syncCheckpoint.Remove(1, 300)
buf1, err := newSegmentBuffer(2, s.collSchema)
s.Require().NoError(err)
@ -230,9 +227,8 @@ func (s *WriteBufferSuite) TestGetCheckpoint() {
Timestamp: 1000,
}
s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
Timestamp: 800,
}).Once()
s.wb.syncCheckpoint.Add(1, &msgpb.MsgPosition{Timestamp: 800}, "syncing segments")
defer s.wb.syncCheckpoint.Remove(1, 800)
buf1, err := newSegmentBuffer(2, s.collSchema)
s.Require().NoError(err)
@ -357,7 +353,7 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) {
return struct{}{}, nil
}))
defer func() {