diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go
index 36dfd8bd2a..d3f07507c9 100644
--- a/internal/datacoord/import_checker.go
+++ b/internal/datacoord/import_checker.go
@@ -450,6 +450,8 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
 	totalDuration := job.GetTR().ElapseSpan()
 	metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
 	<-c.l0CompactionTrigger.GetResumeCompactionChan(job.GetJobID(), job.GetCollectionID())
+
+	LogResultSegmentsInfo(job.GetJobID(), c.meta, targetSegmentIDs)
 	log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
 }
 
diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go
index acaa98bf8c..1f7feab256 100644
--- a/internal/datacoord/import_util.go
+++ b/internal/datacoord/import_util.go
@@ -683,6 +683,51 @@ func ListBinlogsAndGroupBySegment(ctx context.Context,
 	return segmentImportFiles, nil
 }
 
+func LogResultSegmentsInfo(jobID int64, meta *meta, segmentIDs []int64) {
+	type (
+		segments    = []*SegmentInfo
+		segmentInfo struct {
+			ID   int64
+			Rows int64
+			Size int64
+		}
+	)
+	segmentsByChannelAndPartition := make(map[string]map[int64]segments) // channel => [partition => segments]
+	for _, segmentInfo := range meta.GetSegmentInfos(segmentIDs) {
+		channel := segmentInfo.GetInsertChannel()
+		partition := segmentInfo.GetPartitionID()
+		if _, ok := segmentsByChannelAndPartition[channel]; !ok {
+			segmentsByChannelAndPartition[channel] = make(map[int64]segments)
+		}
+		segmentsByChannelAndPartition[channel][partition] = append(segmentsByChannelAndPartition[channel][partition], segmentInfo)
+	}
+	var (
+		totalRows int64
+		totalSize int64
+	)
+	for channel, partitionSegments := range segmentsByChannelAndPartition {
+		for partitionID, segments := range partitionSegments {
+			infos := lo.Map(segments, func(segment *SegmentInfo, _ int) *segmentInfo {
+				rows := segment.GetNumOfRows()
+				size := segment.getSegmentSize()
+				totalRows += rows
+				totalSize += size
+				return &segmentInfo{
+					ID:   segment.GetID(),
+					Rows: rows,
+					Size: size,
+				}
+			})
+			log.Info("import segments info", zap.Int64("jobID", jobID),
+				zap.String("channel", channel), zap.Int64("partitionID", partitionID),
+				zap.Int("segmentsNum", len(segments)), zap.Any("segmentsInfo", infos),
+			)
+		}
+	}
+	log.Info("import result info", zap.Int64("jobID", jobID),
+		zap.Int64("totalRows", totalRows), zap.Int64("totalSize", totalSize))
+}
+
 // ValidateBinlogImportRequest validates the binlog import request.
 func ValidateBinlogImportRequest(ctx context.Context, cm storage.ChunkManager,
 	reqFiles []*msgpb.ImportFile, options []*commonpb.KeyValuePair,
diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go
index 5f755cb5b5..cd66786a6d 100644
--- a/internal/datacoord/import_util_test.go
+++ b/internal/datacoord/import_util_test.go
@@ -946,6 +946,60 @@ func TestImportTask_MarshalJSON(t *testing.T) {
 	assert.Equal(t, task.GetCompleteTime(), importTask.CompleteTime)
 }
 
+func TestLogResultSegmentsInfo(t *testing.T) {
+	// Create mock catalog
+	mockCatalog := mocks.NewDataCoordCatalog(t)
+	meta := &meta{
+		segments: NewSegmentsInfo(),
+		catalog:  mockCatalog,
+	}
+
+	// Create test segments
+	segments := []*SegmentInfo{
+		{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            1,
+				CollectionID:  1,
+				PartitionID:   1,
+				InsertChannel: "ch1",
+				NumOfRows:     100,
+				State:         commonpb.SegmentState_Flushed,
+			},
+		},
+		{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            2,
+				CollectionID:  1,
+				PartitionID:   1,
+				InsertChannel: "ch1",
+				NumOfRows:     200,
+				State:         commonpb.SegmentState_Flushed,
+			},
+		},
+		{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            3,
+				CollectionID:  1,
+				PartitionID:   2,
+				InsertChannel: "ch2",
+				NumOfRows:     300,
+				State:         commonpb.SegmentState_Flushed,
+			},
+		},
+	}
+
+	// Add segments to meta
+	for _, segment := range segments {
+		meta.segments.SetSegment(segment.ID, segment)
+	}
+
+	jobID := int64(2)
+	segmentIDs := []int64{1, 2, 3}
+
+	// Call the function
+	LogResultSegmentsInfo(jobID, meta, segmentIDs)
+}
+
 // TestImportUtil_ValidateBinlogImportRequest tests the validation of binlog import request
 func TestImportUtil_ValidateBinlogImportRequest(t *testing.T) {
 	ctx := context.Background()
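For readers who want to see the shape of the output without running DataCoord, below is a minimal, self-contained sketch of the grouping-and-totaling pattern that LogResultSegmentsInfo implements. The seg struct, the byte sizes, and the plain fmt output are hypothetical stand-ins invented for illustration; the real helper operates on *SegmentInfo values returned by meta.GetSegmentInfos and emits structured zap log entries.

package main

import "fmt"

// seg is a hypothetical stand-in for datacoord's *SegmentInfo.
type seg struct {
	ID        int64
	Channel   string
	Partition int64
	Rows      int64
	Size      int64
}

func main() {
	// Sample data mirroring the fixture in TestLogResultSegmentsInfo
	// (the Size values are made up for the sketch).
	segs := []seg{
		{ID: 1, Channel: "ch1", Partition: 1, Rows: 100, Size: 1024},
		{ID: 2, Channel: "ch1", Partition: 1, Rows: 200, Size: 2048},
		{ID: 3, Channel: "ch2", Partition: 2, Rows: 300, Size: 4096},
	}

	// Group by channel, then partition: channel => [partition => segments],
	// the same two-level map shape LogResultSegmentsInfo builds.
	grouped := make(map[string]map[int64][]seg)
	for _, s := range segs {
		if _, ok := grouped[s.Channel]; !ok {
			grouped[s.Channel] = make(map[int64][]seg)
		}
		grouped[s.Channel][s.Partition] = append(grouped[s.Channel][s.Partition], s)
	}

	// Emit one line per (channel, partition) group and accumulate totals,
	// analogous to the zap fields logged by the new helper.
	var totalRows, totalSize int64
	for channel, byPartition := range grouped {
		for partition, group := range byPartition {
			for _, s := range group {
				totalRows += s.Rows
				totalSize += s.Size
			}
			fmt.Printf("import segments info jobID=2 channel=%s partitionID=%d segmentsNum=%d\n",
				channel, partition, len(group))
		}
	}
	fmt.Printf("import result info jobID=2 totalRows=%d totalSize=%d\n", totalRows, totalSize)
}

Grouping into channel => partition => segments before logging bounds each log line to one (channel, partition) pair, so a large import produces several small, searchable entries plus a single totals line rather than one unbounded log record.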