diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index bc1659f86c..83f25869a7 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -329,12 +329,14 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) { } if plan.GetType() == datapb.CompactionType_MixCompaction { + segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs)) for _, seg := range plan.GetSegmentBinlogs() { if info := c.meta.GetHealthySegment(seg.GetSegmentID()); info != nil { seg.Deltalogs = info.GetDeltalogs() + segIDMap[seg.SegmentID] = info.GetDeltalogs() } } - log.Info("Compaction handler refreshed mix compaction plan") + log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap)) return } } @@ -488,73 +490,95 @@ func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask { return c.plans[planID] } -// expireCompaction set the compaction state to expired func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { // Get executing executingTasks before GetCompactionState from DataNode to prevent false failure, // for DC might add new task while GetCompactionState. executingTasks := c.getTasksByState(executing) timeoutTasks := c.getTasksByState(timeout) planStates := c.sessions.GetCompactionPlansResults() + cachedPlans := []int64{} + // TODO reduce the lock range c.mu.Lock() - defer c.mu.Unlock() for _, task := range executingTasks { - planResult, ok := planStates[task.plan.PlanID] - state := planResult.GetState() + log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID)) planID := task.plan.PlanID - // check whether the state of CompactionPlan is working - if ok { - if state == commonpb.CompactionState_Completed { - log.Info("complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID)) - err := c.completeCompaction(planResult) - if err != nil { - log.Warn("fail to complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID), zap.Error(err)) + cachedPlans = append(cachedPlans, planID) + if nodePlan, ok := planStates[planID]; ok { + planResult := nodePlan.B + switch planResult.GetState() { + case commonpb.CompactionState_Completed: + log.Info("start to complete compaction") + if err := c.completeCompaction(planResult); err != nil { + log.Warn("fail to complete compaction", zap.Error(err)) + } + + case commonpb.CompactionState_Executing: + if c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) { + log.Warn("compaction timeout", + zap.Int32("timeout in seconds", task.plan.GetTimeoutInSeconds()), + zap.Uint64("startTime", task.plan.GetStartTime()), + zap.Uint64("now", ts), + ) + c.plans[planID] = c.plans[planID].shadowClone(setState(timeout), endSpan()) } - continue } - // check wether the CompactionPlan is timeout - if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) { - continue - } - log.Warn("compaction timeout", - zap.Int64("planID", task.plan.PlanID), - zap.Int64("nodeID", task.dataNodeID), - zap.Uint64("startTime", task.plan.GetStartTime()), - zap.Uint64("now", ts), - ) - c.plans[planID] = c.plans[planID].shadowClone(setState(timeout), endSpan()) - continue - } - - log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID)) - c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan()) - 
c.setSegmentsCompacting(task.plan, false)
-		c.scheduler.Finish(task.dataNodeID, task.plan)
-	}
-
-	// Timeout tasks will be timeout and failed in DataNode
-	// need to wait for DataNode reporting failure and
-	// clean the status.
-	for _, task := range timeoutTasks {
-		stateResult, ok := planStates[task.plan.PlanID]
-		planID := task.plan.PlanID
-
-		if !ok {
-			log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
+		} else {
+			// a compaction task in DC that is not found in DN means the compaction plan has failed
+			log.Info("compaction failed")
 			c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
 			c.setSegmentsCompacting(task.plan, false)
 			c.scheduler.Finish(task.dataNodeID, task.plan)
 		}
+	}
 
-		// DataNode will check if plan's are timeout but not as sensitive as DataCoord,
-		// just wait another round.
-		if ok && stateResult.GetState() == commonpb.CompactionState_Executing {
-			log.Info("compaction timeout in DataCoord yet DataNode is still running",
-				zap.Int64("planID", planID),
-				zap.Int64("nodeID", task.dataNodeID))
-			continue
+	// Timeout tasks will time out and fail in the DataNode;
+	// wait for the DataNode to report the failure and then clean up the status.
+	for _, task := range timeoutTasks {
+		log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
+		planID := task.plan.PlanID
+		cachedPlans = append(cachedPlans, planID)
+		if nodePlan, ok := planStates[planID]; ok {
+			if nodePlan.B.GetState() == commonpb.CompactionState_Executing {
+				log.RatedInfo(1, "compaction timeout in DataCoord yet DataNode is still running")
+			}
+		} else {
+			// a compaction task in DC that is not found in DN means the compaction plan has failed
+			log.Info("compaction failed for timeout")
+			c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
+			c.setSegmentsCompacting(task.plan, false)
+			c.scheduler.Finish(task.dataNodeID, task.plan)
 		}
 	}
+	c.mu.Unlock()
+
+	// Compaction plans in DN but not in DC are unknown plans; notify DN to clear them.
+	// No locks needed, because there are no changes to DC memory
+	completedPlans := lo.PickBy(planStates, func(planID int64, planState *typeutil.Pair[int64, *datapb.CompactionPlanResult]) bool {
+		return planState.B.GetState() == commonpb.CompactionState_Completed
+	})
+
+	unknownPlansInWorker, _ := lo.Difference(lo.Keys(completedPlans), cachedPlans)
+	for _, planID := range unknownPlansInWorker {
+		if nodeUnknownPlan, ok := completedPlans[planID]; ok {
+			nodeID, plan := nodeUnknownPlan.A, nodeUnknownPlan.B
+			log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
+
+			// Sync segments without CompactedFrom segment IDs to make sure the DN clears the task
+			// without changing the meta
+			req := &datapb.SyncSegmentsRequest{
+				PlanID:      planID,
+				ChannelName: plan.GetChannel(),
+			}
+
+			log.Info("compaction syncing unknown plan with node")
+			if err := c.sessions.SyncSegments(nodeID, req); err != nil {
+				log.Warn("compaction failed to sync segments with node", zap.Error(err))
+				return err
+			}
+		}
+	}
+	return nil
 }
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 5977d84f40..88db0f2173 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -76,12 +76,14 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
 }
 
 func (s *CompactionPlanHandlerSuite) TestCheckResult() {
-	s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{
-		1: {PlanID: 1, State: commonpb.CompactionState_Executing},
-		2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}},
-		3: {PlanID: 3, State: commonpb.CompactionState_Executing},
-		4: {PlanID: 4, State: commonpb.CompactionState_Executing},
+	s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]{
+		1: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 1, State: commonpb.CompactionState_Executing}},
+		2: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}},
+		3: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 3, State: commonpb.CompactionState_Executing}},
+		4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}},
 	})
+
+	s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once()
 	{
 		s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
 		handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
@@ -510,10 +512,11 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
 }
 
 func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
-	s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{
-		1: {PlanID: 1, State: commonpb.CompactionState_Executing},
-		2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}},
-		3: {PlanID: 3, State: commonpb.CompactionState_Executing},
+	s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]{
+		1: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 1, State: commonpb.CompactionState_Executing}},
+		2: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}},
+		3: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 3, State: 
commonpb.CompactionState_Executing}}, + 5: {A: 222, B: &datapb.CompactionPlanResult{PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 5}}}}, }) inPlans := map[int64]*compactionTask{ @@ -521,27 +524,42 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() { triggerInfo: &compactionSignal{}, plan: &datapb.CompactionPlan{PlanID: 1}, state: executing, + dataNodeID: 111, }, 2: { triggerInfo: &compactionSignal{}, plan: &datapb.CompactionPlan{PlanID: 2}, state: executing, + dataNodeID: 111, }, 3: { triggerInfo: &compactionSignal{}, plan: &datapb.CompactionPlan{PlanID: 3}, state: timeout, + dataNodeID: 111, }, 4: { triggerInfo: &compactionSignal{}, plan: &datapb.CompactionPlan{PlanID: 4}, state: timeout, + dataNodeID: 111, }, } + s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error { + s.EqualValues(nodeID, 222) + s.NotNil(req) + s.Empty(req.GetCompactedFrom()) + s.EqualValues(5, req.GetPlanID()) + return nil + }).Once() + handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans = inPlans + _, ok := handler.plans[5] + s.Require().False(ok) + err := handler.updateCompaction(0) s.NoError(err) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index ff7bb4e413..378e5fa76d 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -465,13 +465,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { continue } - segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs)) - for _, seg := range plan.SegmentBinlogs { - segIDMap[seg.SegmentID] = seg.Deltalogs - } - log.Info("time cost of generating global compaction", - zap.Any("segID2DeltaLogs", segIDMap), zap.Int64("planID", plan.PlanID), zap.Int64("time cost", time.Since(start).Milliseconds()), zap.Int64("collectionID", signal.collectionID), diff --git a/internal/datacoord/mock_session_manager.go b/internal/datacoord/mock_session_manager.go index 0dd3c8029c..ca3ea70112 100644 --- a/internal/datacoord/mock_session_manager.go +++ b/internal/datacoord/mock_session_manager.go @@ -7,6 +7,8 @@ import ( datapb "github.com/milvus-io/milvus/internal/proto/datapb" mock "github.com/stretchr/testify/mock" + + typeutil "github.com/milvus-io/milvus/pkg/util/typeutil" ) // MockSessionManager is an autogenerated mock type for the SessionManager type @@ -398,15 +400,15 @@ func (_c *MockSessionManager_FlushChannels_Call) RunAndReturn(run func(context.C } // GetCompactionPlansResults provides a mock function with given fields: -func (_m *MockSessionManager) GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult { +func (_m *MockSessionManager) GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult] { ret := _m.Called() - var r0 map[int64]*datapb.CompactionPlanResult - if rf, ok := ret.Get(0).(func() map[int64]*datapb.CompactionPlanResult); ok { + var r0 map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult] + if rf, ok := ret.Get(0).(func() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[int64]*datapb.CompactionPlanResult) + r0 = ret.Get(0).(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]) } } @@ -430,12 +432,12 @@ func (_c *MockSessionManager_GetCompactionPlansResults_Call) Run(run func()) *Mo return _c } 
-func (_c *MockSessionManager_GetCompactionPlansResults_Call) Return(_a0 map[int64]*datapb.CompactionPlanResult) *MockSessionManager_GetCompactionPlansResults_Call {
+func (_c *MockSessionManager_GetCompactionPlansResults_Call) Return(_a0 map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]) *MockSessionManager_GetCompactionPlansResults_Call {
 	_c.Call.Return(_a0)
 	return _c
 }
 
-func (_c *MockSessionManager_GetCompactionPlansResults_Call) RunAndReturn(run func() map[int64]*datapb.CompactionPlanResult) *MockSessionManager_GetCompactionPlansResults_Call {
+func (_c *MockSessionManager_GetCompactionPlansResults_Call) RunAndReturn(run func() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]) *MockSessionManager_GetCompactionPlansResults_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go
index 94000b20fc..9a3bfb343d 100644
--- a/internal/datacoord/session_manager.go
+++ b/internal/datacoord/session_manager.go
@@ -58,7 +58,7 @@ type SessionManager interface {
 	Compaction(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error
 	SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error
 	Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest)
-	GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult
+	GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]
 	NotifyChannelOperation(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error
 	CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error)
 	AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)
@@ -263,11 +263,12 @@ func (c *SessionManagerImpl) execImport(ctx context.Context, nodeID int64, itr *
 	log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr))
 }
 
-func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult {
+// GetCompactionPlansResults returns a map of planID to Pair[nodeID, *CompactionPlanResult]
+func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult] {
 	wg := sync.WaitGroup{}
 	ctx := context.Background()
 
-	plans := typeutil.NewConcurrentMap[int64, *datapb.CompactionPlanResult]()
+	plans := typeutil.NewConcurrentMap[int64, *typeutil.Pair[int64, *datapb.CompactionPlanResult]]()
 	c.sessions.RLock()
 	for nodeID, s := range c.sessions.data {
 		wg.Add(1)
@@ -296,15 +297,16 @@ func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*datapb.Compa
 			// for compatibility issue, before 2.3.4, resp has only logpath
 			// try to parse path and fill logid
 			binlog.CompressCompactionBinlogs(rst.GetSegments())
-			plans.Insert(rst.PlanID, rst)
+			nodeRst := typeutil.NewPair(nodeID, rst)
+			plans.Insert(rst.PlanID, &nodeRst)
 			}
 		}(nodeID, s)
 	}
 	c.sessions.RUnlock()
 	wg.Wait()
 
-	rst := make(map[int64]*datapb.CompactionPlanResult)
-	plans.Range(func(planID int64, result *datapb.CompactionPlanResult) bool {
+	rst := make(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult])
+	plans.Range(func(planID int64, result *typeutil.Pair[int64, *datapb.CompactionPlanResult]) bool {
 		rst[planID] = result
 		return true
 	})
diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go
index 694a46e477..7f1d7df215 100644
--- 
a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -68,6 +68,7 @@ func (c *compactionExecutor) injectDone(planID UniqueID) { c.completed.GetAndRemove(planID) task, loaded := c.completedCompactor.GetAndRemove(planID) if loaded { + log.Info("Compaction task inject done", zap.Int64("planID", planID), zap.String("channel", task.getChannelName())) task.injectDone() } } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index e8f5c36128..aeeed72377 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -321,21 +321,26 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac // SyncSegments called by DataCoord, sync the compacted segments' meta between DC and DN func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) { - log.Ctx(ctx).Info("DataNode receives SyncSegments", + log := log.Ctx(ctx).With( zap.Int64("planID", req.GetPlanID()), + zap.Int64("nodeID", node.GetNodeID()), zap.Int64("target segmentID", req.GetCompactedTo()), zap.Int64s("compacted from", req.GetCompactedFrom()), zap.Int64("numOfRows", req.GetNumOfRows()), zap.String("channelName", req.GetChannelName()), ) + log.Info("DataNode receives SyncSegments") + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { - log.Warn("DataNode.SyncSegments failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err)) + log.Warn("DataNode.SyncSegments failed", zap.Error(err)) return merr.Status(err), nil } if len(req.GetCompactedFrom()) <= 0 { - return merr.Status(merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")), nil + log.Info("SyncSegments with empty compactedFrom, clearing the plan") + node.compactionExecutor.injectDone(req.GetPlanID()) + return merr.Success(), nil } ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName()) diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 554ff8f99f..59854fb10c 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -802,7 +802,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 201, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 300, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) - s.Run("invalid compacted from", func() { + s.Run("empty compactedFrom", func() { req := &datapb.SyncSegmentsRequest{ CompactedTo: 400, NumOfRows: 100, @@ -811,10 +811,18 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { req.CompactedFrom = []UniqueID{} status, err := s.node.SyncSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().False(merr.Ok(status)) + s.Assert().True(merr.Ok(status)) + }) + + s.Run("invalid compacted from", func() { + req := &datapb.SyncSegmentsRequest{ + CompactedTo: 400, + NumOfRows: 100, + CompactedFrom: []UniqueID{101, 201}, + } req.CompactedFrom = []UniqueID{101, 201} - status, err = s.node.SyncSegments(s.ctx, req) + status, err := s.node.SyncSegments(s.ctx, req) s.Assert().NoError(err) s.Assert().False(merr.Ok(status)) }) diff --git a/tests/integration/datanode/compaction_test.go b/tests/integration/datanode/compaction_test.go new file mode 100644 index 0000000000..83c5e94410 --- /dev/null +++ b/tests/integration/datanode/compaction_test.go @@ -0,0 +1,209 @@ +package datanode + +import ( + "context" + "fmt" + "strconv" + 
"strings" + "time" + + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + grpcdatacoord "github.com/milvus-io/milvus/internal/distributed/datacoord" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/tests/integration" +) + +// issue: https://github.com/milvus-io/milvus/issues/30137 +func (s *DataNodeSuite) TestClearCompactionTask() { + s.dim = 128 + collName := "test_yx" + // generate 1 segment + pks := s.generateSegment(collName, 1) + + // triggers a compaction + // restart a datacoord + s.compactAndReboot(collName) + + // delete data + // flush -> won't timeout + s.deleteAndFlush(pks, collName) +} + +func (s *DataNodeSuite) deleteAndFlush(pks []int64, collection string) { + ctx := context.Background() + + expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ",")) + log.Info("========================delete expr==================", + zap.String("expr", expr), + ) + deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collection, + Expr: expr, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(deleteResp.GetStatus())) + s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt()) + + log.Info("=========================Data flush=========================") + + flushResp, err := s.Cluster.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{ + CollectionNames: []string{collection}, + }) + s.NoError(err) + segmentLongArr, has := flushResp.GetCollSegIDs()[collection] + s.Require().True(has) + segmentIDs := segmentLongArr.GetData() + s.Require().Empty(segmentLongArr) + s.Require().True(has) + + flushTs, has := flushResp.GetCollFlushTs()[collection] + s.True(has) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + log.Info("=========================Wait for flush for 2min=========================") + s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection) + log.Info("=========================Data flush done=========================") +} + +func (s *DataNodeSuite) compactAndReboot(collection string) { + ctx := context.Background() + // create index + createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collection, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(s.dim, "FLAT", metric.IP), + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(createIndexStatus)) + + coll, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + CollectionName: collection, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(coll.GetStatus())) + s.Require().EqualValues(coll.GetCollectionName(), collection) + + collID := coll.GetCollectionID() + compactionResp, err := s.Cluster.Proxy.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{ + CollectionID: collID, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(coll.GetStatus())) + s.NotEqualValues(-1, compactionResp.GetCompactionID()) + s.EqualValues(1, compactionResp.GetCompactionPlanCount()) + + compactID := compactionResp.GetCompactionID() + 
stateResp, err := s.Cluster.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{ + CompactionID: compactID, + }) + + s.Require().NoError(err) + s.Require().True(merr.Ok(stateResp.GetStatus())) + + // sleep to ensure compaction tasks are submitted to DN + time.Sleep(time.Second) + + planResp, err := s.Cluster.Proxy.GetCompactionStateWithPlans(ctx, &milvuspb.GetCompactionPlansRequest{ + CompactionID: compactID, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(planResp.GetStatus())) + s.Require().Equal(1, len(planResp.GetMergeInfos())) + + // Reboot + if planResp.GetMergeInfos()[0].GetTarget() == int64(-1) { + s.Cluster.DataCoord.Stop() + s.Cluster.DataCoord = grpcdatacoord.NewServer(ctx, s.Cluster.GetFactory()) + err = s.Cluster.DataCoord.Run() + s.Require().NoError(err) + + stateResp, err = s.Cluster.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{ + CompactionID: compactID, + }) + + s.Require().NoError(err) + s.Require().True(merr.Ok(stateResp.GetStatus())) + s.Require().EqualValues(0, stateResp.GetTimeoutPlanNo()) + s.Require().EqualValues(0, stateResp.GetExecutingPlanNo()) + s.Require().EqualValues(0, stateResp.GetCompletedPlanNo()) + s.Require().EqualValues(0, stateResp.GetFailedPlanNo()) + } +} + +func (s *DataNodeSuite) generateSegment(collection string, segmentCount int) []int64 { + c := s.Cluster + + schema := integration.ConstructSchema(collection, s.dim, true) + marshaledSchema, err := proto.Marshal(schema) + s.Require().NoError(err) + + createCollectionStatus, err := c.Proxy.CreateCollection(context.TODO(), &milvuspb.CreateCollectionRequest{ + CollectionName: collection, + Schema: marshaledSchema, + ShardsNum: 1, + }) + s.Require().NoError(err) + err = merr.Error(createCollectionStatus) + s.Require().NoError(err) + + rowNum := 3000 + pks := []int64{} + for i := 0; i < segmentCount; i++ { + log.Info("=========================Data insertion=========================", zap.Any("count", i)) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, s.dim) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{ + CollectionName: collection, + FieldsData: []*schemapb.FieldData{fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(rowNum), + }) + s.NoError(err) + s.True(merr.Ok(insertResult.GetStatus())) + s.Require().EqualValues(rowNum, insertResult.GetInsertCnt()) + s.Require().EqualValues(rowNum, len(insertResult.GetIDs().GetIntId().GetData())) + + pks = append(pks, insertResult.GetIDs().GetIntId().GetData()...) 
+ + log.Info("=========================Data flush=========================", zap.Any("count", i)) + flushResp, err := c.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{ + CollectionNames: []string{collection}, + }) + s.NoError(err) + segmentLongArr, has := flushResp.GetCollSegIDs()[collection] + s.Require().True(has) + segmentIDs := segmentLongArr.GetData() + s.Require().NotEmpty(segmentLongArr) + s.Require().True(has) + + flushTs, has := flushResp.GetCollFlushTs()[collection] + s.True(has) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection) + log.Info("=========================Data flush done=========================", zap.Any("count", i)) + } + log.Info("=========================Data insertion finished=========================") + + segments, err := c.MetaWatcher.ShowSegments() + s.Require().NoError(err) + s.Require().Equal(segmentCount, len(segments)) + lo.ForEach(segments, func(info *datapb.SegmentInfo, _ int) { + s.Require().Equal(commonpb.SegmentState_Flushed, info.GetState()) + s.Require().EqualValues(3000, info.GetNumOfRows()) + }) + + return pks[:300] +}
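
Note: below is a minimal, self-contained sketch of the stale-plan cleanup handshake this patch introduces, for illustration only. The types (planResult, nodePlan, syncSegmentsRequest) are simplified stand-ins and do not match the real datapb/typeutil definitions; only the control flow mirrors the new tail of updateCompaction: a completed plan reported by a DataNode that DataCoord no longer tracks gets a SyncSegments call with an empty CompactedFrom, which the DataNode now treats as "drop the cached compaction task" (see the SyncSegments change in services.go).

package main

import "fmt"

// Simplified stand-ins for the real Milvus types; illustrative only.
type planResult struct {
	PlanID    int64
	Completed bool
}

type nodePlan struct {
	NodeID int64
	Result *planResult
}

type syncSegmentsRequest struct {
	PlanID        int64
	CompactedFrom []int64 // left empty on purpose: "just drop the cached task"
}

// clearUnknownPlans mirrors the control flow of the new updateCompaction tail:
// every completed plan reported by a DataNode that DataCoord no longer tracks
// gets a SyncSegments call without CompactedFrom, so the DataNode can release
// the cached compaction task without touching any segment meta.
func clearUnknownPlans(
	reported map[int64]nodePlan, // planID -> (nodeID, result), like GetCompactionPlansResults
	tracked map[int64]struct{}, // plan IDs DataCoord still holds in c.plans
	sync func(nodeID int64, req *syncSegmentsRequest) error,
) error {
	for planID, np := range reported {
		if !np.Result.Completed {
			continue // only completed plans are safe to clear on the worker
		}
		if _, ok := tracked[planID]; ok {
			continue // DataCoord still knows this plan; normal handling applies
		}
		req := &syncSegmentsRequest{PlanID: planID} // empty CompactedFrom
		if err := sync(np.NodeID, req); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Plan 5 finished on node 222, but DataCoord lost it (e.g. after a restart).
	reported := map[int64]nodePlan{
		5: {NodeID: 222, Result: &planResult{PlanID: 5, Completed: true}},
	}
	tracked := map[int64]struct{}{}

	_ = clearUnknownPlans(reported, tracked, func(nodeID int64, req *syncSegmentsRequest) error {
		fmt.Printf("SyncSegments(node=%d, plan=%d, compactedFrom=%v)\n", nodeID, req.PlanID, req.CompactedFrom)
		return nil
	})
}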