enhance: Print segments info after import done (#43200)

issue: https://github.com/milvus-io/milvus/issues/42488

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-07-10 12:38:47 +08:00 committed by GitHub
parent 07745439b5
commit ee9a95189a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 101 additions and 0 deletions

View File

@@ -450,6 +450,8 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
totalDuration := job.GetTR().ElapseSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
<-c.l0CompactionTrigger.GetResumeCompactionChan(job.GetJobID(), job.GetCollectionID())
LogResultSegmentsInfo(job.GetJobID(), c.meta, targetSegmentIDs)
log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
}

View File

@@ -683,6 +683,51 @@ func ListBinlogsAndGroupBySegment(ctx context.Context,
return segmentImportFiles, nil
}
// LogResultSegmentsInfo logs the segments produced by an import job, grouped
// by insert channel and partition, then emits a summary line with the total
// row count and total size across all of the given segments.
func LogResultSegmentsInfo(jobID int64, meta *meta, segmentIDs []int64) {
	// Lightweight, log-friendly view of a segment.
	type loggedSegment struct {
		ID   int64
		Rows int64
		Size int64
	}

	// Group segments two levels deep: channel => partition => segments.
	grouped := make(map[string]map[int64][]*SegmentInfo)
	for _, seg := range meta.GetSegmentInfos(segmentIDs) {
		ch := seg.GetInsertChannel()
		part := seg.GetPartitionID()
		byPartition, ok := grouped[ch]
		if !ok {
			byPartition = make(map[int64][]*SegmentInfo)
			grouped[ch] = byPartition
		}
		byPartition[part] = append(byPartition[part], seg)
	}

	var totalRows, totalSize int64
	for ch, byPartition := range grouped {
		for part, segs := range byPartition {
			// Build the per-partition log payload while accumulating totals.
			infos := make([]*loggedSegment, 0, len(segs))
			for _, seg := range segs {
				rows := seg.GetNumOfRows()
				size := seg.getSegmentSize()
				totalRows += rows
				totalSize += size
				infos = append(infos, &loggedSegment{
					ID:   seg.GetID(),
					Rows: rows,
					Size: size,
				})
			}
			log.Info("import segments info", zap.Int64("jobID", jobID),
				zap.String("channel", ch), zap.Int64("partitionID", part),
				zap.Int("segmentsNum", len(segs)), zap.Any("segmentsInfo", infos),
			)
		}
	}
	log.Info("import result info", zap.Int64("jobID", jobID),
		zap.Int64("totalRows", totalRows), zap.Int64("totalSize", totalSize))
}
// ValidateBinlogImportRequest validates the binlog import request.
func ValidateBinlogImportRequest(ctx context.Context, cm storage.ChunkManager,
	reqFiles []*msgpb.ImportFile, options []*commonpb.KeyValuePair,

View File

@@ -946,6 +946,60 @@ func TestImportTask_MarshalJSON(t *testing.T) {
assert.Equal(t, task.GetCompleteTime(), importTask.CompleteTime)
}
// TestLogResultSegmentsInfo verifies that LogResultSegmentsInfo runs cleanly
// against a meta populated with segments spanning multiple channels and
// partitions.
func TestLogResultSegmentsInfo(t *testing.T) {
	// Meta backed by a mock catalog; no persistence is exercised here.
	catalog := mocks.NewDataCoordCatalog(t)
	m := &meta{
		segments: NewSegmentsInfo(),
		catalog:  catalog,
	}

	// Seed flushed segments: two on ch1/partition 1, one on ch2/partition 2.
	seeds := []struct {
		id        int64
		partition int64
		channel   string
		rows      int64
	}{
		{id: 1, partition: 1, channel: "ch1", rows: 100},
		{id: 2, partition: 1, channel: "ch1", rows: 200},
		{id: 3, partition: 2, channel: "ch2", rows: 300},
	}
	for _, s := range seeds {
		m.segments.SetSegment(s.id, &SegmentInfo{
			SegmentInfo: &datapb.SegmentInfo{
				ID:            s.id,
				CollectionID:  1,
				PartitionID:   s.partition,
				InsertChannel: s.channel,
				NumOfRows:     s.rows,
				State:         commonpb.SegmentState_Flushed,
			},
		})
	}

	// Exercise the function under test.
	LogResultSegmentsInfo(int64(2), m, []int64{1, 2, 3})
}
// TestImportUtil_ValidateBinlogImportRequest tests the validation of binlog import request
func TestImportUtil_ValidateBinlogImportRequest(t *testing.T) {
	ctx := context.Background()