// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compactor

import (
	"context"
	"fmt"
	sio "io"
	"math"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"go.opentelemetry.io/otel"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/compaction"
	"github.com/milvus-io/milvus/internal/flushcommon/io"
	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/metrics"
	"github.com/milvus-io/milvus/pkg/v2/proto/clusteringpb"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/conc"
	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
	"github.com/milvus-io/milvus/pkg/v2/util/hardware"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/metautil"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

const (
	expectedBinlogSize = 16 * 1024 * 1024
)

var _ Compactor = (*clusteringCompactionTask)(nil)

type clusteringCompactionTask struct {
	binlogIO   io.BinlogIO
	logIDAlloc allocator.Interface
	segIDAlloc allocator.Interface

	ctx         context.Context
	cancel      context.CancelFunc
	done        chan struct{}
	tr          *timerecord.TimeRecorder
	mappingPool *conc.Pool[any]
	flushPool   *conc.Pool[any]

	plan *datapb.CompactionPlan

	// flush
	flushCount *atomic.Int64
	// metrics only, don't rely on it for correctness
	writtenRowNum *atomic.Int64

	// inner field
	collectionID          int64
	partitionID           int64
	currentTime           time.Time // for TTL
	isVectorClusteringKey bool
	clusteringKeyField    *schemapb.FieldSchema
	primaryKeyField       *schemapb.FieldSchema

	memoryLimit int64
	bufferSize  int64

	clusterBuffers []*ClusterBuffer
	// scalar
	keyToBufferFunc func(interface{}) *ClusterBuffer
	// vector
	segmentIDOffsetMapping map[int64]string
	offsetToBufferFunc     func(int64, []uint32) *ClusterBuffer
	// bm25
	bm25FieldIds []int64

	compactionParams compaction.Params
}

type ClusterBuffer struct {
	id int

	writer *MultiSegmentWriter

	clusteringKeyFieldStats *storage.FieldStats

	lock sync.RWMutex
}

func (b *ClusterBuffer) Write(v *storage.Value) error {
	b.lock.Lock()
	defer b.lock.Unlock()
	return b.writer.WriteValue(v)
}

func (b *ClusterBuffer) Flush() error {
	b.lock.Lock()
	defer b.lock.Unlock()
	return b.writer.Flush()
}

func (b *ClusterBuffer) FlushChunk() error {
	b.lock.Lock()
	defer b.lock.Unlock()
	return b.writer.FlushChunk()
}

func (b *ClusterBuffer) Close() error {
	b.lock.Lock()
	defer b.lock.Unlock()
	return b.writer.Close()
}

func (b *ClusterBuffer) GetCompactionSegments() []*datapb.CompactionSegment {
	b.lock.RLock()
	defer b.lock.RUnlock()
	return b.writer.GetCompactionSegments()
}

func (b *ClusterBuffer) GetBufferSize() uint64 {
	b.lock.RLock()
	defer b.lock.RUnlock()
	return b.writer.GetBufferUncompressed()
}

func newClusterBuffer(id int, writer *MultiSegmentWriter, clusteringKeyFieldStats *storage.FieldStats) *ClusterBuffer {
	return &ClusterBuffer{
		id:                      id,
		writer:                  writer,
		clusteringKeyFieldStats: clusteringKeyFieldStats,
		lock:                    sync.RWMutex{},
	}
}
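// Note on ClusterBuffer concurrency: all mutating methods (Write, Flush,
// FlushChunk, Close) take the write lock, while the read-only accessors
// (GetCompactionSegments, GetBufferSize) take the read lock. Mapping
// goroutines and the flush pool can therefore touch different buffers
// concurrently, and the buffer list itself needs no extra lock because it is
// fully built before mapping starts.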
func NewClusteringCompactionTask(
	ctx context.Context,
	binlogIO io.BinlogIO,
	plan *datapb.CompactionPlan,
	compactionParams compaction.Params,
) *clusteringCompactionTask {
	ctx, cancel := context.WithCancel(ctx)
	return &clusteringCompactionTask{
		ctx:              ctx,
		cancel:           cancel,
		binlogIO:         binlogIO,
		plan:             plan,
		tr:               timerecord.NewTimeRecorder("clustering_compaction"),
		done:             make(chan struct{}, 1),
		clusterBuffers:   make([]*ClusterBuffer, 0),
		flushCount:       atomic.NewInt64(0),
		writtenRowNum:    atomic.NewInt64(0),
		compactionParams: compactionParams,
	}
}

func (t *clusteringCompactionTask) Complete() {
	t.done <- struct{}{}
}

func (t *clusteringCompactionTask) Stop() {
	t.cancel()
	<-t.done
}

func (t *clusteringCompactionTask) GetPlanID() typeutil.UniqueID {
	return t.plan.GetPlanID()
}

func (t *clusteringCompactionTask) GetChannelName() string {
	return t.plan.GetChannel()
}

func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType {
	return t.plan.GetType()
}

func (t *clusteringCompactionTask) GetCollection() int64 {
	return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
}

func (t *clusteringCompactionTask) init() error {
	if t.plan.GetType() != datapb.CompactionType_ClusteringCompaction {
		return merr.WrapErrIllegalCompactionPlan("illegal compaction type")
	}
	if len(t.plan.GetSegmentBinlogs()) == 0 {
		return merr.WrapErrIllegalCompactionPlan("empty segment binlogs")
	}
	t.collectionID = t.GetCollection()
	t.partitionID = t.plan.GetSegmentBinlogs()[0].GetPartitionID()

	logIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd())
	segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
	log.Info("segment ID range",
		zap.Int64("begin", t.plan.GetPreAllocatedSegmentIDs().GetBegin()),
		zap.Int64("end", t.plan.GetPreAllocatedSegmentIDs().GetEnd()))
	t.logIDAlloc = logIDAlloc
	t.segIDAlloc = segIDAlloc

	var pkField *schemapb.FieldSchema
	if t.plan.Schema == nil {
		return merr.WrapErrIllegalCompactionPlan("empty schema in compactionPlan")
	}
	for _, field := range t.plan.Schema.Fields {
		if field.GetIsPrimaryKey() && field.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(field.GetDataType()) {
			pkField = field
		}
		if field.GetFieldID() == t.plan.GetClusteringKeyField() {
			t.clusteringKeyField = field
		}
	}
	for _, function := range t.plan.Schema.Functions {
		if function.GetType() == schemapb.FunctionType_BM25 {
			t.bm25FieldIds = append(t.bm25FieldIds, function.GetOutputFieldIds()[0])
		}
	}

	t.primaryKeyField = pkField
	t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
	t.currentTime = time.Now()
	t.memoryLimit = t.getMemoryLimit()
	t.bufferSize = int64(t.compactionParams.BinLogMaxSize) // use binlog max size as the read/write buffer size
	workerPoolSize := t.getWorkerPoolSize()
	t.mappingPool = conc.NewPool[any](workerPoolSize)
	t.flushPool = conc.NewPool[any](workerPoolSize)

	log.Info("clustering compaction task initialized",
		zap.Int64("memory_buffer_size", t.memoryLimit),
		zap.Int("worker_pool_size", workerPoolSize))
	return nil
}
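// The pre-allocated ID ranges consumed in init above let the task mint log
// and segment IDs locally, without any RPC back to the coordinator. For
// illustration only (the numbers are made up): a PreAllocatedLogIDs range of
// [1000, 2000) allows logIDAlloc to hand out up to 1000 binlog IDs before it
// reports exhaustion.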
func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("clusteringCompaction-%d", t.GetPlanID()))
	defer span.End()
	log := log.With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String()))

	// 0, verify and init
	err := t.init()
	if err != nil {
		log.Error("compaction task init failed", zap.Error(err))
		return nil, err
	}

	if !funcutil.CheckCtxValid(ctx) {
		log.Warn("compact wrong, task context done or timeout")
		return nil, ctx.Err()
	}
	defer t.cleanUp(ctx)

	// 1, decompress binlogs as preparation for later mapping
	if err := binlog.DecompressCompactionBinlogsWithRootPath(t.compactionParams.StorageConfig.GetRootPath(), t.plan.SegmentBinlogs); err != nil {
		log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
		return nil, err
	}

	// 2, get analyze result
	if t.isVectorClusteringKey {
		if err := t.getVectorAnalyzeResult(ctx); err != nil {
			log.Error("failed in analyze vector", zap.Error(err))
			return nil, err
		}
	} else {
		if err := t.getScalarAnalyzeResult(ctx); err != nil {
			log.Error("failed in analyze scalar", zap.Error(err))
			return nil, err
		}
	}

	// 3, mapping
	log.Info("Clustering compaction start mapping", zap.Int("bufferNum", len(t.clusterBuffers)))
	uploadSegments, partitionStats, err := t.mapping(ctx)
	if err != nil {
		log.Error("failed in mapping", zap.Error(err))
		return nil, err
	}

	// 4, collect partition stats
	err = t.uploadPartitionStats(ctx, t.collectionID, t.partitionID, partitionStats)
	if err != nil {
		return nil, err
	}

	// 5, assemble CompactionPlanResult
	planResult := &datapb.CompactionPlanResult{
		State:    datapb.CompactionTaskState_completed,
		PlanID:   t.GetPlanID(),
		Segments: uploadSegments,
		Type:     t.plan.GetType(),
		Channel:  t.plan.GetChannel(),
	}

	metrics.DataNodeCompactionLatency.
		WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()).
		Observe(float64(t.tr.ElapseSpan().Milliseconds()))
	log.Info("Clustering compaction finished",
		zap.Duration("elapse", t.tr.ElapseSpan()),
		zap.Int64("flushTimes", t.flushCount.Load()))

	// clear the buffer cache
	t.keyToBufferFunc = nil

	return planResult, nil
}
func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) error {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getScalarAnalyzeResult-%d", t.GetPlanID()))
	defer span.End()
	analyzeDict, err := t.scalarAnalyze(ctx)
	if err != nil {
		return err
	}
	buckets, containsNull := t.splitClusterByScalarValue(analyzeDict)
	scalarToClusterBufferMap := make(map[interface{}]*ClusterBuffer, 0)
	for id, bucket := range buckets {
		fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
		if err != nil {
			return err
		}
		for _, key := range bucket {
			fieldStats.UpdateMinMax(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, key))
		}
		alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
		writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100,
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig),
			storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI))
		if err != nil {
			return err
		}
		buffer := newClusterBuffer(id, writer, fieldStats)
		t.clusterBuffers = append(t.clusterBuffers, buffer)
		for _, key := range bucket {
			scalarToClusterBufferMap[key] = buffer
		}
	}
	var nullBuffer *ClusterBuffer
	if containsNull {
		fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
		if err != nil {
			return err
		}
		alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
		writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100,
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig),
			storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI))
		if err != nil {
			return err
		}
		nullBuffer = newClusterBuffer(len(buckets), writer, fieldStats)
		t.clusterBuffers = append(t.clusterBuffers, nullBuffer)
	}
	t.keyToBufferFunc = func(key interface{}) *ClusterBuffer {
		if key == nil {
			return nullBuffer
		}
		// TODO: if there are too many keys this map gets large; we should record
		// the key range of each buffer and select a buffer by range instead
		return scalarToClusterBufferMap[key]
	}
	return nil
}

func splitCentroids(centroids []int, num int) ([][]int, map[int]int) {
	if num <= 0 {
		return nil, nil
	}

	result := make([][]int, num)
	resultIndex := make(map[int]int, len(centroids))
	listLen := len(centroids)

	for i := 0; i < listLen; i++ {
		group := i % num
		result[group] = append(result[group], centroids[i])
		resultIndex[i] = group
	}

	return result, resultIndex
}
func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, bufferNum int, centroids []*schemapb.VectorField) error {
	centroidsOffset := make([]int, len(centroids))
	for i := 0; i < len(centroids); i++ {
		centroidsOffset[i] = i
	}
	centroidGroups, groupIndex := splitCentroids(centroidsOffset, bufferNum)
	for id, group := range centroidGroups {
		fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
		if err != nil {
			return err
		}

		centroidValues := make([]storage.VectorFieldValue, len(group))
		for i, offset := range group {
			centroidValues[i] = storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroids[offset])
		}

		fieldStats.SetVectorCentroids(centroidValues...)

		alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
		writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100,
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig),
			storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI))
		if err != nil {
			return err
		}

		buffer := newClusterBuffer(id, writer, fieldStats)
		t.clusterBuffers = append(t.clusterBuffers, buffer)
	}
	t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
		centroidGroupOffset := groupIndex[int(idMapping[offset])]
		return t.clusterBuffers[centroidGroupOffset]
	}
	return nil
}

func (t *clusteringCompactionTask) switchPolicyForVectorPlan(ctx context.Context, centroids *clusteringpb.ClusteringCentroidsStats) error {
	bufferNum := len(centroids.GetCentroids())
	bufferNumByMemory := int(t.memoryLimit / expectedBinlogSize)
	if bufferNumByMemory < bufferNum {
		bufferNum = bufferNumByMemory
	}
	return t.generatedVectorPlan(ctx, bufferNum, centroids.GetCentroids())
}
func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getVectorAnalyzeResult-%d", t.GetPlanID()))
	defer span.End()
	log := log.Ctx(ctx)
	analyzeResultPath := t.plan.AnalyzeResultPath
	centroidFilePath := path.Join(analyzeResultPath, metautil.JoinIDPath(t.collectionID, t.partitionID, t.clusteringKeyField.FieldID), common.Centroids)
	offsetMappingFiles := make(map[int64]string, 0)
	for _, segmentID := range t.plan.AnalyzeSegmentIds {
		path := path.Join(analyzeResultPath, metautil.JoinIDPath(t.collectionID, t.partitionID, t.clusteringKeyField.FieldID, segmentID), common.OffsetMapping)
		offsetMappingFiles[segmentID] = path
		log.Debug("read segment offset mapping file", zap.Int64("segmentID", segmentID), zap.String("path", path))
	}
	t.segmentIDOffsetMapping = offsetMappingFiles
	centroidBytes, err := t.binlogIO.Download(ctx, []string{centroidFilePath})
	if err != nil {
		return err
	}
	centroids := &clusteringpb.ClusteringCentroidsStats{}
	err = proto.Unmarshal(centroidBytes[0], centroids)
	if err != nil {
		return err
	}
	log.Debug("read clustering centroids stats",
		zap.String("path", centroidFilePath),
		zap.Int("centroidNum", len(centroids.GetCentroids())),
		zap.Any("offsetMappingFiles", t.segmentIDOffsetMapping))

	return t.switchPolicyForVectorPlan(ctx, centroids)
}

// mapping reads the input segments and splits their rows into cluster buffers.
func (t *clusteringCompactionTask) mapping(ctx context.Context,
) ([]*datapb.CompactionSegment, *storage.PartitionStatsSnapshot, error) {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mapping-%d", t.GetPlanID()))
	defer span.End()
	inputSegments := t.plan.GetSegmentBinlogs()
	mapStart := time.Now()
	log := log.Ctx(ctx)

	futures := make([]*conc.Future[any], 0, len(inputSegments))
	for _, segment := range inputSegments {
		segmentClone := &datapb.CompactionSegmentBinlogs{
			SegmentID: segment.SegmentID,
			// only FieldBinlogs and Deltalogs needed
			Deltalogs:      segment.Deltalogs,
			FieldBinlogs:   segment.FieldBinlogs,
			StorageVersion: segment.StorageVersion,
			Manifest:       segment.GetManifest(),
		}
		future := t.mappingPool.Submit(func() (any, error) {
			err := t.mappingSegment(ctx, segmentClone)
			return struct{}{}, err
		})
		futures = append(futures, future)
	}
	if err := conc.AwaitAll(futures...); err != nil {
		return nil, nil, err
	}

	// force flush all buffers
	err := t.flushAll()
	if err != nil {
		return nil, nil, err
	}

	resultSegments := make([]*datapb.CompactionSegment, 0)
	resultPartitionStats := &storage.PartitionStatsSnapshot{
		SegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats),
	}
	for _, buffer := range t.clusterBuffers {
		segments := buffer.GetCompactionSegments()
		log.Debug("compaction segments", zap.Any("segments", segments))
		resultSegments = append(resultSegments, segments...)
		for _, segment := range segments {
			segmentStats := storage.SegmentStats{
				FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
				NumRows:    int(segment.NumOfRows),
			}
			resultPartitionStats.SegmentStats[segment.SegmentID] = segmentStats
			log.Debug("compaction segment partitioning stats", zap.Int64("segmentID", segment.SegmentID), zap.Any("stats", segmentStats))
		}
	}

	log.Info("mapping end",
		zap.Int64("collectionID", t.GetCollection()),
		zap.Int64("partitionID", t.partitionID),
		zap.Int("segmentFrom", len(inputSegments)),
		zap.Int("segmentTo", len(resultSegments)),
		zap.Duration("elapse", time.Since(mapStart)))

	return resultSegments, resultPartitionStats, nil
}

func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
	var totalBufferSize int64 = 0
	for _, buffer := range t.clusterBuffers {
		totalBufferSize = totalBufferSize + int64(buffer.GetBufferSize())
	}
	return totalBufferSize
}
// mappingSegment reads the insert logs of one segment, maps each row into a
// cluster buffer according to the clustering key, and flushes buffers to files
// when memory pressure requires it.
func (t *clusteringCompactionTask) mappingSegment(
	ctx context.Context,
	segment *datapb.CompactionSegmentBinlogs,
) error {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mappingSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
	defer span.End()
	log := log.With(zap.Int64("planID", t.GetPlanID()),
		zap.Int64("collectionID", t.GetCollection()),
		zap.Int64("partitionID", t.partitionID),
		zap.Int64("segmentID", segment.GetSegmentID()))
	log.Info("mapping segment start")
	processStart := time.Now()
	var remained int64 = 0

	deltaPaths := make([]string, 0)
	for _, d := range segment.GetDeltalogs() {
		for _, l := range d.GetBinlogs() {
			deltaPaths = append(deltaPaths, l.GetLogPath())
		}
	}
	delta, err := compaction.ComposeDeleteFromDeltalogs(ctx, t.binlogIO, deltaPaths)
	if err != nil {
		return err
	}
	entityFilter := compaction.NewEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)

	mappingStats := &clusteringpb.ClusteringCentroidIdMappingStats{}
	if t.isVectorClusteringKey {
		offsetPath := t.segmentIDOffsetMapping[segment.SegmentID]
		offsetBytes, err := t.binlogIO.Download(ctx, []string{offsetPath})
		if err != nil {
			return err
		}
		err = proto.Unmarshal(offsetBytes[0], mappingStats)
		if err != nil {
			return err
		}
	}

	// get the number of field binlog files from a non-empty segment
	var binlogNum int
	for _, b := range segment.GetFieldBinlogs() {
		if b != nil {
			binlogNum = len(b.GetBinlogs())
			break
		}
	}
	// we cannot handle the case where all binlogs are empty, so return an error
	if binlogNum == 0 {
		log.Warn("compact wrong, all segments' binlogs are empty")
		return merr.WrapErrIllegalCompactionPlan("all segments' binlogs are empty")
	}

	var rr storage.RecordReader
	if segment.GetManifest() != "" {
		rr, err = storage.NewManifestRecordReader(ctx, segment.GetManifest(), t.plan.Schema,
			storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
				return t.binlogIO.Download(ctx, paths)
			}),
			storage.WithCollectionID(t.GetCollection()),
			storage.WithVersion(segment.StorageVersion),
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig),
		)
	} else {
		rr, err = storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema,
			storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
				return t.binlogIO.Download(ctx, paths)
			}),
			storage.WithCollectionID(t.GetCollection()),
			storage.WithVersion(segment.StorageVersion),
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig),
		)
	}
	if err != nil {
		log.Warn("new binlog record reader wrong", zap.Error(err))
		return err
	}
	defer rr.Close()

	offset := int64(-1)
	for {
		r, err := rr.Next()
		if err != nil {
			if err == sio.EOF {
				break
			}
			log.Warn("compact wrong, failed to iter through data", zap.Error(err))
			return err
		}
		vs := make([]*storage.Value, r.Len())
		if err = storage.ValueDeserializerWithSchema(r, vs, t.plan.Schema, false); err != nil {
			log.Warn("compact wrong, failed to deserialize data", zap.Error(err))
			return err
		}
		for _, v := range vs {
			offset++
			if entityFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp)) {
				continue
			}

			row, ok := (*v).Value.(map[typeutil.UniqueID]interface{})
			if !ok {
				log.Warn("convert interface to map wrong")
				return errors.New("unexpected error")
			}

			clusteringKey := row[t.clusteringKeyField.FieldID]
			var clusterBuffer *ClusterBuffer
			if t.isVectorClusteringKey {
				clusterBuffer = t.offsetToBufferFunc(offset, mappingStats.GetCentroidIdMapping())
			} else {
				clusterBuffer = t.keyToBufferFunc(clusteringKey)
			}
			if err := clusterBuffer.Write(v); err != nil {
				return err
			}
			t.writtenRowNum.Inc()
			remained++

			if (remained+1)%100 == 0 {
				currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
				if currentBufferTotalMemorySize > t.getMemoryBufferHighWatermark() {
					// reached the flush trigger threshold
					log.Debug("largest buffer need to flush",
						zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize))
					if err := t.flushLargestBuffers(ctx); err != nil {
						return err
					}
				}
			}
		}

		// flush all cluster buffers after each record, since the values read from
		// the same record are references into its memory rather than copies
		for _, buffer := range t.clusterBuffers {
			if err := buffer.Flush(); err != nil {
				return err
			}
		}
	}

	missing := entityFilter.GetMissingDeleteCount()
	log.Info("mapping segment end",
		zap.Int64("remained_entities", remained),
		zap.Int("deleted_entities", entityFilter.GetDeletedCount()),
		zap.Int("expired_entities", entityFilter.GetExpiredCount()),
		zap.Int("deltalog deletes", entityFilter.GetDeltalogDeleteCount()),
		zap.Int("missing deletes", missing),
		zap.Int64("written_row_num", t.writtenRowNum.Load()),
		zap.Duration("elapse", time.Since(processStart)))

	metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(entityFilter.GetDeltalogDeleteCount()))
	metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(missing))

	return nil
}

func (t *clusteringCompactionTask) getWorkerPoolSize() int {
	return int(math.Max(float64(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.GetAsInt()), 1.0))
}

// getMemoryLimit returns the maximum memory a clustering compaction task is allowed to use
func (t *clusteringCompactionTask) getMemoryLimit() int64 {
	return int64(float64(hardware.GetMemoryCount()) * paramtable.Get().DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat())
}

func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 {
	return int64(float64(t.memoryLimit) * 0.3)
}

func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
	return int64(float64(t.memoryLimit) * 0.7)
}
func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error {
	currentMemorySize := t.getBufferTotalUsedMemorySize()
	if currentMemorySize <= t.getMemoryBufferLowWatermark() {
		log.Info("memory low water mark", zap.Int64("memoryBufferSize", currentMemorySize))
		return nil
	}
	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "flushLargestBuffers")
	defer span.End()

	bufferIDs := make([]int, 0)
	bufferSizes := make([]int64, 0)
	for _, buffer := range t.clusterBuffers {
		bufferIDs = append(bufferIDs, buffer.id)
		bufferSizes = append(bufferSizes, int64(buffer.GetBufferSize()))
	}
	// sort buffer ids by size in descending order; a buffer's id is also its
	// index in t.clusterBuffers, so it can index bufferSizes directly
	sort.Slice(bufferIDs, func(i, j int) bool {
		return bufferSizes[bufferIDs[i]] > bufferSizes[bufferIDs[j]]
	})
	log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize))

	futures := make([]*conc.Future[any], 0)
	for _, bufferId := range bufferIDs {
		buffer := t.clusterBuffers[bufferId]
		size := buffer.GetBufferSize()
		currentMemorySize -= int64(size)
		log.Info("currentMemorySize after flush buffer binlog",
			zap.Int64("currentMemorySize", currentMemorySize),
			zap.Int("bufferID", bufferId),
			zap.Uint64("WrittenUncompressed", size))
		future := t.flushPool.Submit(func() (any, error) {
			err := buffer.FlushChunk()
			if err != nil {
				return nil, err
			}
			return struct{}{}, nil
		})
		futures = append(futures, future)

		if currentMemorySize <= t.getMemoryBufferLowWatermark() {
			log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getBufferTotalUsedMemorySize()))
			break
		}
	}
	if err := conc.AwaitAll(futures...); err != nil {
		return err
	}

	log.Info("flushLargestBuffers end", zap.Int64("currentMemorySize", currentMemorySize))
	return nil
}

func (t *clusteringCompactionTask) flushAll() error {
	futures := make([]*conc.Future[any], 0)
	for _, buffer := range t.clusterBuffers {
		b := buffer // avoid closure mis-capture (pre-Go 1.22 loop-variable semantics)
		future := t.flushPool.Submit(func() (any, error) {
			err := b.Close()
			if err != nil {
				return nil, err
			}
			return struct{}{}, nil
		})
		futures = append(futures, future)
	}
	if err := conc.AwaitAll(futures...); err != nil {
		return err
	}

	return nil
}

func (t *clusteringCompactionTask) uploadPartitionStats(ctx context.Context, collectionID, partitionID typeutil.UniqueID, partitionStats *storage.PartitionStatsSnapshot) error {
	// use planID as the partitionStats version
	version := t.plan.PlanID
	partitionStats.Version = version
	partitionStatsBytes, err := storage.SerializePartitionStatsSnapshot(partitionStats)
	if err != nil {
		return err
	}
	rootPath := strings.Split(t.plan.AnalyzeResultPath, common.AnalyzeStatsPath)[0]
	newStatsPath := path.Join(rootPath, common.PartitionStatsPath, metautil.JoinIDPath(collectionID, partitionID), t.plan.GetChannel(), strconv.FormatInt(version, 10))
	kv := map[string][]byte{
		newStatsPath: partitionStatsBytes,
	}
	err = t.binlogIO.Upload(ctx, kv)
	if err != nil {
		return err
	}
	log.Info("Finish upload PartitionStats file", zap.String("key", newStatsPath), zap.Int("length", len(partitionStatsBytes)))
	return nil
}
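// The resulting object key has the shape (segment values are placeholders):
//
//	<rootPath>/<common.PartitionStatsPath>/<collectionID>/<partitionID>/<channel>/<planID>
//
// so each compaction plan publishes exactly one stats snapshot, keyed by its
// planID-based version.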
// cleanUp tries its best to release all temporary resources
func (t *clusteringCompactionTask) cleanUp(ctx context.Context) {
	if t.mappingPool != nil {
		t.mappingPool.Release()
	}
	if t.flushPool != nil {
		t.flushPool.Release()
	}
}

func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[interface{}]int64, error) {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyze-%d", t.GetPlanID()))
	defer span.End()
	inputSegments := t.plan.GetSegmentBinlogs()
	futures := make([]*conc.Future[any], 0, len(inputSegments))
	analyzeStart := time.Now()
	var mutex sync.Mutex
	analyzeDict := make(map[interface{}]int64, 0)
	for _, segment := range inputSegments {
		segmentClone := proto.Clone(segment).(*datapb.CompactionSegmentBinlogs)
		future := t.mappingPool.Submit(func() (any, error) {
			analyzeResult, err := t.scalarAnalyzeSegment(ctx, segmentClone)
			mutex.Lock()
			defer mutex.Unlock()
			for key, v := range analyzeResult {
				if _, exist := analyzeDict[key]; exist {
					analyzeDict[key] = analyzeDict[key] + v
				} else {
					analyzeDict[key] = v
				}
			}
			return struct{}{}, err
		})
		futures = append(futures, future)
	}
	if err := conc.AwaitAll(futures...); err != nil {
		return nil, err
	}
	log.Info("analyze end",
		zap.Int64("collectionID", t.GetCollection()),
		zap.Int64("partitionID", t.partitionID),
		zap.Int("segments", len(inputSegments)),
		zap.Int("clustering num", len(analyzeDict)),
		zap.Duration("elapse", time.Since(analyzeStart)))
	return analyzeDict, nil
}
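// The merge in scalarAnalyze above is additive across segments. For
// illustration (hypothetical per-segment results): {a: 3, b: 1} from one
// segment and {a: 2, c: 5} from another combine into {a: 5, b: 1, c: 5}.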
func (t *clusteringCompactionTask) scalarAnalyzeSegment(
	ctx context.Context,
	segment *datapb.CompactionSegmentBinlogs,
) (map[interface{}]int64, error) {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyzeSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
	defer span.End()
	log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("segmentID", segment.GetSegmentID()))
	processStart := time.Now()

	// get the number of field binlog files from a non-empty segment
	var binlogNum int
	for _, b := range segment.GetFieldBinlogs() {
		if b != nil {
			binlogNum = len(b.GetBinlogs())
			break
		}
	}
	// we cannot handle the case where all binlogs are empty, so return an error
	if binlogNum == 0 {
		log.Warn("compact wrong, all segments' binlogs are empty")
		return nil, merr.WrapErrIllegalCompactionPlan("all segments' binlogs are empty")
	}
	log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))

	expiredFilter := compaction.NewEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)
	binlogs := make([]*datapb.FieldBinlog, 0)
	requiredFields := typeutil.NewSet[int64]()
	requiredFields.Insert(0, 1, t.primaryKeyField.GetFieldID(), t.clusteringKeyField.GetFieldID())
	selectedFields := lo.Filter(t.plan.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
		return requiredFields.Contain(field.GetFieldID())
	})
	switch segment.GetStorageVersion() {
	case storage.StorageV1:
		for _, fieldBinlog := range segment.GetFieldBinlogs() {
			if requiredFields.Contain(fieldBinlog.GetFieldID()) {
				binlogs = append(binlogs, fieldBinlog)
			}
		}
	case storage.StorageV2:
		binlogs = segment.GetFieldBinlogs()
	default:
		log.Warn("unsupported storage version", zap.Int64("storage version", segment.GetStorageVersion()))
		return nil, fmt.Errorf("unsupported storage version %d", segment.GetStorageVersion())
	}

	var rr storage.RecordReader
	var err error
	if segment.GetManifest() != "" {
		rr, err = storage.NewManifestRecordReader(ctx, segment.GetManifest(), t.plan.GetSchema(),
			storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
				return t.binlogIO.Download(ctx, paths)
			}),
			storage.WithVersion(segment.StorageVersion),
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig),
			storage.WithNeededFields(requiredFields),
			storage.WithCollectionID(t.GetCollection()),
		)
	} else {
		rr, err = storage.NewBinlogRecordReader(ctx, binlogs, t.plan.GetSchema(),
			storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
				return t.binlogIO.Download(ctx, paths)
			}),
			storage.WithVersion(segment.StorageVersion),
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig),
			storage.WithNeededFields(requiredFields),
			storage.WithCollectionID(t.GetCollection()),
		)
	}
	if err != nil {
		log.Warn("new binlog record reader wrong", zap.Error(err))
		return make(map[interface{}]int64), err
	}
	pkIter := storage.NewDeserializeReader(rr, func(r storage.Record, v []*storage.Value) error {
		return storage.ValueDeserializerWithSelectedFields(r, v, selectedFields, true)
	})
	defer pkIter.Close()

	analyzeResult, remained, err := t.iterAndGetScalarAnalyzeResult(pkIter, expiredFilter)
	if err != nil {
		return nil, err
	}

	log.Info("analyze segment end",
		zap.Int64("remained entities", remained),
		zap.Int("expired entities", expiredFilter.GetExpiredCount()),
		zap.Duration("map elapse", time.Since(processStart)))
	return analyzeResult, nil
}

func (t *clusteringCompactionTask) iterAndGetScalarAnalyzeResult(pkIter *storage.DeserializeReaderImpl[*storage.Value], expiredFilter compaction.EntityFilter) (map[interface{}]int64, int64, error) {
	var (
		remained      int64                 = 0
		analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
	)
	for {
		v, err := pkIter.NextValue()
		if err != nil {
			if err == sio.EOF {
				pkIter.Close()
				break
			} else {
				log.Warn("compact wrong, failed to iter through data", zap.Error(err))
				return nil, 0, err
			}
		}

		// filter out expired entities
		if expiredFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp)) {
			continue
		}

		row, ok := (*v).Value.(map[typeutil.UniqueID]interface{})
		if !ok {
			return nil, 0, errors.New("unexpected error")
		}
		key := row[t.clusteringKeyField.GetFieldID()]
		if _, exist := analyzeResult[key]; exist {
			analyzeResult[key] = analyzeResult[key] + 1
		} else {
			analyzeResult[key] = 1
		}
		remained++
	}
	return analyzeResult, remained, nil
}

func (t *clusteringCompactionTask) generatedScalarPlan(maxRows, preferRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
	buckets := make([][]interface{}, 0)
	currentBucket := make([]interface{}, 0)
	var currentBucketSize int64 = 0
	for _, key := range keys {
		// TODO: can be optimized
		if dict[key] > preferRows {
			if len(currentBucket) != 0 {
				buckets = append(buckets, currentBucket)
				currentBucket = make([]interface{}, 0)
				currentBucketSize = 0
			}
			buckets = append(buckets, []interface{}{key})
		} else if currentBucketSize+dict[key] > maxRows {
			buckets = append(buckets, currentBucket)
			currentBucket = []interface{}{key}
			currentBucketSize = dict[key]
		} else if currentBucketSize+dict[key] > preferRows {
			currentBucket = append(currentBucket, key)
			buckets = append(buckets, currentBucket)
			currentBucket = make([]interface{}, 0)
			currentBucketSize = 0
		} else {
			currentBucket = append(currentBucket, key)
			currentBucketSize += dict[key]
		}
	}
	// avoid appending a trailing empty bucket, which would create an idle buffer
	if len(currentBucket) > 0 {
		buckets = append(buckets, currentBucket)
	}
	return buckets
}
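// Worked example for generatedScalarPlan (hypothetical counts), with
// maxRows=100, preferRows=60, and keys in sorted order a:70, b:30, c:40, d:10:
//
//	a (70 > preferRows)     -> its own bucket [a]
//	b (30)                  -> current bucket [b], size 30
//	c (30+40 > preferRows)  -> close current bucket as [b c]
//	d (10)                  -> final bucket [d]
//
// resulting buckets: [a], [b c], [d].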
func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
	bufferNumBySegmentMaxRows := totalRows / t.plan.MaxSegmentRows
	bufferNumByMemory := t.memoryLimit / expectedBinlogSize
	log.Info("switchPolicyForScalarPlan",
		zap.Int64("totalRows", totalRows),
		zap.Int64("bufferNumBySegmentMaxRows", bufferNumBySegmentMaxRows),
		zap.Int64("bufferNumByMemory", bufferNumByMemory))
	if bufferNumByMemory > bufferNumBySegmentMaxRows {
		return t.generatedScalarPlan(t.plan.GetMaxSegmentRows(), t.plan.GetPreferSegmentRows(), keys, dict)
	}

	maxRows := totalRows / bufferNumByMemory
	return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*t.compactionParams.PreferSegmentSizeRatio), keys, dict)
}

func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{}]int64) ([][]interface{}, bool) {
	totalRows := int64(0)
	keys := lo.MapToSlice(dict, func(k interface{}, v int64) interface{} {
		totalRows += v
		return k
	})

	notNullKeys := lo.Filter(keys, func(i interface{}, j int) bool {
		return i != nil
	})
	sort.Slice(notNullKeys, func(i, j int) bool {
		return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, notNullKeys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, notNullKeys[j]))
	})
	return t.switchPolicyForScalarPlan(totalRows, notNullKeys, dict), len(keys) > len(notNullKeys)
}

func (t *clusteringCompactionTask) GetSlotUsage() int64 {
	return t.plan.GetSlotUsage()
}
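// Typical lifecycle, sketched from this file's own API (the executor that
// actually drives the task is not shown here):
//
//	task := NewClusteringCompactionTask(ctx, binlogIO, plan, params)
//	result, err := task.Compact() // init -> analyze -> mapping -> upload stats
//	task.Complete()               // signal done so a concurrent Stop() can return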