milvus/internal/datanode/compactor/clustering_compactor.go
Ted Xu 078ccf5e08
fix: the underlying record got released in clustering compaction (#43551)
See: #43186

In this PR:

1. Flush renamed to FlushChunk, while a new Flush primitive is
introduced to serialize values to records.
2. Segment mapping in clustering compaction now processes data by records
instead of values; it calls flush on all buffers after each record is
processed.

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
2025-07-25 15:04:54 +08:00

1039 lines
34 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compactor
import (
"context"
"fmt"
sio "io"
"math"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/clusteringpb"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// expectedBinlogSize (16 MiB) is the per-buffer memory budget used to cap how
// many cluster buffers are created for the task's memory limit
// (memoryLimit / expectedBinlogSize).
const expectedBinlogSize = 16 << 20
// Compile-time check that *clusteringCompactionTask satisfies the Compactor interface.
var _ Compactor = (*clusteringCompactionTask)(nil)
// clusteringCompactionTask executes a clustering compaction plan: it reads the
// plan's input segments, routes every live row into a cluster buffer chosen by
// the clustering key (scalar value or vector centroid), writes each buffer out
// as new segments and uploads per-segment clustering-key stats as partition stats.
type clusteringCompactionTask struct {
binlogIO io.BinlogIO
logIDAlloc allocator.Interface
segIDAlloc allocator.Interface
ctx context.Context
cancel context.CancelFunc
// done is signalled by Complete; Stop waits on it after cancelling ctx.
done chan struct{}
tr *timerecord.TimeRecorder
// mappingPool runs per-segment analyze and mapping work; flushPool runs buffer flushes and closes.
mappingPool *conc.Pool[any]
flushPool *conc.Pool[any]
plan *datapb.CompactionPlan
// flush
// flushCount is only reported in Compact's final log line.
// NOTE(review): nothing in this file increments it — confirm whether it is still maintained.
flushCount *atomic.Int64
// metrics, don't use
// writtenRowNum counts rows written into cluster buffers; it is only logged, never used for control flow.
writtenRowNum *atomic.Int64
// inner field
collectionID int64
partitionID int64
currentTime time.Time // for TTL
isVectorClusteringKey bool
clusteringKeyField *schemapb.FieldSchema
primaryKeyField *schemapb.FieldSchema
// memoryLimit is the memory budget for all cluster buffers; flush watermarks are fractions of it.
memoryLimit int64
// bufferSize is used as both the reader and writer buffer size (set from BinLogMaxSize).
bufferSize int64
// clusterBuffers holds every output buffer; each buffer's id equals its index here.
clusterBuffers []*ClusterBuffer
// scalar
// keyToBufferFunc maps a scalar clustering-key value to its buffer (nil key -> the null buffer).
keyToBufferFunc func(interface{}) *ClusterBuffer
// vector
// segmentIDOffsetMapping maps an input segment ID to the path of its offset-mapping file.
segmentIDOffsetMapping map[int64]string
// offsetToBufferFunc maps a row offset plus the segment's centroid-id mapping to its buffer.
offsetToBufferFunc func(int64, []uint32) *ClusterBuffer
// bm25
// bm25FieldIds collects the first output field of each BM25 function in the schema.
// NOTE(review): not read anywhere else in this file.
bm25FieldIds []int64
compactionParams compaction.Params
}
// ClusterBuffer is one output group of the compaction. Every row routed to it
// goes through writer. Mapping goroutines for different input segments may
// write to the same buffer concurrently, so every writer access is guarded by lock.
type ClusterBuffer struct {
// id matches this buffer's position in clusteringCompactionTask.clusterBuffers
// (flushLargestBuffers relies on this).
id int
writer *MultiSegmentWriter
// clusteringKeyFieldStats describes the clustering-key range (min/max or
// centroids) this buffer covers; it is cloned into the stats of each output segment.
clusteringKeyFieldStats *storage.FieldStats
lock sync.RWMutex
}
// withLock runs fn while holding the buffer's exclusive lock.
func (b *ClusterBuffer) withLock(fn func() error) error {
	b.lock.Lock()
	defer b.lock.Unlock()
	return fn()
}

// Write appends one value to the underlying writer.
func (b *ClusterBuffer) Write(v *storage.Value) error {
	b.lock.Lock()
	defer b.lock.Unlock()
	err := b.writer.WriteValue(v)
	return err
}

// Flush serializes the values buffered so far into records (MultiSegmentWriter.Flush).
func (b *ClusterBuffer) Flush() error {
	return b.withLock(func() error { return b.writer.Flush() })
}

// FlushChunk writes out the writer's current chunk (MultiSegmentWriter.FlushChunk).
func (b *ClusterBuffer) FlushChunk() error {
	return b.withLock(func() error { return b.writer.FlushChunk() })
}

// Close closes the underlying writer.
func (b *ClusterBuffer) Close() error {
	return b.withLock(func() error { return b.writer.Close() })
}

// GetCompactionSegments returns the segments produced by this buffer's writer.
func (b *ClusterBuffer) GetCompactionSegments() []*datapb.CompactionSegment {
	b.lock.RLock()
	defer b.lock.RUnlock()
	segments := b.writer.GetCompactionSegments()
	return segments
}

// GetBufferSize reports the writer's uncompressed buffered size.
func (b *ClusterBuffer) GetBufferSize() uint64 {
	b.lock.RLock()
	defer b.lock.RUnlock()
	size := b.writer.GetBufferUncompressed()
	return size
}
// newClusterBuffer wraps writer as the cluster buffer identified by id, with
// clusteringKeyFieldStats describing the key range it covers.
func newClusterBuffer(id int, writer *MultiSegmentWriter, clusteringKeyFieldStats *storage.FieldStats) *ClusterBuffer {
	buf := new(ClusterBuffer) // zero-value RWMutex is ready to use
	buf.id = id
	buf.writer = writer
	buf.clusteringKeyFieldStats = clusteringKeyFieldStats
	return buf
}
// NewClusteringCompactionTask creates a task for plan with a cancellable child
// context of ctx. Derived state (allocators, fields, pools) is set up later by init.
func NewClusteringCompactionTask(
	ctx context.Context,
	binlogIO io.BinlogIO,
	plan *datapb.CompactionPlan,
	compactionParams compaction.Params,
) *clusteringCompactionTask {
	taskCtx, cancel := context.WithCancel(ctx)
	task := &clusteringCompactionTask{
		binlogIO:         binlogIO,
		plan:             plan,
		compactionParams: compactionParams,
	}
	task.ctx, task.cancel = taskCtx, cancel
	task.tr = timerecord.NewTimeRecorder("clustering_compaction")
	// buffered so Complete never blocks when nobody is waiting in Stop
	task.done = make(chan struct{}, 1)
	task.clusterBuffers = make([]*ClusterBuffer, 0)
	task.flushCount = atomic.NewInt64(0)
	task.writtenRowNum = atomic.NewInt64(0)
	return task
}
// Complete signals that the task has finished. done is created with a buffer
// of one, so the first call does not block even if Stop is not waiting.
func (t *clusteringCompactionTask) Complete() {
t.done <- struct{}{}
}
// Stop cancels the task context and then blocks until Complete has been called.
func (t *clusteringCompactionTask) Stop() {
t.cancel()
<-t.done
}
// GetPlanID returns the ID of the compaction plan this task executes.
func (t *clusteringCompactionTask) GetPlanID() typeutil.UniqueID {
	return t.plan.GetPlanID()
}

// GetChannelName returns the plan's channel.
func (t *clusteringCompactionTask) GetChannelName() string {
	return t.plan.GetChannel()
}

// GetCompactionType returns the plan's compaction type.
func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType {
	return t.plan.GetType()
}

// GetCollection returns the collection ID of the plan's first input segment.
// For a plan without segments it returns 0 instead of panicking on an
// out-of-range index (init rejects such plans separately).
func (t *clusteringCompactionTask) GetCollection() int64 {
	segments := t.plan.GetSegmentBinlogs()
	if len(segments) == 0 {
		return 0
	}
	return segments[0].GetCollectionID()
}
// init validates the compaction plan and derives the task state used by the
// later phases: collection/partition IDs, local log/segment ID allocators,
// primary-key and clustering-key fields, BM25 output fields, the memory
// budget, the reader/writer buffer size and the two worker pools.
//
// It returns an illegal-compaction-plan error when the plan is not a
// clustering compaction, has no input segments, has no schema, or names a
// clustering key field that is not present in the schema.
func (t *clusteringCompactionTask) init() error {
	if t.plan.GetType() != datapb.CompactionType_ClusteringCompaction {
		return merr.WrapErrIllegalCompactionPlan("illegal compaction type")
	}
	if len(t.plan.GetSegmentBinlogs()) == 0 {
		return merr.WrapErrIllegalCompactionPlan("empty segment binlogs")
	}
	t.collectionID = t.GetCollection()
	t.partitionID = t.plan.GetSegmentBinlogs()[0].GetPartitionID()

	logIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd())
	segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
	log.Info("segment ID range", zap.Int64("begin", t.plan.GetPreAllocatedSegmentIDs().GetBegin()), zap.Int64("end", t.plan.GetPreAllocatedSegmentIDs().GetEnd()))
	t.logIDAlloc = logIDAlloc
	t.segIDAlloc = segIDAlloc

	var pkField *schemapb.FieldSchema
	if t.plan.Schema == nil {
		return merr.WrapErrIllegalCompactionPlan("empty schema in compactionPlan")
	}
	for _, field := range t.plan.Schema.Fields {
		// only field IDs >= 100 qualify as primary key (lower IDs are presumably system fields)
		if field.GetIsPrimaryKey() && field.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(field.GetDataType()) {
			pkField = field
		}
		if field.GetFieldID() == t.plan.GetClusteringKeyField() {
			t.clusteringKeyField = field
		}
	}
	// Everything after this point dereferences the clustering key field
	// (starting with the IsVectorType check below), so reject the plan here
	// instead of panicking on a nil pointer.
	if t.clusteringKeyField == nil {
		return merr.WrapErrIllegalCompactionPlan(fmt.Sprintf("clustering key field %d not found in schema", t.plan.GetClusteringKeyField()))
	}
	for _, function := range t.plan.Schema.Functions {
		if function.GetType() == schemapb.FunctionType_BM25 {
			t.bm25FieldIds = append(t.bm25FieldIds, function.GetOutputFieldIds()[0])
		}
	}
	t.primaryKeyField = pkField
	t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
	t.currentTime = time.Now()
	t.memoryLimit = t.getMemoryLimit()
	t.bufferSize = int64(t.compactionParams.BinLogMaxSize) // Use binlog max size as read and write buffer size
	workerPoolSize := t.getWorkerPoolSize()
	t.mappingPool = conc.NewPool[any](workerPoolSize)
	t.flushPool = conc.NewPool[any](workerPoolSize)
	log.Info("clustering compaction task initialed", zap.Int64("memory_buffer_size", t.memoryLimit), zap.Int("worker_pool_size", workerPoolSize))
	return nil
}
// Compact runs the clustering compaction end to end. It validates the plan,
// decompresses binlog paths, builds cluster buffers from the vector or scalar
// analysis, maps every live input row into those buffers, uploads the
// partition stats and returns the produced segments as a completed plan result.
func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("clusteringCompaction-%d", t.GetPlanID()))
	defer span.End()
	log := log.With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String()))

	// Step 0: validate the plan and set up task state.
	if initErr := t.init(); initErr != nil {
		log.Error("compaction task init failed", zap.Error(initErr))
		return nil, initErr
	}
	if !funcutil.CheckCtxValid(ctx) {
		log.Warn("compact wrong, task context done or timeout")
		return nil, ctx.Err()
	}
	// the worker pools created by init are released on every exit path below
	defer t.cleanUp(ctx)

	// Step 1: decompress binlog paths ahead of mapping.
	rootPath := t.compactionParams.StorageConfig.GetRootPath()
	if decompressErr := binlog.DecompressCompactionBinlogsWithRootPath(rootPath, t.plan.SegmentBinlogs); decompressErr != nil {
		log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(decompressErr))
		return nil, decompressErr
	}

	// Step 2: build the cluster buffers from the analysis result.
	analyze, analyzeFailure := t.getScalarAnalyzeResult, "failed in analyze scalar"
	if t.isVectorClusteringKey {
		analyze, analyzeFailure = t.getVectorAnalyzeResult, "failed in analyze vector"
	}
	if analyzeErr := analyze(ctx); analyzeErr != nil {
		log.Error(analyzeFailure, zap.Error(analyzeErr))
		return nil, analyzeErr
	}

	// Step 3: route rows into buffers.
	log.Info("Clustering compaction start mapping", zap.Int("bufferNum", len(t.clusterBuffers)))
	uploadSegments, partitionStats, err := t.mapping(ctx)
	if err != nil {
		log.Error("failed in mapping", zap.Error(err))
		return nil, err
	}

	// Step 4: persist the partition stats.
	if err = t.uploadPartitionStats(ctx, t.collectionID, t.partitionID, partitionStats); err != nil {
		return nil, err
	}

	// Step 5: assemble the result.
	result := &datapb.CompactionPlanResult{
		PlanID:   t.GetPlanID(),
		State:    datapb.CompactionTaskState_completed,
		Type:     t.plan.GetType(),
		Channel:  t.plan.GetChannel(),
		Segments: uploadSegments,
	}
	latency := metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String())
	latency.Observe(float64(t.tr.ElapseSpan().Milliseconds()))
	log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()), zap.Int64("flushTimes", t.flushCount.Load()))

	// drop the key->buffer lookup so its map can be collected
	t.keyToBufferFunc = nil
	return result, nil
}
// getScalarAnalyzeResult counts the rows per scalar clustering-key value,
// groups the values into buckets, creates one cluster buffer per bucket (plus
// one for null keys when any exist) and installs keyToBufferFunc to route keys.
func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) error {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getScalarAnalyzeResult-%d", t.GetPlanID()))
	defer span.End()

	analyzeDict, err := t.scalarAnalyze(ctx)
	if err != nil {
		return err
	}
	buckets, containsNull := t.splitClusterByScalarValue(analyzeDict)

	keyField := t.clusteringKeyField
	// newBuffer creates a writer, wraps it as the buffer with the given id and registers it.
	newBuffer := func(id int, stats *storage.FieldStats) (*ClusterBuffer, error) {
		alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
		writer, werr := NewMultiSegmentWriter(ctx, t.binlogIO, alloc,
			t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows,
			t.partitionID, t.collectionID, t.plan.Channel, 100,
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig))
		if werr != nil {
			return nil, werr
		}
		buffer := newClusterBuffer(id, writer, stats)
		t.clusterBuffers = append(t.clusterBuffers, buffer)
		return buffer, nil
	}

	keyToBuffer := make(map[interface{}]*ClusterBuffer, 0)
	for id, bucket := range buckets {
		stats, serr := storage.NewFieldStats(keyField.FieldID, keyField.DataType, 0)
		if serr != nil {
			return serr
		}
		for _, key := range bucket {
			stats.UpdateMinMax(storage.NewScalarFieldValue(keyField.DataType, key))
		}
		buffer, berr := newBuffer(id, stats)
		if berr != nil {
			return berr
		}
		for _, key := range bucket {
			keyToBuffer[key] = buffer
		}
	}

	// null keys get their own buffer, placed right after the bucket buffers
	var nullBuffer *ClusterBuffer
	if containsNull {
		stats, serr := storage.NewFieldStats(keyField.FieldID, keyField.DataType, 0)
		if serr != nil {
			return serr
		}
		buffer, berr := newBuffer(len(buckets), stats)
		if berr != nil {
			return berr
		}
		nullBuffer = buffer
	}

	t.keyToBufferFunc = func(key interface{}) *ClusterBuffer {
		if key == nil {
			return nullBuffer
		}
		// todo: if keys are too many, the map will be quite large, we should mark the range of each buffer and select buffer by range
		return keyToBuffer[key]
	}
	return nil
}
// splitCentroids distributes centroids round-robin over num groups. It returns
// the groups and a map from each element's position in centroids to the index
// of the group that received it. A non-positive num yields (nil, nil).
func splitCentroids(centroids []int, num int) ([][]int, map[int]int) {
	if num < 1 {
		return nil, nil
	}
	groups := make([][]int, num)
	groupOf := make(map[int]int, len(centroids))
	for pos, centroid := range centroids {
		g := pos % num
		groups[g] = append(groups[g], centroid)
		groupOf[pos] = g
	}
	return groups, groupOf
}
// generatedVectorPlan spreads the analysis centroids round-robin over
// bufferNum cluster buffers (each buffer's field stats carry its centroids)
// and installs offsetToBufferFunc, which routes a row by looking up its
// centroid id and then that centroid's group.
func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, bufferNum int, centroids []*schemapb.VectorField) error {
	offsets := make([]int, 0, len(centroids))
	for i := range centroids {
		offsets = append(offsets, i)
	}
	centroidGroups, groupIndex := splitCentroids(offsets, bufferNum)

	keyField := t.clusteringKeyField
	for id, group := range centroidGroups {
		stats, err := storage.NewFieldStats(keyField.FieldID, keyField.DataType, 0)
		if err != nil {
			return err
		}
		values := make([]storage.VectorFieldValue, 0, len(group))
		for _, offset := range group {
			values = append(values, storage.NewVectorFieldValue(keyField.DataType, centroids[offset]))
		}
		stats.SetVectorCentroids(values...)

		alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
		writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc,
			t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows,
			t.partitionID, t.collectionID, t.plan.Channel, 100,
			storage.WithBufferSize(t.bufferSize),
			storage.WithStorageConfig(t.compactionParams.StorageConfig))
		if err != nil {
			return err
		}
		t.clusterBuffers = append(t.clusterBuffers, newClusterBuffer(id, writer, stats))
	}

	t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
		centroid := int(idMapping[offset])
		return t.clusterBuffers[groupIndex[centroid]]
	}
	return nil
}
// switchPolicyForVectorPlan picks the number of cluster buffers for a vector
// clustering key: one per centroid, capped by how many expectedBinlogSize
// buffers fit into memoryLimit. The memory-based cap is never below one.
// With a memory limit under expectedBinlogSize the division would yield 0,
// no buffer would be created, and routing the first row would index an empty
// clusterBuffers slice.
func (t *clusteringCompactionTask) switchPolicyForVectorPlan(ctx context.Context, centroids *clusteringpb.ClusteringCentroidsStats) error {
	bufferNum := len(centroids.GetCentroids())
	bufferNumByMemory := int(t.memoryLimit / expectedBinlogSize)
	if bufferNumByMemory < 1 {
		bufferNumByMemory = 1
	}
	if bufferNumByMemory < bufferNum {
		bufferNum = bufferNumByMemory
	}
	return t.generatedVectorPlan(ctx, bufferNum, centroids.GetCentroids())
}
// getVectorAnalyzeResult records the per-segment offset-mapping file paths,
// downloads and decodes the clustering centroids produced by the analyze
// step, and hands them to switchPolicyForVectorPlan to build the buffers.
func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getVectorAnalyzeResult-%d", t.GetPlanID()))
	defer span.End()
	log := log.Ctx(ctx)

	resultRoot := t.plan.AnalyzeResultPath
	keyFieldID := t.clusteringKeyField.FieldID
	centroidFilePath := path.Join(resultRoot, metautil.JoinIDPath(t.collectionID, t.partitionID, keyFieldID), common.Centroids)

	mappingFiles := make(map[int64]string, 0)
	for _, segmentID := range t.plan.AnalyzeSegmentIds {
		mappingPath := path.Join(resultRoot, metautil.JoinIDPath(t.collectionID, t.partitionID, keyFieldID, segmentID), common.OffsetMapping)
		mappingFiles[segmentID] = mappingPath
		log.Debug("read segment offset mapping file", zap.Int64("segmentID", segmentID), zap.String("path", mappingPath))
	}
	t.segmentIDOffsetMapping = mappingFiles

	blobs, err := t.binlogIO.Download(ctx, []string{centroidFilePath})
	if err != nil {
		return err
	}
	centroids := &clusteringpb.ClusteringCentroidsStats{}
	if err := proto.Unmarshal(blobs[0], centroids); err != nil {
		return err
	}
	log.Debug("read clustering centroids stats", zap.String("path", centroidFilePath),
		zap.Int("centroidNum", len(centroids.GetCentroids())),
		zap.Any("offsetMappingFiles", t.segmentIDOffsetMapping))
	return t.switchPolicyForVectorPlan(ctx, centroids)
}
// mapping reads every input segment concurrently on mappingPool and splits its
// rows into the cluster buffers. It then closes all buffers (flushAll) and
// collects the produced segments together with per-segment partition stats,
// where each segment inherits a clone of its buffer's clustering-key stats.
func (t *clusteringCompactionTask) mapping(ctx context.Context,
) ([]*datapb.CompactionSegment, *storage.PartitionStatsSnapshot, error) {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mapping-%d", t.GetPlanID()))
	defer span.End()
	inputSegments := t.plan.GetSegmentBinlogs()
	mapStart := time.Now()
	log := log.Ctx(ctx)

	futures := make([]*conc.Future[any], 0, len(inputSegments))
	for _, segment := range inputSegments {
		// each worker gets a trimmed copy holding only what mappingSegment reads
		clone := &datapb.CompactionSegmentBinlogs{
			SegmentID:      segment.SegmentID,
			FieldBinlogs:   segment.FieldBinlogs,
			Deltalogs:      segment.Deltalogs,
			StorageVersion: segment.StorageVersion,
		}
		futures = append(futures, t.mappingPool.Submit(func() (any, error) {
			return struct{}{}, t.mappingSegment(ctx, clone)
		}))
	}
	if err := conc.AwaitAll(futures...); err != nil {
		return nil, nil, err
	}

	// force flush all buffers
	if err := t.flushAll(); err != nil {
		return nil, nil, err
	}

	resultSegments := make([]*datapb.CompactionSegment, 0)
	resultPartitionStats := &storage.PartitionStatsSnapshot{
		SegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats),
	}
	for _, buffer := range t.clusterBuffers {
		produced := buffer.GetCompactionSegments()
		log.Debug("compaction segments", zap.Any("segments", produced))
		resultSegments = append(resultSegments, produced...)
		for _, seg := range produced {
			stats := storage.SegmentStats{
				FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
				NumRows:    int(seg.NumOfRows),
			}
			resultPartitionStats.SegmentStats[seg.SegmentID] = stats
			log.Debug("compaction segment partitioning stats", zap.Int64("segmentID", seg.SegmentID), zap.Any("stats", stats))
		}
	}

	log.Info("mapping end",
		zap.Int64("collectionID", t.GetCollection()),
		zap.Int64("partitionID", t.partitionID),
		zap.Int("segmentFrom", len(inputSegments)),
		zap.Int("segmentTo", len(resultSegments)),
		zap.Duration("elapse", time.Since(mapStart)))
	return resultSegments, resultPartitionStats, nil
}
// getBufferTotalUsedMemorySize sums the uncompressed buffered size of every cluster buffer.
func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
	total := int64(0)
	for i := range t.clusterBuffers {
		total += int64(t.clusterBuffers[i].GetBufferSize())
	}
	return total
}
// mappingSegment reads the insert logs of one segment and drops rows that are
// deleted by its deltalogs or expired by the collection TTL. It routes every
// remaining row into the cluster buffer chosen by the clustering key, and
// flushes the largest buffers when total buffered memory passes the high
// watermark.
//
// Values deserialized from a record reference that record rather than copying
// it. So after each record every buffer is flushed (ClusterBuffer.Flush)
// before the reader advances. A failure of that flush, or a row whose key has
// no buffer, aborts the segment with an error instead of being ignored or
// panicking.
func (t *clusteringCompactionTask) mappingSegment(
	ctx context.Context,
	segment *datapb.CompactionSegmentBinlogs,
) error {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mappingSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
	defer span.End()
	log := log.With(zap.Int64("planID", t.GetPlanID()),
		zap.Int64("collectionID", t.GetCollection()),
		zap.Int64("partitionID", t.partitionID),
		zap.Int64("segmentID", segment.GetSegmentID()))
	log.Info("mapping segment start")
	processStart := time.Now()
	var remained int64 = 0

	deltaPaths := make([]string, 0)
	for _, d := range segment.GetDeltalogs() {
		for _, l := range d.GetBinlogs() {
			deltaPaths = append(deltaPaths, l.GetLogPath())
		}
	}
	delta, err := compaction.ComposeDeleteFromDeltalogs(ctx, t.binlogIO, deltaPaths)
	if err != nil {
		return err
	}
	entityFilter := compaction.NewEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)

	// for a vector key, the segment's offset->centroid mapping decides the buffer
	mappingStats := &clusteringpb.ClusteringCentroidIdMappingStats{}
	if t.isVectorClusteringKey {
		offSetPath := t.segmentIDOffsetMapping[segment.SegmentID]
		offsetBytes, err := t.binlogIO.Download(ctx, []string{offSetPath})
		if err != nil {
			return err
		}
		err = proto.Unmarshal(offsetBytes[0], mappingStats)
		if err != nil {
			return err
		}
	}

	// Get the number of field binlog files from non-empty segment
	var binlogNum int
	for _, b := range segment.GetFieldBinlogs() {
		if b != nil {
			binlogNum = len(b.GetBinlogs())
			break
		}
	}
	// Unable to deal with all empty segments cases, so return error
	if binlogNum == 0 {
		log.Warn("compact wrong, all segments' binlogs are empty")
		return merr.WrapErrIllegalCompactionPlan()
	}

	rr, err := storage.NewBinlogRecordReader(ctx,
		segment.GetFieldBinlogs(),
		t.plan.Schema,
		storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
			return t.binlogIO.Download(ctx, paths)
		}),
		storage.WithVersion(segment.StorageVersion),
		storage.WithBufferSize(t.bufferSize),
		storage.WithStorageConfig(t.compactionParams.StorageConfig),
	)
	if err != nil {
		log.Warn("new binlog record reader wrong", zap.Error(err))
		return err
	}
	defer rr.Close()

	// offset counts every row read (filtered or not) so it lines up with the centroid mapping
	offset := int64(-1)
	for {
		r, err := rr.Next()
		if err != nil {
			if err == sio.EOF {
				break
			}
			log.Warn("compact wrong, failed to iter through data", zap.Error(err))
			return err
		}
		vs := make([]*storage.Value, r.Len())
		if err = storage.ValueDeserializer(r, vs, t.plan.Schema.Fields); err != nil {
			log.Warn("compact wrong, failed to deserialize data", zap.Error(err))
			return err
		}
		for _, v := range vs {
			offset++
			if entityFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp)) {
				continue
			}
			row, ok := (*v).Value.(map[typeutil.UniqueID]interface{})
			if !ok {
				log.Warn("convert interface to map wrong")
				return errors.New("unexpected error")
			}
			clusteringKey := row[t.clusteringKeyField.FieldID]
			var clusterBuffer *ClusterBuffer
			if t.isVectorClusteringKey {
				clusterBuffer = t.offsetToBufferFunc(offset, mappingStats.GetCentroidIdMapping())
			} else {
				clusterBuffer = t.keyToBufferFunc(clusteringKey)
			}
			if clusterBuffer == nil {
				// a key absent from the analysis result would otherwise nil-panic in Write
				log.Warn("compact wrong, no cluster buffer for row", zap.Int64("offset", offset))
				return fmt.Errorf("no cluster buffer for row at offset %d of segment %d", offset, segment.GetSegmentID())
			}
			if err := clusterBuffer.Write(v); err != nil {
				return err
			}
			t.writtenRowNum.Inc()
			remained++
			if (remained+1)%100 == 0 {
				currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
				if currentBufferTotalMemorySize > t.getMemoryBufferHighWatermark() {
					// reach flushBinlog trigger threshold
					log.Debug("largest buffer need to flush",
						zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize))
					if err := t.flushLargestBuffers(ctx); err != nil {
						return err
					}
				}
			}
		}
		// all cluster buffers are flushed for a certain record, since the values read from the same record are references instead of copies
		for _, buffer := range t.clusterBuffers {
			if err := buffer.Flush(); err != nil {
				log.Warn("compact wrong, failed to flush cluster buffer", zap.Int("bufferID", buffer.id), zap.Error(err))
				return err
			}
		}
	}

	missing := entityFilter.GetMissingDeleteCount()
	log.Info("mapping segment end",
		zap.Int64("remained_entities", remained),
		zap.Int("deleted_entities", entityFilter.GetDeletedCount()),
		zap.Int("expired_entities", entityFilter.GetExpiredCount()),
		zap.Int("deltalog deletes", entityFilter.GetDeltalogDeleteCount()),
		zap.Int("missing deletes", missing),
		zap.Int64("written_row_num", t.writtenRowNum.Load()),
		zap.Duration("elapse", time.Since(processStart)))
	metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(entityFilter.GetDeltalogDeleteCount()))
	metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(missing))
	return nil
}
// getWorkerPoolSize returns the configured clustering-compaction worker pool size, at least 1.
func (t *clusteringCompactionTask) getWorkerPoolSize() int {
	configured := float64(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.GetAsInt())
	return int(math.Max(configured, 1.0))
}

// getMemoryLimit returns the maximum memory that a clustering compaction task
// is allowed to use: the configured buffer ratio of the machine's memory.
func (t *clusteringCompactionTask) getMemoryLimit() int64 {
	total := float64(hardware.GetMemoryCount())
	ratio := paramtable.Get().DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat()
	return int64(total * ratio)
}

// getMemoryBufferLowWatermark is 30% of memoryLimit; flushLargestBuffers stops once buffers drop to it.
func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 {
	const lowRatio = 0.3
	return int64(float64(t.memoryLimit) * lowRatio)
}

// getMemoryBufferHighWatermark is 70% of memoryLimit; exceeding it triggers flushLargestBuffers.
func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
	const highRatio = 0.7
	return int64(float64(t.memoryLimit) * highRatio)
}
// flushLargestBuffers flushes the current chunk (ClusterBuffer.FlushChunk) of
// the largest cluster buffers concurrently on flushPool. Buffers are taken in
// descending size order until the projected buffered size reaches the low
// watermark, then the call waits for all submitted flushes. It does nothing
// when the buffered size is already at or below the low watermark.
//
// NOTE: bufferSizes is filled in clusterBuffers order but looked up by buffer
// id, and buffers are fetched with t.clusterBuffers[bufferId]. Both rely on each
// buffer's id equalling its index in t.clusterBuffers.
func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error {
currentMemorySize := t.getBufferTotalUsedMemorySize()
if currentMemorySize <= t.getMemoryBufferLowWatermark() {
log.Info("memory low water mark", zap.Int64("memoryBufferSize", currentMemorySize))
return nil
}
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "flushLargestBuffers")
defer span.End()
bufferIDs := make([]int, 0)
bufferSizes := make([]int64, 0)
for _, buffer := range t.clusterBuffers {
bufferIDs = append(bufferIDs, buffer.id)
bufferSizes = append(bufferSizes, int64(buffer.GetBufferSize()))
}
// order buffer ids by snapshot size, largest first
sort.Slice(bufferIDs, func(i, j int) bool {
return bufferSizes[bufferIDs[i]] > bufferSizes[bufferIDs[j]]
})
log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize))
futures := make([]*conc.Future[any], 0)
for _, bufferId := range bufferIDs {
buffer := t.clusterBuffers[bufferId]
size := buffer.GetBufferSize()
// projected total once this buffer's flush completes (the flush itself runs asynchronously)
currentMemorySize -= int64(size)
log.Info("currentMemorySize after flush buffer binlog",
zap.Int64("currentMemorySize", currentMemorySize),
zap.Int("bufferID", bufferId),
zap.Uint64("WrittenUncompressed", size))
future := t.flushPool.Submit(func() (any, error) {
err := buffer.FlushChunk()
if err != nil {
return nil, err
}
return struct{}{}, nil
})
futures = append(futures, future)
// stop submitting once the projection reaches the low watermark
if currentMemorySize <= t.getMemoryBufferLowWatermark() {
log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getBufferTotalUsedMemorySize()))
break
}
}
if err := conc.AwaitAll(futures...); err != nil {
return err
}
log.Info("flushLargestBuffers end", zap.Int64("currentMemorySize", currentMemorySize))
return nil
}
// flushAll closes every cluster buffer concurrently on flushPool and waits for
// all of them. It returns the error reported by conc.AwaitAll, if any. mapping
// uses it as the final forced flush.
func (t *clusteringCompactionTask) flushAll() error {
	futures := make([]*conc.Future[any], 0)
	for i := range t.clusterBuffers {
		buffer := t.clusterBuffers[i] // per-iteration copy for the closure
		futures = append(futures, t.flushPool.Submit(func() (any, error) {
			if err := buffer.Close(); err != nil {
				return nil, err
			}
			return struct{}{}, nil
		}))
	}
	return conc.AwaitAll(futures...)
}
// uploadPartitionStats serializes partitionStats, stamped with the plan ID as
// its version, and uploads it under
// <root>/<PartitionStatsPath>/<collectionID>/<partitionID>/<channel>/<version>.
// Here <root> is the prefix of AnalyzeResultPath before AnalyzeStatsPath.
func (t *clusteringCompactionTask) uploadPartitionStats(ctx context.Context, collectionID, partitionID typeutil.UniqueID, partitionStats *storage.PartitionStatsSnapshot) error {
	// the plan ID doubles as the partition stats version
	version := t.plan.PlanID
	partitionStats.Version = version
	payload, err := storage.SerializePartitionStatsSnapshot(partitionStats)
	if err != nil {
		return err
	}

	rootPath := strings.Split(t.plan.AnalyzeResultPath, common.AnalyzeStatsPath)[0]
	statsKey := path.Join(rootPath, common.PartitionStatsPath, metautil.JoinIDPath(collectionID, partitionID), t.plan.GetChannel(), strconv.FormatInt(version, 10))
	if err := t.binlogIO.Upload(ctx, map[string][]byte{statsKey: payload}); err != nil {
		return err
	}
	log.Info("Finish upload PartitionStats file", zap.String("key", statsKey), zap.Int("length", len(payload)))
	return nil
}
// cleanUp releases the worker pools on a best-effort basis. Either pool may be
// nil (for example when init did not get far enough to create it).
func (t *clusteringCompactionTask) cleanUp(ctx context.Context) {
	for _, pool := range []*conc.Pool[any]{t.mappingPool, t.flushPool} {
		if pool != nil {
			pool.Release()
		}
	}
}
// scalarAnalyze counts, across all input segments analyzed concurrently on
// mappingPool, how many non-expired rows carry each clustering-key value.
// Partial counts from a failing segment are still merged before its error is
// reported by conc.AwaitAll.
func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[interface{}]int64, error) {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyze-%d", t.GetPlanID()))
	defer span.End()
	inputSegments := t.plan.GetSegmentBinlogs()
	futures := make([]*conc.Future[any], 0, len(inputSegments))
	analyzeStart := time.Now()

	var mu sync.Mutex
	merged := make(map[interface{}]int64, 0)
	mergeCounts := func(counts map[interface{}]int64) {
		mu.Lock()
		defer mu.Unlock()
		for key, n := range counts {
			merged[key] += n
		}
	}

	for _, segment := range inputSegments {
		clone := proto.Clone(segment).(*datapb.CompactionSegmentBinlogs)
		futures = append(futures, t.mappingPool.Submit(func() (any, error) {
			counts, err := t.scalarAnalyzeSegment(ctx, clone)
			mergeCounts(counts)
			return struct{}{}, err
		}))
	}
	if err := conc.AwaitAll(futures...); err != nil {
		return nil, err
	}

	log.Info("analyze end",
		zap.Int64("collectionID", t.GetCollection()),
		zap.Int64("partitionID", t.partitionID),
		zap.Int("segments", len(inputSegments)),
		zap.Int("clustering num", len(merged)),
		zap.Duration("elapse", time.Since(analyzeStart)))
	return merged, nil
}
// scalarAnalyzeSegment counts the non-expired rows of one segment per
// clustering-key value. Only system fields 0 and 1 (presumably row ID and
// timestamp), the primary key and the clustering key are read.
func (t *clusteringCompactionTask) scalarAnalyzeSegment(
	ctx context.Context,
	segment *datapb.CompactionSegmentBinlogs,
) (map[interface{}]int64, error) {
	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyzeSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
	defer span.End()
	log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("segmentID", segment.GetSegmentID()))
	processStart := time.Now()

	// the binlog count is taken from the first non-nil field binlog
	binlogNum := 0
	for _, fieldBinlog := range segment.GetFieldBinlogs() {
		if fieldBinlog == nil {
			continue
		}
		binlogNum = len(fieldBinlog.GetBinlogs())
		break
	}
	if binlogNum == 0 {
		log.Warn("compact wrong, all segments' binlogs are empty")
		return nil, merr.WrapErrIllegalCompactionPlan("all segments' binlogs are empty")
	}
	log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))

	// no deltas: only TTL expiry is applied while analyzing
	expiredFilter := compaction.NewEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)

	requiredFields := typeutil.NewSet[int64]()
	requiredFields.Insert(0, 1, t.primaryKeyField.GetFieldID(), t.clusteringKeyField.GetFieldID())
	selectedFields := lo.Filter(t.plan.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
		return requiredFields.Contain(field.GetFieldID())
	})

	binlogs := make([]*datapb.FieldBinlog, 0)
	version := segment.GetStorageVersion()
	if version == storage.StorageV1 {
		// v1 stores one binlog group per field, so irrelevant fields can be skipped
		for _, fieldBinlog := range segment.GetFieldBinlogs() {
			if requiredFields.Contain(fieldBinlog.GetFieldID()) {
				binlogs = append(binlogs, fieldBinlog)
			}
		}
	} else if version == storage.StorageV2 {
		binlogs = segment.GetFieldBinlogs()
	} else {
		log.Warn("unsupported storage version", zap.Int64("storage version", segment.GetStorageVersion()))
		return nil, fmt.Errorf("unsupported storage version %d", segment.GetStorageVersion())
	}

	rr, err := storage.NewBinlogRecordReader(ctx,
		binlogs,
		t.plan.GetSchema(),
		storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
			return t.binlogIO.Download(ctx, paths)
		}),
		storage.WithVersion(segment.StorageVersion),
		storage.WithBufferSize(t.bufferSize),
		storage.WithStorageConfig(t.compactionParams.StorageConfig),
		storage.WithNeededFields(requiredFields),
	)
	if err != nil {
		log.Warn("new binlog record reader wrong", zap.Error(err))
		return make(map[interface{}]int64), err
	}

	pkIter := storage.NewDeserializeReader(rr, func(r storage.Record, v []*storage.Value) error {
		return storage.ValueDeserializer(r, v, selectedFields)
	})
	defer pkIter.Close()

	analyzeResult, remained, err := t.iterAndGetScalarAnalyzeResult(pkIter, expiredFilter)
	if err != nil {
		return nil, err
	}
	log.Info("analyze segment end",
		zap.Int64("remained entities", remained),
		zap.Int("expired entities", expiredFilter.GetExpiredCount()),
		zap.Duration("map elapse", time.Since(processStart)))
	return analyzeResult, nil
}
// iterAndGetScalarAnalyzeResult drains pkIter and counts the rows per
// clustering-key value, skipping rows rejected by expiredFilter. It returns
// the counts and the number of rows kept. pkIter is closed on EOF.
func (t *clusteringCompactionTask) iterAndGetScalarAnalyzeResult(pkIter *storage.DeserializeReaderImpl[*storage.Value], expiredFilter compaction.EntityFilter) (map[interface{}]int64, int64, error) {
	counts := make(map[interface{}]int64, 0)
	var kept int64
	keyFieldID := t.clusteringKeyField.GetFieldID()
	for {
		v, err := pkIter.NextValue()
		if err == sio.EOF {
			// NOTE(review): scalarAnalyzeSegment also defers pkIter.Close(), so the
			// reader is closed twice on this path — confirm Close is idempotent.
			pkIter.Close()
			break
		}
		if err != nil {
			log.Warn("compact wrong, failed to iter through data", zap.Error(err))
			return nil, 0, err
		}
		if expiredFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp)) {
			continue
		}
		row, ok := (*v).Value.(map[typeutil.UniqueID]interface{})
		if !ok {
			return nil, 0, errors.New("unexpected error")
		}
		counts[row[keyFieldID]]++
		kept++
	}
	return counts, kept, nil
}
// generatedScalarPlan groups sorted clustering keys into buckets by their row
// counts in dict:
//   - a key with more than preferRows rows gets a bucket of its own;
//   - a key that would push the running bucket over maxRows starts a new bucket;
//   - a key that pushes the running bucket over preferRows joins it and seals it;
//   - otherwise the key joins the running bucket.
//
// Empty buckets are never emitted. Previously a trailing empty bucket (and an
// empty one from the maxRows branch) was returned, and each became a cluster
// buffer with its own writer that received no rows.
func (t *clusteringCompactionTask) generatedScalarPlan(maxRows, preferRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
	buckets := make([][]interface{}, 0)
	current := make([]interface{}, 0)
	var currentRows int64
	// seal closes the running bucket (keeping it only if non-empty) and starts a new one.
	seal := func() {
		if len(current) > 0 {
			buckets = append(buckets, current)
		}
		current = make([]interface{}, 0)
		currentRows = 0
	}
	for _, key := range keys {
		// todo can optimize
		rows := dict[key]
		switch {
		case rows > preferRows:
			seal()
			buckets = append(buckets, []interface{}{key})
		case currentRows+rows > maxRows:
			seal()
			current = append(current, key)
			currentRows = rows
		case currentRows+rows > preferRows:
			current = append(current, key)
			seal()
		default:
			current = append(current, key)
			currentRows += rows
		}
	}
	seal()
	return buckets
}
// switchPolicyForScalarPlan chooses the bucket sizing for scalar keys. If
// memory allows more buffers than the data needs (totalRows / MaxSegmentRows),
// the plan's own max/prefer segment rows are used. Otherwise rows are spread
// evenly over the buffers that fit in memory, with the prefer size derived
// from PreferSegmentSizeRatio.
//
// The memory-based buffer count is clamped to at least 1. With a memory limit
// below expectedBinlogSize it used to be 0, and totalRows / 0 panicked
// whenever totalRows < MaxSegmentRows.
func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
	// NOTE(review): assumes the plan's MaxSegmentRows is positive.
	bufferNumBySegmentMaxRows := totalRows / t.plan.MaxSegmentRows
	bufferNumByMemory := t.memoryLimit / expectedBinlogSize
	if bufferNumByMemory < 1 {
		bufferNumByMemory = 1
	}
	log.Info("switchPolicyForScalarPlan", zap.Int64("totalRows", totalRows),
		zap.Int64("bufferNumBySegmentMaxRows", bufferNumBySegmentMaxRows),
		zap.Int64("bufferNumByMemory", bufferNumByMemory))
	if bufferNumByMemory > bufferNumBySegmentMaxRows {
		return t.generatedScalarPlan(t.plan.GetMaxSegmentRows(), t.plan.GetPreferSegmentRows(), keys, dict)
	}
	maxRows := totalRows / bufferNumByMemory
	return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*t.compactionParams.PreferSegmentSizeRatio), keys, dict)
}
// splitClusterByScalarValue sorts the non-null clustering-key values from the
// analysis counts and groups them into buckets. It also reports whether any
// row had a null key, since those are routed to a separate buffer.
func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{}]int64) ([][]interface{}, bool) {
	var totalRows int64
	keys := lo.MapToSlice(dict, func(k interface{}, v int64) interface{} {
		totalRows += v
		return k
	})
	notNullKeys := lo.Filter(keys, func(k interface{}, _ int) bool {
		return k != nil
	})
	keyType := t.clusteringKeyField.DataType
	sort.Slice(notNullKeys, func(i, j int) bool {
		left := storage.NewScalarFieldValue(keyType, notNullKeys[i])
		right := storage.NewScalarFieldValue(keyType, notNullKeys[j])
		return left.LE(right)
	})
	hasNull := len(notNullKeys) < len(keys)
	return t.switchPolicyForScalarPlan(totalRows, notNullKeys, dict), hasNull
}
// GetSlotUsage returns the slot usage recorded in the compaction plan.
func (t *clusteringCompactionTask) GetSlotUsage() int64 {
	slots := t.plan.GetSlotUsage()
	return slots
}