From 2c1e8f477432db46f4a8f8eea35f019ffd8c40e8 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 29 Apr 2024 10:59:26 +0800 Subject: [PATCH] enhance: Use `struct{}` for sync task future result (#32673) Related to #27675 Use `struct{}` instead `error` for sync task future result type to reduce result size and preventing logci error. Also change some unused parameter to `_` to suppress lint warning Signed-off-by: Congqi Xia --- internal/datanode/flow_graph_write_node.go | 2 +- internal/datanode/importv2/scheduler.go | 6 ++-- internal/datanode/importv2/scheduler_test.go | 18 +++++------ internal/datanode/metrics_info.go | 2 +- internal/datanode/mock_fgmanager.go | 32 +++++++++++++++++++ .../datanode/syncmgr/mock_sync_manager.go | 12 +++---- .../datanode/syncmgr/storage_serializer.go | 1 + internal/datanode/syncmgr/sync_manager.go | 12 +++---- .../datanode/syncmgr/sync_manager_test.go | 8 ++--- internal/datanode/writebuffer/write_buffer.go | 8 ++--- .../datanode/writebuffer/write_buffer_test.go | 4 +-- 11 files changed, 68 insertions(+), 37 deletions(-) diff --git a/internal/datanode/flow_graph_write_node.go b/internal/datanode/flow_graph_write_node.go index 7f30cb2657..517f3ed98b 100644 --- a/internal/datanode/flow_graph_write_node.go +++ b/internal/datanode/flow_graph_write_node.go @@ -109,7 +109,7 @@ func (wNode *writeNode) Operate(in []Msg) []Msg { } func newWriteNode( - ctx context.Context, + _ context.Context, writeBufferManager writebuffer.BufferManager, updater statsUpdater, config *nodeConfig, diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go index edbd92f001..37884d87d8 100644 --- a/internal/datanode/importv2/scheduler.go +++ b/internal/datanode/importv2/scheduler.go @@ -281,7 +281,7 @@ func (s *scheduler) Import(task Task) []*conc.Future[any] { func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error { iTask := task.(*ImportTask) - syncFutures := make([]*conc.Future[error], 0) + syncFutures := make([]*conc.Future[struct{}], 0) syncTasks := make([]syncmgr.Task, 0) for { data, err := reader.Read() @@ -321,9 +321,9 @@ func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error { return nil } -func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[error], []syncmgr.Task, error) { +func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) { log.Info("start to sync import data", WrapLogFields(task)...) - futures := make([]*conc.Future[error], 0) + futures := make([]*conc.Future[struct{}], 0) syncTasks := make([]syncmgr.Task, 0) segmentImportedSizes := make(map[int64]int) for channelIdx, datas := range hashedData { diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index 10c93fe559..bdb876f750 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -357,9 +357,9 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() { cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.scheduler.cm = cm - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { - future := conc.Go(func() (error, error) { - return nil, nil + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { + future := conc.Go(func() (struct{}, error) { + return struct{}{}, nil }) return future }) @@ -418,9 +418,9 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.scheduler.cm = cm - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { - future := conc.Go(func() (error, error) { - return errors.New("mock err"), errors.New("mock err") + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { + future := conc.Go(func() (struct{}, error) { + return struct{}{}, errors.New("mock err") }) return future }) @@ -494,9 +494,9 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() { } func (s *SchedulerSuite) TestScheduler_ImportFile() { - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { - future := conc.Go(func() (error, error) { - return nil, nil + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { + future := conc.Go(func() (struct{}, error) { + return struct{}{}, nil }) return future }) diff --git a/internal/datanode/metrics_info.go b/internal/datanode/metrics_info.go index 804a85d3f9..407b52d625 100644 --- a/internal/datanode/metrics_info.go +++ b/internal/datanode/metrics_info.go @@ -65,7 +65,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro }, nil } -func (node *DataNode) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { +func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { // TODO(dragondriver): add more metrics usedMem := hardware.GetUsedMemoryCount() totalMem := hardware.GetMemoryCount() diff --git a/internal/datanode/mock_fgmanager.go b/internal/datanode/mock_fgmanager.go index 73ed8f50a3..c31835d363 100644 --- a/internal/datanode/mock_fgmanager.go +++ b/internal/datanode/mock_fgmanager.go @@ -132,6 +132,38 @@ func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) RunAndReturn(run func()) *M return _c } +// Close provides a mock function with given fields: +func (_m *MockFlowgraphManager) Close() { + _m.Called() +} + +// MockFlowgraphManager_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockFlowgraphManager_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) Close() *MockFlowgraphManager_Close_Call { + return &MockFlowgraphManager_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockFlowgraphManager_Close_Call) Run(run func()) *MockFlowgraphManager_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_Close_Call) Return() *MockFlowgraphManager_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgraphManager_Close_Call { + _c.Call.Return(run) + return _c +} + // GetCollectionIDs provides a mock function with given fields: func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 { ret := _m.Called() diff --git a/internal/datanode/syncmgr/mock_sync_manager.go b/internal/datanode/syncmgr/mock_sync_manager.go index 1c1ea504b2..34c69ac6b0 100644 --- a/internal/datanode/syncmgr/mock_sync_manager.go +++ b/internal/datanode/syncmgr/mock_sync_manager.go @@ -113,15 +113,15 @@ func (_c *MockSyncManager_GetEarliestPosition_Call) RunAndReturn(run func(string } // SyncData provides a mock function with given fields: ctx, task -func (_m *MockSyncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] { +func (_m *MockSyncManager) SyncData(ctx context.Context, task Task) *conc.Future[struct{}] { ret := _m.Called(ctx, task) - var r0 *conc.Future[error] - if rf, ok := ret.Get(0).(func(context.Context, Task) *conc.Future[error]); ok { + var r0 *conc.Future[struct{}] + if rf, ok := ret.Get(0).(func(context.Context, Task) *conc.Future[struct{}]); ok { r0 = rf(ctx, task) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*conc.Future[error]) + r0 = ret.Get(0).(*conc.Future[struct{}]) } } @@ -147,12 +147,12 @@ func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task return _c } -func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[error]) *MockSyncManager_SyncData_Call { +func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[struct{}]) *MockSyncManager_SyncData_Call { _c.Call.Return(_a0) return _c } -func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task) *conc.Future[error]) *MockSyncManager_SyncData_Call { +func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task) *conc.Future[struct{}]) *MockSyncManager_SyncData_Call { _c.Call.Return(run) return _c } diff --git a/internal/datanode/syncmgr/storage_serializer.go b/internal/datanode/syncmgr/storage_serializer.go index 45649a8af7..784f349940 100644 --- a/internal/datanode/syncmgr/storage_serializer.go +++ b/internal/datanode/syncmgr/storage_serializer.go @@ -158,6 +158,7 @@ func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) { } func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) { + log := log.Ctx(ctx) blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData) if err != nil { return nil, err diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index 78857a01f1..b86a95b228 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -44,7 +44,7 @@ type SyncMeta struct { // it processes the sync tasks inside and changes the meta. type SyncManager interface { // SyncData is the method to submit sync task. - SyncData(ctx context.Context, task Task) *conc.Future[error] + SyncData(ctx context.Context, task Task) *conc.Future[struct{}] // GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel. GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) // Block allows caller to block tasks of provided segment id. @@ -104,7 +104,7 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) { } } -func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] { +func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[struct{}] { switch t := task.(type) { case *SyncTask: t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager) @@ -118,16 +118,16 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[er // safeSubmitTask handles submitting task logic with optimistic target check logic // when task returns errTargetSegmentNotMatch error // perform refetch then retry logic -func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[error] { +func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[struct{}] { taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp()) mgr.tasks.Insert(taskKey, task) - return conc.Go[error](func() (error, error) { + return conc.Go(func() (struct{}, error) { defer mgr.tasks.Remove(taskKey) for { targetID, err := task.CalcTargetSegment() if err != nil { - return err, err + return struct{}{}, err } log.Info("task calculated target segment id", zap.Int64("targetID", targetID), @@ -142,7 +142,7 @@ func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[error] { log.Info("target updated during submitting", zap.Error(err)) continue } - return err, err + return struct{}{}, err } }) } diff --git a/internal/datanode/syncmgr/sync_manager_test.go b/internal/datanode/syncmgr/sync_manager_test.go index a9491e9922..b5ff1d9bde 100644 --- a/internal/datanode/syncmgr/sync_manager_test.go +++ b/internal/datanode/syncmgr/sync_manager_test.go @@ -171,9 +171,8 @@ func (s *SyncManagerSuite) TestSubmit() { f := manager.SyncData(context.Background(), task) s.NotNil(f) - r, err := f.Await() + _, err = f.Await() s.NoError(err) - s.NoError(r) } func (s *SyncManagerSuite) TestCompacted() { @@ -203,9 +202,8 @@ func (s *SyncManagerSuite) TestCompacted() { f := manager.SyncData(context.Background(), task) s.NotNil(f) - r, err := f.Await() + _, err = f.Await() s.NoError(err) - s.NoError(r) s.EqualValues(1001, segmentID.Load()) } @@ -321,7 +319,7 @@ func (s *SyncManagerSuite) TestTargetUpdated() { task.EXPECT().Run().Return(nil).Once() f := manager.SyncData(context.Background(), task) - err, _ = f.Await() + _, err = f.Await() s.NoError(err) } diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index e1410b5ecc..6c64060cca 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -289,7 +289,7 @@ func (wb *writeBufferBase) cleanupCompactedSegments() { } } -func (wb *writeBufferBase) sealSegments(ctx context.Context, segmentIDs []int64) error { +func (wb *writeBufferBase) sealSegments(_ context.Context, segmentIDs []int64) error { // mark segment flushing if segment was growing wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Sealed), metacache.WithSegmentIDs(segmentIDs...), @@ -297,9 +297,9 @@ func (wb *writeBufferBase) sealSegments(ctx context.Context, segmentIDs []int64) return nil } -func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[error] { +func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[struct{}] { log := log.Ctx(ctx) - result := make([]*conc.Future[error], 0, len(segmentIDs)) + result := make([]*conc.Future[struct{}], 0, len(segmentIDs)) for _, segmentID := range segmentIDs { syncTask, err := wb.getSyncTask(ctx, segmentID) if err != nil { @@ -563,7 +563,7 @@ func (wb *writeBufferBase) Close(drop bool) { return } - var futures []*conc.Future[error] + var futures []*conc.Future[struct{}] for id := range wb.buffers { syncTask, err := wb.getSyncTask(context.Background(), id) if err != nil { diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 95842f2aac..879c8b3fea 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -357,8 +357,8 @@ func (s *WriteBufferSuite) TestEvictBuffer() { s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil) - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(conc.Go[error](func() (error, error) { - return nil, nil + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) { + return struct{}{}, nil })) defer func() { s.wb.mut.Lock()