diff --git a/internal/datanode/broker/mock_broker.go b/internal/datanode/broker/mock_broker.go index f8b731c80e..ae735bff96 100644 --- a/internal/datanode/broker/mock_broker.go +++ b/internal/datanode/broker/mock_broker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package broker @@ -63,8 +63,8 @@ type MockBroker_AssignSegmentID_Call struct { } // AssignSegmentID is a helper method to define mock.On call -// - ctx context.Context -// - reqs ...*datapb.SegmentIDRequest +// - ctx context.Context +// - reqs ...*datapb.SegmentIDRequest func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call { return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID", append([]interface{}{ctx}, reqs...)...)} @@ -125,8 +125,8 @@ type MockBroker_DropVirtualChannel_Call struct { } // DropVirtualChannel is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.DropVirtualChannelRequest +// - ctx context.Context +// - req *datapb.DropVirtualChannelRequest func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call { return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)} } @@ -180,8 +180,8 @@ type MockBroker_GetSegmentInfo_Call struct { } // GetSegmentInfo is a helper method to define mock.On call -// - ctx context.Context -// - segmentIDs []int64 +// - ctx context.Context +// - segmentIDs []int64 func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call { return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)} } @@ -223,8 +223,8 @@ type MockBroker_ReportTimeTick_Call struct { } // ReportTimeTick is a helper method to define mock.On call -// - ctx context.Context -// - msgs []*msgpb.DataNodeTtMsg +// - ctx context.Context +// - msgs []*msgpb.DataNodeTtMsg func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call { return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)} } @@ -266,8 +266,8 @@ type MockBroker_SaveBinlogPaths_Call struct { } // SaveBinlogPaths is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.SaveBinlogPathsRequest +// - ctx context.Context +// - req *datapb.SaveBinlogPathsRequest func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call { return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)} } @@ -309,8 +309,8 @@ type MockBroker_UpdateChannelCheckpoint_Call struct { } // UpdateChannelCheckpoint is a helper method to define mock.On call -// - ctx context.Context -// - channelCPs []*msgpb.MsgPosition +// - ctx context.Context +// - channelCPs []*msgpb.MsgPosition func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelCPs interface{}) *MockBroker_UpdateChannelCheckpoint_Call { return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelCPs)} } @@ -352,8 +352,8 @@ type MockBroker_UpdateSegmentStatistics_Call struct { } // UpdateSegmentStatistics is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.UpdateSegmentStatisticsRequest +// - ctx context.Context +// - req *datapb.UpdateSegmentStatisticsRequest func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call { return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)} } diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index 11a8a93ab4..bbcfbbb827 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -190,9 +190,10 @@ func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanR return true }) - // remote level zero results + // remove level zero results lo.ForEach(completedLevelZero, func(planID int64, _ int) { c.completed.Remove(planID) + c.completedCompactor.Remove(planID) }) if len(results) > 0 { diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go index d56cb5e2bc..68eb61c531 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction_executor_test.go @@ -21,7 +21,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" ) @@ -115,6 +117,46 @@ func TestCompactionExecutor(t *testing.T) { t.FailNow() } }) + + t.Run("test getAllCompactionResults", func(t *testing.T) { + ex := newCompactionExecutor() + + mockC := newMockCompactor(true) + ex.executing.Insert(int64(1), mockC) + + ex.completedCompactor.Insert(int64(2), mockC) + ex.completed.Insert(int64(2), &datapb.CompactionPlanResult{ + PlanID: 2, + State: commonpb.CompactionState_Completed, + Type: datapb.CompactionType_MixCompaction, + }) + + ex.completedCompactor.Insert(int64(3), mockC) + ex.completed.Insert(int64(3), &datapb.CompactionPlanResult{ + PlanID: 3, + State: commonpb.CompactionState_Completed, + Type: datapb.CompactionType_Level0DeleteCompaction, + }) + + require.Equal(t, 2, ex.completed.Len()) + require.Equal(t, 2, ex.completedCompactor.Len()) + require.Equal(t, 1, ex.executing.Len()) + + result := ex.getAllCompactionResults() + assert.Equal(t, 3, len(result)) + + for _, res := range result { + if res.PlanID == int64(1) { + assert.Equal(t, res.GetState(), commonpb.CompactionState_Executing) + } else { + assert.Equal(t, res.GetState(), commonpb.CompactionState_Completed) + } + } + + assert.Equal(t, 1, ex.completed.Len()) + require.Equal(t, 1, ex.completedCompactor.Len()) + require.Equal(t, 1, ex.executing.Len()) + }) } func newMockCompactor(isvalid bool) *mockCompactor {