milvus/internal/datacoord/import_util.go
XuanYang-cn 1165a5300f
fix: [cp25]Use diskSegmentMaxSize for coll with sparse and dense vectors (#43195)
Previously, diskSegmentMaxSize was used if and only if all of the
collection's vector fields were indexed with a DiskANN index.

Since sparse vectors cannot be indexed with DiskANN, a collection with
both dense and sparse vectors would fall back to maxSize instead.

This PR changes the requirement for using diskSegmentMaxSize: all dense
vector fields must be indexed with DiskANN, while sparse vector fields
are ignored.

See also: #43193
pr: #43194

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
2025-07-18 11:16:52 +08:00
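
A minimal sketch of the changed condition (the helper name and types below
are illustrative stand-ins, not the actual schema and index metadata that
getExpectedSegmentSize inspects): sparse vector fields no longer veto
diskSegmentMaxSize; only the dense vector fields must carry a DiskANN index.

package sketch

// vectorFieldInfo is a simplified stand-in for a vector field plus its index.
type vectorFieldInfo struct {
	Name      string
	IsSparse  bool   // sparse float vector field
	IndexType string // e.g. "DISKANN", "HNSW"
}

// useDiskSegmentMaxSize reports whether diskSegmentMaxSize should apply:
// every dense vector field must be indexed with DiskANN, while sparse
// vector fields are skipped because DiskANN cannot index them.
func useDiskSegmentMaxSize(vectorFields []vectorFieldInfo) bool {
	hasDense := false
	for _, f := range vectorFields {
		if f.IsSparse {
			continue // sparse vectors can never use DiskANN; don't let them veto
		}
		hasDense = true
		if f.IndexType != "DISKANN" {
			return false
		}
	}
	return hasDense
}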


// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"fmt"
"math"
"path"
"sort"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field {
res := []zap.Field{
zap.Int64("taskID", task.GetTaskID()),
zap.Int64("jobID", task.GetJobID()),
zap.Int64("collectionID", task.GetCollectionID()),
zap.String("type", task.GetType().String()),
zap.Int64("nodeID", task.GetNodeID()),
}
res = append(res, fields...)
return res
}
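
// NewPreImportTasks creates one preimport task per input file group,
// allocating the task IDs in a single batch.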
func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
job ImportJob,
alloc allocator.Allocator,
) ([]ImportTask, error) {
idStart, _, err := alloc.AllocN(int64(len(fileGroups)))
if err != nil {
return nil, err
}
tasks := make([]ImportTask, 0, len(fileGroups))
for i, files := range fileGroups {
fileStats := lo.Map(files, func(f *internalpb.ImportFile, _ int) *datapb.ImportFileStats {
return &datapb.ImportFileStats{
ImportFile: f,
}
})
task := &preImportTask{
PreImportTask: &datapb.PreImportTask{
JobID: job.GetJobID(),
TaskID: idStart + int64(i),
CollectionID: job.GetCollectionID(),
State: datapb.ImportTaskStateV2_Pending,
FileStats: fileStats,
CreatedTime: time.Now().Format("2006-01-02T15:04:05Z07:00"),
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
tasks = append(tasks, task)
}
return tasks, nil
}
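
// NewImportTasks creates one import task per regrouped file-stats group.
// For each task it pre-assigns the importing segments and, when the stats
// task is enabled, pre-allocates the segment IDs that the stats (sort)
// stage will produce.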
func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
job ImportJob, alloc allocator.Allocator, meta *meta,
) ([]ImportTask, error) {
idBegin, _, err := alloc.AllocN(int64(len(fileGroups)))
if err != nil {
return nil, err
}
tasks := make([]ImportTask, 0, len(fileGroups))
for i, group := range fileGroups {
task := &importTask{
ImportTaskV2: &datapb.ImportTaskV2{
JobID: job.GetJobID(),
TaskID: idBegin + int64(i),
CollectionID: job.GetCollectionID(),
NodeID: NullNodeID,
State: datapb.ImportTaskStateV2_Pending,
FileStats: group,
CreatedTime: time.Now().Format("2006-01-02T15:04:05Z07:00"),
},
tr: timerecord.NewTimeRecorder("import task"),
}
segments, err := AssignSegments(job, task, alloc, meta)
if err != nil {
return nil, err
}
task.SegmentIDs = segments
if paramtable.Get().DataCoordCfg.EnableStatsTask.GetAsBool() {
statsSegIDBegin, _, err := alloc.AllocN(int64(len(segments)))
if err != nil {
return nil, err
}
task.StatsSegmentIDs = lo.RangeFrom(statsSegIDBegin, len(segments))
log.Info("preallocate stats segment ids", WrapTaskLog(task, zap.Int64s("segmentIDs", task.StatsSegmentIDs))...)
}
tasks = append(tasks, task)
}
return tasks, nil
}
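
// AssignSegments pre-allocates the importing segments for a task. It merges
// the per-vchannel, per-partition data sizes estimated during preimport and
// allocates ceil(size/segmentMaxSize) segments for each (vchannel, partition)
// pair. L0 imports use SegmentLevel_L0 and size segments by the delete buffer
// limit instead of the expected segment size.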
func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, meta *meta) ([]int64, error) {
pkField, err := typeutil.GetPrimaryFieldSchema(job.GetSchema())
if err != nil {
return nil, err
}
// merge hashed sizes
hashedDataSize := make(map[string]map[int64]int64) // vchannel->(partitionID->size)
for _, fileStats := range task.GetFileStats() {
for vchannel, partStats := range fileStats.GetHashedStats() {
if hashedDataSize[vchannel] == nil {
hashedDataSize[vchannel] = make(map[int64]int64)
}
for partitionID, size := range partStats.GetPartitionDataSize() {
hashedDataSize[vchannel][partitionID] += size
}
}
}
segmentLevel := datapb.SegmentLevel_L1
segmentMaxSize := getExpectedSegmentSize(meta, job.GetCollectionID(), job.GetSchema())
if importutilv2.IsL0Import(job.GetOptions()) {
segmentLevel = datapb.SegmentLevel_L0
segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()
}
// alloc new segments
segments := make([]int64, 0)
addSegment := func(vchannel string, partitionID int64, size int64) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for size > 0 {
segmentInfo, err := AllocImportSegment(ctx, alloc, meta,
task.GetJobID(), task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
if err != nil {
return err
}
segments = append(segments, segmentInfo.GetID())
size -= segmentMaxSize
}
return nil
}
for vchannel, partitionSizes := range hashedDataSize {
for partitionID, size := range partitionSizes {
if pkField.GetAutoID() && size == 0 {
// When autoID is enabled, the preimport task estimates row distribution by
// evenly dividing the total row count (numRows) across all vchannels:
// `estimatedCount = numRows / vchannelNum`.
//
// However, the actual import task hashes real auto-generated IDs to determine
// the target vchannel. This mismatch can lead to inaccurate row distribution estimation
// in such corner cases:
//
// - Importing 1 row into 2 vchannels:
// • Preimport: 1 / 2 = 0 → both v0 and v1 are estimated to have 0 rows
// • Import: real autoID (e.g., 457975852966809057) hashes to v1
// → actual result: v0 = 0, v1 = 1
//
// To avoid such inconsistencies, we ensure that at least one segment is
// allocated for each vchannel when autoID is enabled.
size = 1
}
err := addSegment(vchannel, partitionID, size)
if err != nil {
return nil, err
}
}
}
return segments, nil
}
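
// AllocImportSegment allocates an ID and a timestamp for a new importing
// segment, builds the segment at the given level in the Importing state with
// its start/DML position pinned to that timestamp, and registers it in meta.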
func AllocImportSegment(ctx context.Context,
alloc allocator.Allocator,
meta *meta,
jobID int64, taskID int64,
collectionID UniqueID, partitionID UniqueID,
channelName string,
level datapb.SegmentLevel,
) (*SegmentInfo, error) {
log := log.Ctx(ctx)
id, err := alloc.AllocID(ctx)
if err != nil {
log.Error("failed to alloc id for import segment", zap.Error(err))
return nil, err
}
ts, err := alloc.AllocTimestamp(ctx)
if err != nil {
return nil, err
}
position := &msgpb.MsgPosition{
ChannelName: channelName,
MsgID: nil,
Timestamp: ts,
}
segmentInfo := &datapb.SegmentInfo{
ID: id,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
NumOfRows: 0,
State: commonpb.SegmentState_Importing,
MaxRowNum: 0,
Level: level,
LastExpireTime: math.MaxUint64,
StartPosition: position,
DmlPosition: position,
}
segmentInfo.IsImporting = true
segment := NewSegmentInfo(segmentInfo)
if err = meta.AddSegment(ctx, segment); err != nil {
log.Error("failed to add import segment", zap.Error(err))
return nil, err
}
log.Info("add import segment done",
zap.Int64("jobID", jobID),
zap.Int64("taskID", taskID),
zap.Int64("collectionID", segmentInfo.CollectionID),
zap.Int64("segmentID", segmentInfo.ID),
zap.String("channel", segmentInfo.InsertChannel),
zap.String("level", level.String()))
return segment, nil
}
func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportRequest {
importFiles := lo.Map(task.(*preImportTask).GetFileStats(),
func(fileStats *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
return fileStats.GetImportFile()
})
return &datapb.PreImportRequest{
JobID: task.GetJobID(),
TaskID: task.GetTaskID(),
CollectionID: task.GetCollectionID(),
PartitionIDs: job.GetPartitionIDs(),
Vchannels: job.GetVchannels(),
Schema: job.GetSchema(),
ImportFiles: importFiles,
Options: job.GetOptions(),
}
}
func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc allocator.Allocator) (*datapb.ImportRequest, error) {
requestSegments := make([]*datapb.ImportRequestSegment, 0)
for _, segmentID := range task.(*importTask).GetSegmentIDs() {
segment := meta.GetSegment(context.TODO(), segmentID)
if segment == nil {
return nil, merr.WrapErrSegmentNotFound(segmentID, "assemble import request failed")
}
requestSegments = append(requestSegments, &datapb.ImportRequestSegment{
SegmentID: segment.GetID(),
PartitionID: segment.GetPartitionID(),
Vchannel: segment.GetInsertChannel(),
})
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ts, err := alloc.AllocTimestamp(ctx)
if err != nil {
return nil, err
}
totalRows := lo.SumBy(task.GetFileStats(), func(stat *datapb.ImportFileStats) int64 {
return stat.GetTotalRows()
})
// Pre-allocate IDs for autoIDs and logIDs.
fieldsNum := len(job.GetSchema().GetFields()) + 2 // userFields + tsField + rowIDField
binlogNum := fieldsNum + 2 // binlogs + statslog + BM25Statslog
expansionFactor := paramtable.Get().DataCoordCfg.ImportPreAllocIDExpansionFactor.GetAsInt64()
preAllocIDNum := (totalRows + 1) * int64(binlogNum) * expansionFactor
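	// For example, a schema with four user fields gives fieldsNum = 6 and
	// binlogNum = 8, so importing 1,000,000 rows reserves
	// (1,000,000+1) * 8 * expansionFactor IDs up front.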
idBegin, idEnd, err := alloc.AllocN(preAllocIDNum)
if err != nil {
return nil, err
}
log.Info("pre-allocate ids and ts for import task", WrapTaskLog(task,
zap.Int64("totalRows", totalRows),
zap.Int("fieldsNum", fieldsNum),
zap.Int64("idBegin", idBegin),
zap.Int64("idEnd", idEnd),
zap.Uint64("ts", ts))...,
)
importFiles := lo.Map(task.GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
return fileStat.GetImportFile()
})
return &datapb.ImportRequest{
JobID: task.GetJobID(),
TaskID: task.GetTaskID(),
CollectionID: task.GetCollectionID(),
PartitionIDs: job.GetPartitionIDs(),
Vchannels: job.GetVchannels(),
Schema: job.GetSchema(),
Files: importFiles,
Options: job.GetOptions(),
Ts: ts,
IDRange: &datapb.IDRange{Begin: idBegin, End: idEnd},
RequestSegments: requestSegments,
}, nil
}
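
// RegroupImportFiles packs the preimported file stats into groups for import
// tasks. Files are sorted by memory size ascending and greedily packed so that
// each group stays within segmentMaxSize * #partitions * #vchannels, capped by
// MaxSizeInMBPerImportTask; a single file exceeding the cap gets a group of
// its own.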
func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, segmentMaxSize int) [][]*datapb.ImportFileStats {
if len(files) == 0 {
return nil
}
threshold := paramtable.Get().DataCoordCfg.MaxSizeInMBPerImportTask.GetAsInt() * 1024 * 1024
maxSizePerFileGroup := segmentMaxSize * len(job.GetPartitionIDs()) * len(job.GetVchannels())
if maxSizePerFileGroup > threshold {
maxSizePerFileGroup = threshold
}
fileGroups := make([][]*datapb.ImportFileStats, 0)
currentGroup := make([]*datapb.ImportFileStats, 0)
currentSum := 0
sort.Slice(files, func(i, j int) bool {
return files[i].GetTotalMemorySize() < files[j].GetTotalMemorySize()
})
for _, file := range files {
size := int(file.GetTotalMemorySize())
if size > maxSizePerFileGroup {
fileGroups = append(fileGroups, []*datapb.ImportFileStats{file})
} else if currentSum+size <= maxSizePerFileGroup {
currentGroup = append(currentGroup, file)
currentSum += size
} else {
fileGroups = append(fileGroups, currentGroup)
currentGroup = []*datapb.ImportFileStats{file}
currentSum = size
}
}
if len(currentGroup) > 0 {
fileGroups = append(fileGroups, currentGroup)
}
return fileGroups
}
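
// CheckDiskQuota checks whether admitting the import job would exceed the
// global disk quota or the collection's disk quota, counting both the current
// binlog usage and the disk size already requested by import jobs. It returns
// the size requested by this job, or a quota-exceeded error. The check is
// skipped when disk protection is disabled or the job opts out of it.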
func CheckDiskQuota(ctx context.Context, job ImportJob, meta *meta, importMeta ImportMeta) (int64, error) {
if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() {
return 0, nil
}
if importutilv2.SkipDiskQuotaCheck(job.GetOptions()) {
log.Info("skip disk quota check for import", zap.Int64("jobID", job.GetJobID()))
return 0, nil
}
var (
requestedTotal int64
requestedCollections = make(map[int64]int64)
)
for _, j := range importMeta.GetJobBy(ctx) {
requested := j.GetRequestedDiskSize()
requestedTotal += requested
requestedCollections[j.GetCollectionID()] += requested
}
err := merr.WrapErrServiceQuotaExceeded("disk quota exceeded, please allocate more resources")
quotaInfo := meta.GetQuotaInfo()
totalUsage, collectionsUsage := quotaInfo.TotalBinlogSize, quotaInfo.CollectionBinlogSize
tasks := importMeta.GetTaskBy(ctx, WithJob(job.GetJobID()), WithType(PreImportTaskType))
files := make([]*datapb.ImportFileStats, 0)
for _, task := range tasks {
files = append(files, task.GetFileStats()...)
}
requestSize := lo.SumBy(files, func(file *datapb.ImportFileStats) int64 {
return file.GetTotalMemorySize()
})
totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat()
if float64(totalUsage+requestedTotal+requestSize) > totalDiskQuota {
log.Warn("global disk quota exceeded", zap.Int64("jobID", job.GetJobID()),
zap.Bool("enabled", Params.QuotaConfig.DiskProtectionEnabled.GetAsBool()),
zap.Int64("totalUsage", totalUsage),
zap.Int64("requestedTotal", requestedTotal),
zap.Int64("requestSize", requestSize),
zap.Float64("totalDiskQuota", totalDiskQuota))
return 0, err
}
collectionDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
colID := job.GetCollectionID()
if float64(collectionsUsage[colID]+requestedCollections[colID]+requestSize) > collectionDiskQuota {
log.Warn("collection disk quota exceeded", zap.Int64("jobID", job.GetJobID()),
zap.Bool("enabled", Params.QuotaConfig.DiskProtectionEnabled.GetAsBool()),
zap.Int64("collectionsUsage", collectionsUsage[colID]),
zap.Int64("requestedCollection", requestedCollections[colID]),
zap.Int64("requestSize", requestSize),
zap.Float64("collectionDiskQuota", collectionDiskQuota))
return 0, err
}
return requestSize, nil
}
func getPendingProgress(ctx context.Context, jobID int64, importMeta ImportMeta) float32 {
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(PreImportTaskType))
preImportingFiles := lo.SumBy(tasks, func(task ImportTask) int {
return len(task.GetFileStats())
})
totalFiles := len(importMeta.GetJob(ctx, jobID).GetFiles())
if totalFiles == 0 {
return 1
}
return float32(preImportingFiles) / float32(totalFiles)
}
func getPreImportingProgress(ctx context.Context, jobID int64, importMeta ImportMeta) float32 {
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(PreImportTaskType))
completedTasks := lo.Filter(tasks, func(task ImportTask, _ int) bool {
return task.GetState() == datapb.ImportTaskStateV2_Completed
})
if len(tasks) == 0 {
return 1
}
return float32(len(completedTasks)) / float32(len(tasks))
}
func getImportRowsInfo(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) (importedRows, totalRows int64) {
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType))
segmentIDs := make([]int64, 0)
for _, task := range tasks {
totalRows += lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 {
return file.GetTotalRows()
})
segmentIDs = append(segmentIDs, task.(*importTask).GetSegmentIDs()...)
}
importedRows = meta.GetSegmentsTotalNumRows(segmentIDs)
return
}
func getImportingProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) (float32, int64, int64) {
importedRows, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta)
if totalRows == 0 {
return 1, importedRows, totalRows
}
return float32(importedRows) / float32(totalRows), importedRows, totalRows
}
func getStatsProgress(ctx context.Context, jobID int64, importMeta ImportMeta, sjm StatsJobManager) float32 {
if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
return 1
}
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})
if len(originSegmentIDs) == 0 {
return 1
}
doneCnt := 0
for _, originSegmentID := range originSegmentIDs {
t := sjm.GetStatsTask(originSegmentID, indexpb.StatsSubJob_Sort)
if t.GetState() == indexpb.JobState_JobStateFinished {
doneCnt++
}
}
return float32(doneCnt) / float32(len(originSegmentIDs))
}
func getIndexBuildingProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) float32 {
job := importMeta.GetJob(ctx, jobID)
if !Params.DataCoordCfg.WaitForIndex.GetAsBool() {
return 1
}
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})
targetSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetStatsSegmentIDs()
})
if len(originSegmentIDs) == 0 {
return 1
}
if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
targetSegmentIDs = originSegmentIDs
}
unindexed := meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), targetSegmentIDs)
return float32(len(targetSegmentIDs)-len(unindexed)) / float32(len(targetSegmentIDs))
}
// GetJobProgress calculates the importing job progress.
// The weight of each status is as follows:
// 10%: Pending
// 30%: PreImporting
// 30%: Importing
// 10%: Stats
// 10%: IndexBuilding
// 10%: Completed
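// For example, a job in the PreImporting state with half of its preimport
// tasks completed reports 10 + 0.5*30 = 25.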
// TODO: Wrap a function to map status to user status.
// TODO: Save these progress to job instead of recalculating.
func GetJobProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta, sjm StatsJobManager) (int64, internalpb.ImportJobState, int64, int64, string) {
job := importMeta.GetJob(ctx, jobID)
if job == nil {
return 0, internalpb.ImportJobState_Failed, 0, 0, fmt.Sprintf("import job does not exist, jobID=%d", jobID)
}
switch job.GetState() {
case internalpb.ImportJobState_Pending:
progress := getPendingProgress(ctx, jobID, importMeta)
return int64(progress * 10), internalpb.ImportJobState_Pending, 0, 0, ""
case internalpb.ImportJobState_PreImporting:
progress := getPreImportingProgress(ctx, jobID, importMeta)
return 10 + int64(progress*30), internalpb.ImportJobState_Importing, 0, 0, ""
case internalpb.ImportJobState_Importing:
progress, importedRows, totalRows := getImportingProgress(ctx, jobID, importMeta, meta)
return 10 + 30 + int64(progress*30), internalpb.ImportJobState_Importing, importedRows, totalRows, ""
case internalpb.ImportJobState_Stats:
progress := getStatsProgress(ctx, jobID, importMeta, sjm)
_, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta)
return 10 + 30 + 30 + int64(progress*10), internalpb.ImportJobState_Importing, totalRows, totalRows, ""
case internalpb.ImportJobState_IndexBuilding:
progress := getIndexBuildingProgress(ctx, jobID, importMeta, meta)
_, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta)
return 10 + 30 + 30 + 10 + int64(progress*10), internalpb.ImportJobState_Importing, totalRows, totalRows, ""
case internalpb.ImportJobState_Completed:
_, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta)
return 100, internalpb.ImportJobState_Completed, totalRows, totalRows, ""
case internalpb.ImportJobState_Failed:
return 0, internalpb.ImportJobState_Failed, 0, 0, job.GetReason()
}
return 0, internalpb.ImportJobState_None, 0, 0, "unknown import job state"
}
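
// GetTaskProgresses returns the per-file progress of every import task in the
// job. A task's progress is the ratio of rows already persisted in its
// segments to the total rows of its files, and is reported for each of its
// files.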
func GetTaskProgresses(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress {
progresses := make([]*internalpb.ImportTaskProgress, 0)
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType))
for _, task := range tasks {
totalRows := lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 {
return file.GetTotalRows()
})
importedRows := meta.GetSegmentsTotalNumRows(task.(*importTask).GetSegmentIDs())
progress := int64(100)
if totalRows != 0 {
progress = int64(float32(importedRows) / float32(totalRows) * 100)
}
for _, fileStat := range task.GetFileStats() {
progresses = append(progresses, &internalpb.ImportTaskProgress{
FileName: fmt.Sprintf("%v", fileStat.GetImportFile().GetPaths()),
FileSize: fileStat.GetFileSize(),
Reason: task.GetReason(),
Progress: progress,
CompleteTime: task.(*importTask).GetCompleteTime(),
State: task.GetState().String(),
ImportedRows: progress * fileStat.GetTotalRows() / 100,
TotalRows: fileStat.GetTotalRows(),
})
}
}
return progresses
}
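
// DropImportTask asks the datanode to drop the import task and then resets
// the task's node binding to NullNodeID. A node that is already gone is
// tolerated.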
func DropImportTask(task ImportTask, cluster Cluster, tm ImportMeta) error {
if task.GetNodeID() == NullNodeID {
return nil
}
req := &datapb.DropImportRequest{
JobID: task.GetJobID(),
TaskID: task.GetTaskID(),
}
err := cluster.DropImport(task.GetNodeID(), req)
if err != nil && !errors.Is(err, merr.ErrNodeNotFound) {
return err
}
log.Info("drop import in datanode done", WrapTaskLog(task)...)
return tm.UpdateTask(context.TODO(), task.GetTaskID(), UpdateNodeID(NullNodeID))
}
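
// ListBinlogsAndGroupBySegment lists binlogs under the given prefixes and
// groups them by segment. The first path must be the insert-binlog prefix and
// the optional second path the delta-log prefix; delta logs are attached to
// the segment whose ID (the trailing path element) matches.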
func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager, importFile *internalpb.ImportFile) ([]*internalpb.ImportFile, error) {
if len(importFile.GetPaths()) == 0 {
return nil, merr.WrapErrImportFailed("no insert binlogs to import")
}
if len(importFile.GetPaths()) > 2 {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("too many input paths for binlog import. "+
"Valid paths length should be one or two, but got paths:%s", importFile.GetPaths()))
}
insertPrefix := importFile.GetPaths()[0]
segmentInsertPaths, _, err := storage.ListAllChunkWithPrefix(ctx, cm, insertPrefix, false)
if err != nil {
return nil, err
}
segmentImportFiles := lo.Map(segmentInsertPaths, func(segmentPath string, _ int) *internalpb.ImportFile {
return &internalpb.ImportFile{Paths: []string{segmentPath}}
})
if len(importFile.GetPaths()) < 2 {
return segmentImportFiles, nil
}
deltaPrefix := importFile.GetPaths()[1]
segmentDeltaPaths, _, err := storage.ListAllChunkWithPrefix(ctx, cm, deltaPrefix, false)
if err != nil {
return nil, err
}
if len(segmentDeltaPaths) == 0 {
return segmentImportFiles, nil
}
deltaSegmentIDs := lo.KeyBy(segmentDeltaPaths, func(deltaPrefix string) string {
return path.Base(deltaPrefix)
})
for i := range segmentImportFiles {
segmentID := path.Base(segmentImportFiles[i].GetPaths()[0])
if deltaPrefix, ok := deltaSegmentIDs[segmentID]; ok {
segmentImportFiles[i].Paths = append(segmentImportFiles[i].Paths, deltaPrefix)
}
}
return segmentImportFiles, nil
}
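
// LogResultSegmentsInfo logs the imported segments grouped by channel and
// partition, together with the job's total row count and size.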
func LogResultSegmentsInfo(jobID int64, meta *meta, segmentIDs []int64) {
type (
segments = []*SegmentInfo
segmentInfo struct {
ID int64
Rows int64
Size int64
}
)
segmentsByChannelAndPartition := make(map[string]map[int64]segments) // channel => [partition => segments]
for _, segmentInfo := range meta.GetSegmentInfos(segmentIDs) {
channel := segmentInfo.GetInsertChannel()
partition := segmentInfo.GetPartitionID()
if _, ok := segmentsByChannelAndPartition[channel]; !ok {
segmentsByChannelAndPartition[channel] = make(map[int64]segments)
}
segmentsByChannelAndPartition[channel][partition] = append(segmentsByChannelAndPartition[channel][partition], segmentInfo)
}
var (
totalRows int64
totalSize int64
)
for channel, partitionSegments := range segmentsByChannelAndPartition {
for partitionID, segments := range partitionSegments {
infos := lo.Map(segments, func(segment *SegmentInfo, _ int) *segmentInfo {
rows := segment.GetNumOfRows()
size := segment.getSegmentSize()
totalRows += rows
totalSize += size
return &segmentInfo{
ID: segment.GetID(),
Rows: rows,
Size: size,
}
})
log.Info("import segments info", zap.Int64("jobID", jobID),
zap.String("channel", channel), zap.Int64("partitionID", partitionID),
zap.Int("segmentsNum", len(segments)), zap.Any("segmentsInfo", infos),
)
}
}
log.Info("import result info", zap.Int64("jobID", jobID),
zap.Int64("totalRows", totalRows), zap.Int64("totalSize", totalSize))
}