diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index 3597a2a632..9c79148907 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -324,7 +324,7 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
         })
         plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
-        log.Info("Compaction handler refreshed level zero compaction plan", zap.Any("target segments", sealedSegBinlogs))
+        log.Info("Compaction handler refreshed level zero compaction plan", zap.Any("target segments count", len(sealedSegBinlogs)))
         return
     }
 
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 09ec15aab4..0f7d7f6a03 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -109,6 +109,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
     for _, segment := range segments {
         if segment != nil &&
             (isFlushState(segment.GetState())) &&
+            segment.GetLevel() == datapb.SegmentLevel_L1 &&
             !sealedSegmentsIDDict[segment.GetID()] {
             flushSegmentIDs = append(flushSegmentIDs, segment.GetID())
         }
@@ -146,7 +147,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
     log.Info("flush response with segments",
         zap.Int64("collectionID", req.GetCollectionID()),
         zap.Int64s("sealSegments", sealedSegmentIDs),
-        zap.Int64s("flushSegments", flushSegmentIDs),
+        zap.Int("flushedSegmentsCount", len(flushSegmentIDs)),
         zap.Time("timeOfSeal", timeOfSeal),
         zap.Time("flushTs", tsoutil.PhysicalTime(ts)))
 
diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go
index 3e94344d5b..694a46e477 100644
--- a/internal/datanode/compaction_executor.go
+++ b/internal/datanode/compaction_executor.go
@@ -89,18 +89,22 @@ func (c *compactionExecutor) start(ctx context.Context) {
 }
 
 func (c *compactionExecutor) executeTask(task compactor) {
+    log := log.With(
+        zap.Int64("planID", task.getPlanID()),
+        zap.Int64("Collection", task.getCollection()),
+        zap.String("channel", task.getChannelName()),
+    )
+
     defer func() {
         c.toCompleteState(task)
     }()
 
-    log.Info("start to execute compaction", zap.Int64("planID", task.getPlanID()), zap.Int64("Collection", task.getCollection()), zap.String("channel", task.getChannelName()))
+    log.Info("start to execute compaction")
 
     result, err := task.compact()
     if err != nil {
-        log.Warn("compaction task failed",
-            zap.Int64("planID", task.getPlanID()),
-            zap.Error(err),
-        )
+        task.injectDone()
+        log.Warn("compaction task failed", zap.Error(err))
     } else {
         c.completed.Insert(result.GetPlanID(), result)
         c.completedCompactor.Insert(result.GetPlanID(), task)
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 263841273b..043405ca4b 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -22,6 +22,7 @@ import (
     "time"
 
     "github.com/cockroachdb/errors"
+    "github.com/samber/lo"
     "go.opentelemetry.io/otel"
     "go.uber.org/zap"
 
@@ -45,11 +46,10 @@ import (
 )
 
 var (
-    errCompactionTypeUndifined = errors.New("compaction type undefined")
-    errIllegalCompactionPlan   = errors.New("compaction plan illegal")
-    errTransferType            = errors.New("transfer intferface to type wrong")
-    errUnknownDataType         = errors.New("unknown shema DataType")
-    errContext                 = errors.New("context done or timeout")
+    errIllegalCompactionPlan = errors.New("compaction plan illegal")
+    errTransferType          = errors.New("transfer intferface to type wrong")
+    errUnknownDataType       = errors.New("unknown shema DataType")
+    errContext               = errors.New("context done or timeout")
 )
 
 type iterator = storage.Iterator
@@ -67,6 +67,7 @@ type compactor interface {
 // make sure compactionTask implements compactor interface
 var _ compactor = (*compactionTask)(nil)
 
+// for MixCompaction only
 type compactionTask struct {
     downloader
     uploader
@@ -432,71 +433,49 @@ func (t *compactionTask) merge(
 
 func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
     ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("Compact-%d", t.getPlanID()))
     defer span.End()
-    log := log.Ctx(ctx).With(zap.Int64("planID", t.plan.GetPlanID()))
-    compactStart := time.Now()
+
+    log := log.Ctx(ctx).With(zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
     if ok := funcutil.CheckCtxValid(ctx); !ok {
         log.Warn("compact wrong, task context done or timeout")
         return nil, errContext
     }
-    durInQueue := t.tr.RecordSpan()
+
     ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
     defer cancelAll()
 
-    var targetSegID UniqueID
-    var err error
-    switch {
-    case t.plan.GetType() == datapb.CompactionType_UndefinedCompaction:
-        log.Warn("compact wrong, compaction type undefined")
-        return nil, errCompactionTypeUndifined
-
-    case len(t.plan.GetSegmentBinlogs()) < 1:
+    compactStart := time.Now()
+    durInQueue := t.tr.RecordSpan()
+    log.Info("compact start")
+    if len(t.plan.GetSegmentBinlogs()) < 1 {
         log.Warn("compact wrong, there's no segments in segment binlogs")
         return nil, errIllegalCompactionPlan
-
-    case t.plan.GetType() == datapb.CompactionType_MergeCompaction || t.plan.GetType() == datapb.CompactionType_MixCompaction:
-        targetSegID, err = t.AllocOne()
-        if err != nil {
-            log.Warn("compact wrong", zap.Error(err))
-            return nil, err
-        }
     }
 
-    log.Info("compact start", zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
-    err = binlog.DecompressCompactionBinlogs(t.plan.GetSegmentBinlogs())
+    targetSegID, err := t.AllocOne()
     if err != nil {
-        log.Warn("DecompressCompactionBinlogs fails", zap.Error(err))
+        log.Warn("compact wrong, unable to allocate segmentID", zap.Error(err))
         return nil, err
     }
 
-    segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs()))
-    for _, s := range t.plan.GetSegmentBinlogs() {
-        segIDs = append(segIDs, s.GetSegmentID())
-    }
-    _, partID, meta, err := t.getSegmentMeta(segIDs[0])
-    if err != nil {
-        log.Warn("compact wrong", zap.Error(err))
-        return nil, err
-    }
+    segIDs := lo.Map(t.plan.GetSegmentBinlogs(), func(binlogs *datapb.CompactionSegmentBinlogs, _ int) int64 {
+        return binlogs.GetSegmentID()
+    })
 
     // Inject to stop flush
-    injectStart := time.Now()
+    // when compaction failed, these segments need to be Unblocked by injectDone in compaction_executor
+    // when compaction succeeded, these segments will be Unblocked by SyncSegments from DataCoord.
     for _, segID := range segIDs {
         t.syncMgr.Block(segID)
     }
-    log.Info("compact inject elapse", zap.Duration("elapse", time.Since(injectStart)))
-    defer func() {
-        if err != nil {
-            for _, segID := range segIDs {
-                t.syncMgr.Unblock(segID)
-            }
-        }
-    }()
+    log.Info("compact finish injection", zap.Duration("elapse", t.tr.RecordSpan()))
+
+    if err := binlog.DecompressCompactionBinlogs(t.plan.GetSegmentBinlogs()); err != nil {
+        log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
+        return nil, err
+    }
 
     dblobs := make(map[UniqueID][]*Blob)
     allPath := make([][]string, 0)
-
-    downloadStart := time.Now()
     for _, s := range t.plan.GetSegmentBinlogs() {
         // Get the number of field binlog files from non-empty segment
         var binlogNum int
@@ -532,28 +511,27 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
         if len(paths) != 0 {
             bs, err := t.download(ctxTimeout, paths)
             if err != nil {
-                log.Warn("compact download deltalogs wrong", zap.Int64("segment", segID), zap.Strings("path", paths), zap.Error(err))
+                log.Warn("compact wrong, fail to download deltalogs", zap.Int64("segment", segID), zap.Strings("path", paths), zap.Error(err))
                 return nil, err
             }
             dblobs[segID] = append(dblobs[segID], bs...)
         }
     }
-
-    log.Info("compact download deltalogs elapse", zap.Duration("elapse", time.Since(downloadStart)))
-
-    if err != nil {
-        log.Warn("compact IO wrong", zap.Error(err))
-        return nil, err
-    }
+    log.Info("compact download deltalogs done", zap.Duration("elapse", t.tr.RecordSpan()))
 
     deltaPk2Ts, err := t.mergeDeltalogs(dblobs)
     if err != nil {
+        log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
         return nil, err
     }
 
+    segmentBinlog := t.plan.GetSegmentBinlogs()[0]
+    partID := segmentBinlog.GetPartitionID()
+    meta := &etcdpb.CollectionMeta{ID: t.metaCache.Collection(), Schema: t.metaCache.Schema()}
+
     inPaths, statsPaths, numRows, err := t.merge(ctxTimeout, allPath, targetSegID, partID, meta, deltaPk2Ts)
     if err != nil {
-        log.Warn("compact wrong", zap.Error(err))
+        log.Warn("compact wrong, fail to merge", zap.Error(err))
         return nil, err
     }
@@ -571,15 +549,16 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
         zap.Int("num of binlog paths", len(inPaths)),
         zap.Int("num of stats paths", len(statsPaths)),
         zap.Int("num of delta paths", len(pack.GetDeltalogs())),
+        zap.Duration("elapse", time.Since(compactStart)),
     )
-    log.Info("compact overall elapse", zap.Duration("elapse", time.Since(compactStart)))
 
     metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()).Observe(float64(t.tr.ElapseSpan().Milliseconds()))
     metrics.DataNodeCompactionLatencyInQueue.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(durInQueue.Milliseconds()))
 
     planResult := &datapb.CompactionPlanResult{
         State:    commonpb.CompactionState_Completed,
         PlanID:   t.getPlanID(),
+        Channel:  t.plan.GetChannel(),
         Segments: []*datapb.CompactionSegment{pack},
         Type:     t.plan.GetType(),
     }
@@ -812,22 +791,6 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
     return rst, nil
 }
 
-func (t *compactionTask) getSegmentMeta(segID UniqueID) (UniqueID, UniqueID, *etcdpb.CollectionMeta, error) {
-    collID := t.metaCache.Collection()
-    seg, ok := t.metaCache.GetSegmentByID(segID)
-    if !ok {
-        return -1, -1, nil, merr.WrapErrSegmentNotFound(segID)
-    }
-    partID := seg.PartitionID()
-    sch := t.metaCache.Schema()
-
-    meta := &etcdpb.CollectionMeta{
-        ID:     collID,
-        Schema: sch,
-    }
-    return collID, partID, meta, nil
-}
-
 func (t *compactionTask) getCollection() UniqueID {
     return t.metaCache.Collection()
 }
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index a2682e2be0..b5d084b2f1 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -23,6 +23,7 @@ import (
     "testing"
     "time"
 
+    "github.com/cockroachdb/errors"
     "github.com/samber/lo"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
@@ -51,36 +52,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
     defer cancel()
     cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir))
     defer cm.RemoveWithPrefix(ctx, cm.RootPath())
-    t.Run("Test getSegmentMeta", func(t *testing.T) {
-        f := MetaFactory{}
-        meta := f.GetCollectionMeta(1, "testCollection", schemapb.DataType_Int64)
-
-        metaCache := metacache.NewMockMetaCache(t)
-        metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
-            if id == 100 {
-                return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, CollectionID: 1, PartitionID: 10}, nil), true
-            }
-            return nil, false
-        })
-        metaCache.EXPECT().Collection().Return(1)
-        metaCache.EXPECT().Schema().Return(meta.GetSchema())
-        var err error
-
-        task := &compactionTask{
-            metaCache: metaCache,
-            done:      make(chan struct{}, 1),
-        }
-
-        _, _, _, err = task.getSegmentMeta(200)
-        assert.Error(t, err)
-
-        collID, partID, meta, err := task.getSegmentMeta(100)
-        assert.NoError(t, err)
-        assert.Equal(t, UniqueID(1), collID)
-        assert.Equal(t, UniqueID(10), partID)
-        assert.NotNil(t, meta)
-    })
-
     t.Run("Test.interface2FieldData", func(t *testing.T) {
         tests := []struct {
             isvalid bool
@@ -795,47 +766,51 @@ func TestCompactorInterfaceMethods(t *testing.T) {
     }}
 
     paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") // Turn off auto expiration
-    t.Run("Test compact invalid", func(t *testing.T) {
-        alloc := allocator.NewMockAllocator(t)
-        alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
-        ctx, cancel := context.WithCancel(context.TODO())
-        metaCache := metacache.NewMockMetaCache(t)
-        metaCache.EXPECT().Collection().Return(1)
-        metaCache.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false)
-        syncMgr := syncmgr.NewMockSyncManager(t)
-        syncMgr.EXPECT().Unblock(mock.Anything).Return()
-        emptyTask := &compactionTask{
-            ctx:       ctx,
-            cancel:    cancel,
-            done:      make(chan struct{}, 1),
-            metaCache: metaCache,
-            syncMgr:   syncMgr,
-            tr:        timerecord.NewTimeRecorder("test"),
-        }
-
+    t.Run("Test compact invalid empty segment binlogs", func(t *testing.T) {
         plan := &datapb.CompactionPlan{
             PlanID:           999,
-            SegmentBinlogs:   notEmptySegmentBinlogs,
-            StartTime:        0,
+            SegmentBinlogs:   nil,
             TimeoutInSeconds: 10,
-            Type:             datapb.CompactionType_UndefinedCompaction,
-            Channel:          "",
+            Type:             datapb.CompactionType_MixCompaction,
+        }
+        ctx, cancel := context.WithCancel(context.Background())
+        emptyTask := &compactionTask{
+            ctx:    ctx,
+            cancel: cancel,
+            tr:     timerecord.NewTimeRecorder("test"),
+
+            done: make(chan struct{}, 1),
+            plan: plan,
         }
 
-        emptyTask.plan = plan
         _, err := emptyTask.compact()
         assert.Error(t, err)
-
-        plan.Type = datapb.CompactionType_MergeCompaction
-        emptyTask.Allocator = alloc
-        plan.SegmentBinlogs = notEmptySegmentBinlogs
-        _, err = emptyTask.compact()
-        assert.Error(t, err)
+        assert.ErrorIs(t, err, errIllegalCompactionPlan)
 
         emptyTask.complete()
         emptyTask.stop()
     })
 
+    t.Run("Test compact invalid AllocOnce failed", func(t *testing.T) {
+        mockAlloc := allocator.NewMockAllocator(t)
+        mockAlloc.EXPECT().AllocOne().Call.Return(int64(0), errors.New("mock allocone error")).Once()
+        plan := &datapb.CompactionPlan{
+            PlanID:           999,
+            SegmentBinlogs:   notEmptySegmentBinlogs,
+            TimeoutInSeconds: 10,
+            Type:             datapb.CompactionType_MixCompaction,
+        }
+        task := &compactionTask{
+            ctx:       context.Background(),
+            tr:        timerecord.NewTimeRecorder("test"),
+            Allocator: mockAlloc,
+            plan:      plan,
+        }
+
+        _, err := task.compact()
+        assert.Error(t, err)
+    })
+
     t.Run("Test typeII compact valid", func(t *testing.T) {
         alloc := allocator.NewMockAllocator(t)
         alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go
index 33134aa543..26fbcd197c 100644
--- a/internal/datanode/l0_compactor.go
+++ b/internal/datanode/l0_compactor.go
@@ -190,7 +190,7 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
     )
     for segID, deltaLogs := range totalDeltalogs {
         log := log.With(zap.Int64("levelzero segment", segID))
-        log.Info("Linear L0 compaction processing segment", zap.Int64s("target segmentIDs", targetSegIDs))
+        log.Info("Linear L0 compaction processing segment", zap.Int("target segment count", len(targetSegIDs)))
 
         allIters, err := t.loadDelta(ctxTimeout, deltaLogs)
         if err != nil {
@@ -339,10 +339,12 @@ func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireChec
         if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) {
             blobs, binlog, err := t.composeDeltalog(segID, dData)
             if err != nil {
+                log.Warn("L0 compaction composeDelta fail", zap.Int64("segmentID", segID), zap.Error(err))
                 return err
             }
             err = t.Upload(ctx, blobs)
             if err != nil {
+                log.Warn("L0 compaction upload blobs fail", zap.Int64("segmentID", segID), zap.Any("binlog", binlog), zap.Error(err))
                 return err
             }
diff --git a/internal/datanode/l0_compactor_test.go b/internal/datanode/l0_compactor_test.go
index 716db4090a..c7dc9422d7 100644
--- a/internal/datanode/l0_compactor_test.go
+++ b/internal/datanode/l0_compactor_test.go
@@ -232,6 +232,39 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
 
 func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() {
     ctx := context.Background()
+    s.Run("uploadByCheck directly composeDeltalog failed", func() {
+        s.SetupTest()
+        s.mockMeta.EXPECT().Collection().Return(1)
+        s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Once()
+
+        segments := map[int64]*storage.DeleteData{100: s.dData}
+        results := make(map[int64]*datapb.CompactionSegment)
+        err := s.task.uploadByCheck(ctx, false, segments, results)
+        s.Error(err)
+        s.Equal(0, len(results))
+    })
+
+    s.Run("uploadByCheck directly Upload failed", func() {
+        s.SetupTest()
+        s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(errors.New("mock upload failed"))
+        s.mockMeta.EXPECT().Collection().Return(1)
+        s.mockMeta.EXPECT().GetSegmentByID(
+            mock.MatchedBy(func(ID int64) bool {
+                return ID == 100
+            }), mock.Anything).
+            Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
+
+        s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
+        blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
+        blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
+        s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
+
+        segments := map[int64]*storage.DeleteData{100: s.dData}
+        results := make(map[int64]*datapb.CompactionSegment)
+        err := s.task.uploadByCheck(ctx, false, segments, results)
+        s.Error(err)
+        s.Equal(0, len(results))
+    })
 
     s.Run("upload directly", func() {
         s.SetupTest()
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 5afe0997e4..330b25b601 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -284,7 +284,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
             node.syncMgr,
             req,
         )
-    case datapb.CompactionType_MixCompaction, datapb.CompactionType_MinorCompaction:
+    case datapb.CompactionType_MixCompaction:
         // TODO, replace this binlogIO with io.BinlogIO
         binlogIO := &binlogIO{node.chunkManager, ds.idAllocator}
         task = newCompactionTask(