diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 9d64a084bb..d0c0f7960f 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -355,7 +355,7 @@ func (c *importChecker) checkSortingJob(job ImportJob) { sortSegmentIDs := task.(*importTask).GetSortedSegmentIDs() taskCnt += len(originSegmentIDs) for i, originSegmentID := range originSegmentIDs { - taskLogFields := WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("target", sortSegmentIDs[i])) + logger := log.With(WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("target", sortSegmentIDs[i]))...) originSegment := c.meta.GetHealthySegment(c.ctx, originSegmentID) targetSegment := c.meta.GetHealthySegment(c.ctx, sortSegmentIDs[i]) if originSegment == nil { @@ -363,26 +363,31 @@ func (c *importChecker) checkSortingJob(job ImportJob) { doneCnt++ continue } - if targetSegment == nil { - compactionTask, err := createSortCompactionTask(c.ctx, originSegment, sortSegmentIDs[i], c.meta, c.handler, c.alloc) + if targetSegment != nil { + // sort compaction is already done + doneCnt++ + continue + } + // if not compacting, trigger sort compaction task + isCompacting := c.meta.IsSegmentCompacting(originSegmentID) + if !isCompacting { + compactionTask, err := createSortCompactionTask(c.ctx, task, originSegment, sortSegmentIDs[i], c.meta, c.handler, c.alloc) if err != nil { - log.Warn("create sort compaction task failed", zap.Int64("segmentID", originSegmentID), zap.Error(err)) + logger.Warn("create sort compaction task failed", zap.Error(err)) continue } if compactionTask == nil { - log.Info("maybe it no need to create sort compaction task", zap.Int64("segmentID", originSegmentID)) + logger.Info("maybe it no need to create sort compaction task") doneCnt++ continue } - log.Info("create sort compaction task success", taskLogFields...) err = c.ci.enqueueCompaction(compactionTask) if err != nil { - log.Warn("sort compaction task enqueue failed", zap.Error(err)) + logger.Warn("sort compaction task enqueue failed", zap.Error(err)) continue } - continue + logger.Info("create sort compaction task and enqueue success") } - doneCnt++ } } diff --git a/internal/datacoord/import_task_import.go b/internal/datacoord/import_task_import.go index 4a0d523a81..728ebac6cc 100644 --- a/internal/datacoord/import_task_import.go +++ b/internal/datacoord/import_task_import.go @@ -205,6 +205,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { } } if resp.GetState() == datapb.ImportTaskStateV2_Completed { + totalRows := int64(0) for _, info := range resp.GetImportSegmentsInfo() { // try to parse path and fill logID err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs(), info.GetBm25Logs()) @@ -226,6 +227,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { return } log.Info("update import segment info done", WrapTaskLog(t, zap.Int64("segmentID", info.GetSegmentID()), zap.Any("segmentInfo", info))...) + totalRows += info.GetImportedRows() } completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") err = t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) @@ -235,7 +237,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { } importDuration := t.GetTR().RecordSpan() metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds())) - log.Info("import done", WrapTaskLog(t, zap.Duration("taskTimeCost/import", importDuration))...) + log.Info("import done", WrapTaskLog(t, zap.Int64("totalRows", totalRows), zap.Duration("taskTimeCost/import", importDuration))...) } log.Info("query import", WrapTaskLog(t, zap.String("respState", resp.GetState().String()), zap.String("reason", resp.GetReason()))...) diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 199c310966..2feab357b8 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -847,17 +847,19 @@ func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int { } func createSortCompactionTask(ctx context.Context, + t ImportTask, originSegment *SegmentInfo, targetSegmentID int64, meta *meta, handler Handler, alloc allocator.Allocator, ) (*datapb.CompactionTask, error) { + log := log.Ctx(ctx).With(WrapTaskLog(t)...) if originSegment.GetNumOfRows() == 0 { operator := UpdateStatusOperator(originSegment.GetID(), commonpb.SegmentState_Dropped) err := meta.UpdateSegmentsInfo(ctx, operator) if err != nil { - log.Ctx(ctx).Warn("import zero num row segment, but mark it dropped failed", zap.Error(err)) + log.Warn("import zero num row segment, but mark it dropped failed", zap.Error(err)) return nil, err } return nil, nil @@ -870,13 +872,13 @@ func createSortCompactionTask(ctx context.Context, collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties) if err != nil { - log.Warn("failed to apply triggerSegmentSortCompaction, get collection ttl failed") + log.Warn("Failed to create sort compaction task because get collection ttl failed") return nil, err } startID, _, err := alloc.AllocN(2) if err != nil { - log.Warn("fFailed to submit compaction view to scheduler because allocate id fail", zap.Error(err)) + log.Warn("Failed to create sort compaction task because allocate id fail", zap.Error(err)) return nil, err } @@ -903,7 +905,7 @@ func createSortCompactionTask(ctx context.Context, }, } - log.Ctx(ctx).Info("create sort compaction task success", zap.Int64("segmentID", originSegment.GetID()), + log.Info("create sort compaction task success", zap.Int64("segmentID", originSegment.GetID()), zap.Int64("targetSegmentID", targetSegmentID), zap.Int64("num rows", originSegment.GetNumOfRows())) return task, nil } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 7a01fcc777..470281308a 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1624,6 +1624,17 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { m.segments.SetIsCompacting(segmentID, compacting) } +// IsSegmentCompacting check if segment is compacting +func (m *meta) IsSegmentCompacting(segmentID UniqueID) bool { + m.segMu.RLock() + defer m.segMu.RUnlock() + seg := m.segments.GetSegment(segmentID) + if seg == nil { + return false + } + return seg.isCompacting +} + // CheckAndSetSegmentsCompacting check all segments are not compacting // if true, set them compacting and return true // if false, skip setting and diff --git a/internal/datanode/compactor/sort_compaction.go b/internal/datanode/compactor/sort_compaction.go index bcaf6ff718..32d46b2dd3 100644 --- a/internal/datanode/compactor/sort_compaction.go +++ b/internal/datanode/compactor/sort_compaction.go @@ -45,6 +45,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" + "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metautil" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -286,6 +287,17 @@ func (t *sortCompactionTask) sortSegment(ctx context.Context) (*datapb.Compactio } debug.FreeOSMemory() + + if numValidRows != int(numRows)-entityFilter.GetDeletedCount()-entityFilter.GetExpiredCount() { + log.Warn("unexpected row count after sort compaction", + zap.Int64("target segmentID", targetSegmentID), + zap.Int64("old rows", numRows), + zap.Int("valid rows", numValidRows), + zap.Int("deleted rows", entityFilter.GetDeletedCount()), + zap.Int("expired rows", entityFilter.GetExpiredCount())) + return nil, merr.WrapErrServiceInternal("unexpected row count") + } + log.Info("sort segment end", zap.Int64("target segmentID", targetSegmentID), zap.Int64("old rows", numRows), diff --git a/internal/datanode/compactor/sort_compaction_test.go b/internal/datanode/compactor/sort_compaction_test.go index 51c1b463c9..5c2e92479e 100644 --- a/internal/datanode/compactor/sort_compaction_test.go +++ b/internal/datanode/compactor/sort_compaction_test.go @@ -248,7 +248,7 @@ func (s *SortCompactionTaskSuite) setupBM25Test() { PreAllocatedLogIDs: &datapb.IDRange{Begin: 9530, End: 19530}, MaxSize: 64 * 1024 * 1024, JsonParams: params, - TotalRows: 3, + TotalRows: 1, } pk, err := typeutil.GetPrimaryFieldSchema(plan.GetSchema()) @@ -285,7 +285,7 @@ func (s *SortCompactionTaskSuite) prepareSortCompactionWithBM25Task() { func (s *SortCompactionTaskSuite) TestSortCompactionWithExpiredData() { segmentID := int64(1001) - s.initSegBuffer(1, segmentID) + s.initSegBuffer(1000, segmentID) collTTL := 864000 // 10 days s.task.currentTime = getMilvusBirthday().Add(time.Second * (time.Duration(collTTL) + 1)) s.task.plan.CollectionTtl = int64(collTTL)