diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 95f8481386..2399209120 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -235,6 +235,7 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
 // Compaction handles compaction request from DataCoord
 // returns status as long as compaction task enqueued or invalid
 func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
+	log := log.Ctx(ctx).With(zap.Int64("planID", req.GetPlanID()))
 	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
 		log.Warn("DataNode.Compaction failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
 		return merr.Status(err), nil
@@ -251,6 +252,20 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
 		return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil
 	}
 
+	meta := ds.metacache
+	for _, segment := range req.GetSegmentBinlogs() {
+		if segment.GetLevel() == datapb.SegmentLevel_L0 {
+			continue
+		}
+		_, ok := meta.GetSegmentByID(segment.GetSegmentID(), metacache.WithSegmentState(commonpb.SegmentState_Flushed))
+		if !ok {
+			log.Warn("compaction plan contains segment which is not flushed",
+				zap.Int64("segmentID", segment.GetSegmentID()),
+			)
+			return merr.Status(merr.WrapErrSegmentNotFound(segment.GetSegmentID(), "segment with flushed state not found")), nil
+		}
+	}
+
 	var task compactor
 	switch req.GetType() {
 	case datapb.CompactionType_Level0DeleteCompaction:
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index 58bfad043d..e7e5ba6863 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -212,6 +212,120 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
 	})
 }
 
+func (s *DataNodeServicesSuite) TestCompaction() {
+	dmChannelName := "by-dev-rootcoord-dml_0_100v0"
+	schema := &schemapb.CollectionSchema{
+		Name: "test_collection",
+		Fields: []*schemapb.FieldSchema{
+			{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64},
+			{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64},
+			{FieldID: common.StartOfUserFieldID, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"},
+			{FieldID: common.StartOfUserFieldID + 1, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
+				{Key: common.DimKey, Value: "128"},
+			}},
+		},
+	}
+	flushedSegmentID := int64(100)
+	growingSegmentID := int64(101)
+
+	vchan := &datapb.VchannelInfo{
+		CollectionID:        1,
+		ChannelName:         dmChannelName,
+		UnflushedSegmentIds: []int64{},
+		FlushedSegmentIds:   []int64{},
+	}
+
+	err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, vchan, schema, genTestTickler())
+	s.Require().NoError(err)
+
+	fgservice, ok := s.node.flowgraphManager.GetFlowgraphService(dmChannelName)
+	s.Require().True(ok)
+
+	metaCache := metacache.NewMockMetaCache(s.T())
+	metaCache.EXPECT().Collection().Return(1).Maybe()
+	metaCache.EXPECT().Schema().Return(schema).Maybe()
+	s.node.writeBufferManager.Register(dmChannelName, metaCache, nil)
+
+	fgservice.metacache.AddSegment(&datapb.SegmentInfo{
+		ID:            flushedSegmentID,
+		CollectionID:  1,
+		PartitionID:   2,
+		StartPosition: &msgpb.MsgPosition{},
+	}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
+	fgservice.metacache.AddSegment(&datapb.SegmentInfo{
+		ID:            growingSegmentID,
+		CollectionID:  1,
+		PartitionID:   2,
+		StartPosition: &msgpb.MsgPosition{},
+	}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
+
+	s.Run("service_not_ready", func() {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		node := &DataNode{}
+		node.UpdateStateCode(commonpb.StateCode_Abnormal)
+		req := &datapb.CompactionPlan{
+			PlanID:  1000,
+			Channel: dmChannelName,
+		}
+
+		resp, err := node.Compaction(ctx, req)
+		s.NoError(err)
+		s.False(merr.Ok(resp))
+	})
+
+	s.Run("channel_not_match", func() {
+		node := s.node
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		req := &datapb.CompactionPlan{
+			PlanID:  1000,
+			Channel: dmChannelName + "other",
+		}
+
+		resp, err := node.Compaction(ctx, req)
+		s.NoError(err)
+		s.False(merr.Ok(resp))
+	})
+
+	s.Run("channel_dropped", func() {
+		node := s.node
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		node.compactionExecutor.dropped.Insert(dmChannelName)
+		defer node.compactionExecutor.dropped.Remove(dmChannelName)
+
+		req := &datapb.CompactionPlan{
+			PlanID:  1000,
+			Channel: dmChannelName,
+		}
+
+		resp, err := node.Compaction(ctx, req)
+		s.NoError(err)
+		s.False(merr.Ok(resp))
+	})
+
+	s.Run("compact_growing_segment", func() {
+		node := s.node
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		req := &datapb.CompactionPlan{
+			PlanID:  1000,
+			Channel: dmChannelName,
+			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+				{SegmentID: 102, Level: datapb.SegmentLevel_L0},
+				{SegmentID: growingSegmentID, Level: datapb.SegmentLevel_L1},
+			},
+		}
+
+		resp, err := node.Compaction(ctx, req)
+		s.NoError(err)
+		s.False(merr.Ok(resp))
+	})
+}
+
 func (s *DataNodeServicesSuite) TestFlushSegments() {
 	dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments"
 	schema := &schemapb.CollectionSchema{
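
For context, the program below is a minimal standalone sketch of the guard the services.go hunk adds: L0 segments are skipped, and a plan naming any other segment that the local metacache does not report as Flushed is rejected. The names planSegment, flushedSet, and checkPlanSegments are illustrative stand-ins, not the actual datapb/metacache APIs from the patch.

// Standalone sketch of the flushed-state guard added to DataNode.Compaction above.
// planSegment, flushedSet, and checkPlanSegments are simplified stand-ins for the
// datapb/metacache types used in the real code, not the Milvus APIs themselves.
package main

import "fmt"

type SegmentLevel int

const (
	LevelL0 SegmentLevel = iota
	LevelL1
)

type SegmentState int

const (
	StateGrowing SegmentState = iota
	StateFlushed
)

// planSegment is the part of a compaction plan entry the guard cares about.
type planSegment struct {
	ID    int64
	Level SegmentLevel
}

// flushedSet plays the role of the flowgraph metacache: the segment states
// this DataNode currently knows about.
type flushedSet map[int64]SegmentState

// checkPlanSegments mirrors the new loop in Compaction: L0 segments are
// skipped, and any other segment that is not Flushed rejects the whole plan.
func checkPlanSegments(segments []planSegment, states flushedSet) error {
	for _, seg := range segments {
		if seg.Level == LevelL0 {
			// L0 delta segments are not required to be flushed.
			continue
		}
		if states[seg.ID] != StateFlushed {
			return fmt.Errorf("segment %d with flushed state not found", seg.ID)
		}
	}
	return nil
}

func main() {
	states := flushedSet{100: StateFlushed, 101: StateGrowing}
	plan := []planSegment{
		{ID: 102, Level: LevelL0}, // L0: skipped by the guard
		{ID: 101, Level: LevelL1}, // growing: causes rejection
	}
	fmt.Println(checkPlanSegments(plan, states)) // segment 101 with flushed state not found
}

This is the behavior the compact_growing_segment subtest exercises: segment 102 passes because it is L0, while growing segment 101 makes Compaction return a non-OK status.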