From df4285c9ef880064955e69d62e816e33b7fe09fc Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Thu, 13 Mar 2025 14:12:06 +0800 Subject: [PATCH] enhance: API integration with storage v2 in clustering-compactions (#40133) See #39173 --------- Signed-off-by: Ted Xu --- internal/compaction/common.go | 7 +- .../datacoord/compaction_task_clustering.go | 2 +- internal/datacoord/compaction_trigger_v2.go | 1 + internal/datacoord/task_scheduler_test.go | 2 +- .../compactor/clustering_compactor.go | 800 ++++-------------- .../compactor/clustering_compactor_test.go | 187 +--- .../datanode/compactor/compactor_common.go | 133 +++ internal/datanode/compactor/l0_compactor.go | 5 +- internal/datanode/compactor/merge_sort.go | 3 +- internal/datanode/compactor/mix_compactor.go | 16 +- internal/datanode/compactor/segment_writer.go | 114 +-- .../flushcommon/syncmgr/storage_serializer.go | 2 +- .../querynodev2/segments/segment_loader.go | 5 +- internal/storage/rw_test.go | 12 +- internal/storage/serde.go | 81 +- internal/storage/serde_events.go | 94 +- internal/storage/serde_events_test.go | 133 +-- internal/storage/serde_events_v2.go | 18 +- internal/storage/serde_events_v2_test.go | 14 +- internal/storage/serde_test.go | 5 +- internal/storage/sort.go | 4 +- internal/storage/stats.go | 2 +- internal/storagev2/packed/packed_reader.go | 4 + .../util/importutilv2/binlog/l0_reader.go | 5 +- .../clustering_compaction_null_data_test.go | 1 - 25 files changed, 603 insertions(+), 1047 deletions(-) diff --git a/internal/compaction/common.go b/internal/compaction/common.go index 3cea15cd95..a2310bfb04 100644 --- a/internal/compaction/common.go +++ b/internal/compaction/common.go @@ -57,7 +57,7 @@ func ComposeDeleteFromDeltalogs(ctx context.Context, io io.BinlogIO, paths []str defer reader.Close() for { - err := reader.Next() + dl, err := reader.NextValue() if err != nil { if err == sio.EOF { break @@ -66,11 +66,10 @@ func ComposeDeleteFromDeltalogs(ctx context.Context, io io.BinlogIO, paths []str 
return nil, err } - dl := reader.Value() - if ts, ok := pk2Ts[dl.Pk.GetValue()]; ok && ts > dl.Ts { + if ts, ok := pk2Ts[(*dl).Pk.GetValue()]; ok && ts > (*dl).Ts { continue } - pk2Ts[dl.Pk.GetValue()] = dl.Ts + pk2Ts[(*dl).Pk.GetValue()] = (*dl).Ts } log.Info("compose delete end", zap.Int("delete entries counts", len(pk2Ts))) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index e272cc3c65..237c71be4a 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -106,7 +106,6 @@ func (t *clusteringCompactionTask) Process() bool { if currentState != lastState { ts := time.Now().Unix() lastStateDuration := ts - t.GetTaskProto().GetLastStateStartTime() - log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration)) metrics.DataCoordCompactionLatency. WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState). 
Observe(float64(lastStateDuration * 1000)) @@ -208,6 +207,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP BeginLogID: beginLogID, PreAllocatedSegmentIDs: taskProto.GetPreAllocatedSegmentIDs(), SlotUsage: t.GetSlotUsage(), + MaxSize: taskProto.GetMaxSize(), } log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index ea656c5a1e..984d7ef3a4 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -504,6 +504,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C Begin: start, End: end, }, + MaxSize: expectedSegmentSize, } err = m.compactionHandler.enqueueCompaction(task) if err != nil { diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index f28199dd50..900ff5eae4 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -24,12 +24,12 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/zap" "google.golang.org/grpc" - "github.com/hashicorp/golang-lru/v2/expirable" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/session" diff --git a/internal/datanode/compactor/clustering_compactor.go b/internal/datanode/compactor/clustering_compactor.go index 32f3278684..c64d5d8a80 100644 --- a/internal/datanode/compactor/clustering_compactor.go +++ b/internal/datanode/compactor/clustering_compactor.go @@ -22,8 +22,6 @@ import ( sio "io" "math" "path" - "runtime" - "runtime/debug" "sort" "strconv" "strings" @@ -48,11 +46,9 @@ import ( "github.com/milvus-io/milvus/pkg/v2/metrics" 
"github.com/milvus-io/milvus/pkg/v2/proto/clusteringpb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" - "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" "github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/hardware" - "github.com/milvus-io/milvus/pkg/v2/util/lock" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metautil" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -81,14 +77,10 @@ type clusteringCompactionTask struct { plan *datapb.CompactionPlan // flush - flushMutex sync.Mutex flushCount *atomic.Int64 - flushChan chan FlushSignal - doneChan chan struct{} // metrics, don't use writtenRowNum *atomic.Int64 - hasSignal *atomic.Bool // inner field collectionID int64 @@ -98,9 +90,8 @@ type clusteringCompactionTask struct { clusteringKeyField *schemapb.FieldSchema primaryKeyField *schemapb.FieldSchema - memoryBufferSize int64 - clusterBuffers []*ClusterBuffer - clusterBufferLocks *lock.KeyLock[int] + memoryBufferSize int64 + clusterBuffers []*ClusterBuffer // scalar keyToBufferFunc func(interface{}) *ClusterBuffer // vector @@ -111,31 +102,50 @@ type clusteringCompactionTask struct { } type ClusterBuffer struct { - id int - - writer atomic.Value - flushLock lock.RWMutex - - bufferMemorySize atomic.Int64 - - flushedRowNum map[typeutil.UniqueID]atomic.Int64 - currentSegmentRowNum atomic.Int64 - // segID -> fieldID -> binlogs - flushedBinlogs map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog - // segID -> fieldID -> binlogs - flushedBM25stats map[typeutil.UniqueID]map[int64]*storage.BM25Stats - - uploadedSegments []*datapb.CompactionSegment - uploadedSegmentStats map[typeutil.UniqueID]storage.SegmentStats - + id int + writer *MultiSegmentWriter clusteringKeyFieldStats *storage.FieldStats + + lock sync.RWMutex } -type FlushSignal struct { - writer *SegmentWriter - pack bool - id int - done bool +func (b *ClusterBuffer) 
Write(v *storage.Value) error { + b.lock.Lock() + defer b.lock.Unlock() + return b.writer.WriteValue(v) +} + +func (b *ClusterBuffer) Flush() error { + b.lock.Lock() + defer b.lock.Unlock() + return b.writer.FlushChunk() +} + +func (b *ClusterBuffer) Close() error { + b.lock.Lock() + defer b.lock.Unlock() + return b.writer.Close() +} + +func (b *ClusterBuffer) GetCompactionSegments() []*datapb.CompactionSegment { + b.lock.RLock() + defer b.lock.RUnlock() + return b.writer.GetCompactionSegments() +} + +func (b *ClusterBuffer) GetBufferSize() uint64 { + b.lock.RLock() + defer b.lock.RUnlock() + return b.writer.GetBufferUncompressed() +} + +func newClusterBuffer(id int, writer *MultiSegmentWriter, clusteringKeyFieldStats *storage.FieldStats) *ClusterBuffer { + return &ClusterBuffer{ + id: id, + writer: writer, + clusteringKeyFieldStats: clusteringKeyFieldStats, + lock: sync.RWMutex{}, + } } func NewClusteringCompactionTask( @@ -145,19 +155,15 @@ func NewClusteringCompactionTask( ) *clusteringCompactionTask { ctx, cancel := context.WithCancel(ctx) return &clusteringCompactionTask{ - ctx: ctx, - cancel: cancel, - binlogIO: binlogIO, - plan: plan, - tr: timerecord.NewTimeRecorder("clustering_compaction"), - done: make(chan struct{}, 1), - flushChan: make(chan FlushSignal, 100), - doneChan: make(chan struct{}), - clusterBuffers: make([]*ClusterBuffer, 0), - clusterBufferLocks: lock.NewKeyLock[int](), - flushCount: atomic.NewInt64(0), - writtenRowNum: atomic.NewInt64(0), - hasSignal: atomic.NewBool(false), + ctx: ctx, + cancel: cancel, + binlogIO: binlogIO, + plan: plan, + tr: timerecord.NewTimeRecorder("clustering_compaction"), + done: make(chan struct{}, 1), + clusterBuffers: make([]*ClusterBuffer, 0), + flushCount: atomic.NewInt64(0), + writtenRowNum: atomic.NewInt64(0), } } @@ -318,18 +324,11 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e for _, key := range bucket { 
fieldStats.UpdateMinMax(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, key)) } - buffer := &ClusterBuffer{ - id: id, - flushedRowNum: map[typeutil.UniqueID]atomic.Int64{}, - flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0), - flushedBM25stats: make(map[int64]map[int64]*storage.BM25Stats, 0), - uploadedSegments: make([]*datapb.CompactionSegment, 0), - uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), - clusteringKeyFieldStats: fieldStats, - } - if _, err = t.refreshBufferWriterWithPack(buffer); err != nil { - return err - } + + alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) + writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100) + + buffer := newClusterBuffer(id, writer, fieldStats) t.clusterBuffers = append(t.clusterBuffers, buffer) for _, key := range bucket { scalarToClusterBufferMap[key] = buffer @@ -341,17 +340,11 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e if err != nil { return err } - nullBuffer = &ClusterBuffer{ - id: len(buckets), - flushedRowNum: map[typeutil.UniqueID]atomic.Int64{}, - flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0), - uploadedSegments: make([]*datapb.CompactionSegment, 0), - uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), - clusteringKeyFieldStats: fieldStats, // null stats - } - if _, err = t.refreshBufferWriterWithPack(nullBuffer); err != nil { - return err - } + + alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) + writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100) + + nullBuffer = newClusterBuffer(len(buckets), writer, fieldStats) t.clusterBuffers = append(t.clusterBuffers, nullBuffer) } 
t.keyToBufferFunc = func(key interface{}) *ClusterBuffer { @@ -382,7 +375,7 @@ func splitCentroids(centroids []int, num int) ([][]int, map[int]int) { return result, resultIndex } -func (t *clusteringCompactionTask) generatedVectorPlan(bufferNum int, centroids []*schemapb.VectorField) error { +func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, bufferNum int, centroids []*schemapb.VectorField) error { centroidsOffset := make([]int, len(centroids)) for i := 0; i < len(centroids); i++ { centroidsOffset[i] = i @@ -400,18 +393,12 @@ func (t *clusteringCompactionTask) generatedVectorPlan(bufferNum int, centroids } fieldStats.SetVectorCentroids(centroidValues...) - clusterBuffer := &ClusterBuffer{ - id: id, - flushedRowNum: map[typeutil.UniqueID]atomic.Int64{}, - flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0), - uploadedSegments: make([]*datapb.CompactionSegment, 0), - uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), - clusteringKeyFieldStats: fieldStats, - } - if _, err = t.refreshBufferWriterWithPack(clusterBuffer); err != nil { - return err - } - t.clusterBuffers = append(t.clusterBuffers, clusterBuffer) + + alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) + writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100) + + buffer := newClusterBuffer(id, writer, fieldStats) + t.clusterBuffers = append(t.clusterBuffers, buffer) } t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer { centroidGroupOffset := groupIndex[int(idMapping[offset])] @@ -420,13 +407,13 @@ func (t *clusteringCompactionTask) generatedVectorPlan(bufferNum int, centroids return nil } -func (t *clusteringCompactionTask) switchPolicyForVectorPlan(centroids *clusteringpb.ClusteringCentroidsStats) error { +func (t *clusteringCompactionTask) switchPolicyForVectorPlan(ctx 
context.Context, centroids *clusteringpb.ClusteringCentroidsStats) error { bufferNum := len(centroids.GetCentroids()) bufferNumByMemory := int(t.memoryBufferSize / expectedBinlogSize) if bufferNumByMemory < bufferNum { bufferNum = bufferNumByMemory } - return t.generatedVectorPlan(bufferNum, centroids.GetCentroids()) + return t.generatedVectorPlan(ctx, bufferNum, centroids.GetCentroids()) } func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error { @@ -455,7 +442,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e zap.Int("centroidNum", len(centroids.GetCentroids())), zap.Any("offsetMappingFiles", t.segmentIDOffsetMapping)) - return t.switchPolicyForVectorPlan(centroids) + return t.switchPolicyForVectorPlan(ctx, centroids) } // mapping read and split input segments into buffers @@ -467,9 +454,6 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, mapStart := time.Now() log := log.Ctx(ctx) - // start flush goroutine - go t.backgroundFlush(ctx) - futures := make([]*conc.Future[any], 0, len(inputSegments)) for _, segment := range inputSegments { segmentClone := &datapb.CompactionSegmentBinlogs{ @@ -488,45 +472,28 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, return nil, nil, err } - t.flushChan <- FlushSignal{ - done: true, - } - - // block util all writer flushed. 
- <-t.doneChan - // force flush all buffers - err := t.flushAll(ctx) + err := t.flushAll() if err != nil { return nil, nil, err } - if err := t.checkBuffersAfterCompaction(); err != nil { - return nil, nil, err - } - resultSegments := make([]*datapb.CompactionSegment, 0) resultPartitionStats := &storage.PartitionStatsSnapshot{ SegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats), } for _, buffer := range t.clusterBuffers { - for _, seg := range buffer.uploadedSegments { - se := &datapb.CompactionSegment{ - PlanID: seg.GetPlanID(), - SegmentID: seg.GetSegmentID(), - NumOfRows: seg.GetNumOfRows(), - InsertLogs: seg.GetInsertLogs(), - Field2StatslogPaths: seg.GetField2StatslogPaths(), - Deltalogs: seg.GetDeltalogs(), - Channel: seg.GetChannel(), - Bm25Logs: seg.GetBm25Logs(), + segments := buffer.GetCompactionSegments() + log.Debug("compaction segments", zap.Any("segments", segments)) + resultSegments = append(resultSegments, segments...) + + for _, segment := range segments { + segmentStats := storage.SegmentStats{ + FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()}, + NumRows: int(segment.NumOfRows), } - log.Debug("put segment into final compaction result", zap.String("segment", se.String())) - resultSegments = append(resultSegments, se) - } - for segID, segmentStat := range buffer.uploadedSegmentStats { - log.Debug("put segment into final partition stats", zap.Int64("segmentID", segID), zap.Any("stats", segmentStat)) - resultPartitionStats.SegmentStats[segID] = segmentStat + resultPartitionStats.SegmentStats[segment.SegmentID] = segmentStats + log.Debug("compaction segment partitioning stats", zap.Int64("segmentID", segment.SegmentID), zap.Any("stats", segmentStats)) } } @@ -543,9 +510,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 { var totalBufferSize int64 = 0 for _, buffer := range t.clusterBuffers { - 
t.clusterBufferLocks.Lock(buffer.id) - totalBufferSize = totalBufferSize + int64(buffer.writer.Load().(*SegmentWriter).WrittenMemorySize()) + buffer.bufferMemorySize.Load() - t.clusterBufferLocks.Unlock(buffer.id) + totalBufferSize = totalBufferSize + int64(buffer.GetBufferSize()) } return totalBufferSize } @@ -563,7 +528,6 @@ func (t *clusteringCompactionTask) mappingSegment( zap.Int64("segmentID", segment.GetSegmentID())) log.Info("mapping segment start") processStart := time.Now() - fieldBinlogPaths := make([][]string, 0) var remained int64 = 0 deltaPaths := make([]string, 0) @@ -604,133 +568,70 @@ func (t *clusteringCompactionTask) mappingSegment( log.Warn("compact wrong, all segments' binlogs are empty") return merr.WrapErrIllegalCompactionPlan() } - for idx := 0; idx < binlogNum; idx++ { - var ps []string - for _, f := range segment.GetFieldBinlogs() { - ps = append(ps, f.GetBinlogs()[idx].GetLogPath()) - } - fieldBinlogPaths = append(fieldBinlogPaths, ps) + + rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { + return t.binlogIO.Download(ctx, paths) + })) + if err != nil { + log.Warn("new binlog record reader wrong", zap.Error(err)) + return err } - var offset int64 = -1 - for _, paths := range fieldBinlogPaths { - allValues, err := t.binlogIO.Download(ctx, paths) + reader := storage.NewDeserializeReader(rr, func(r storage.Record, v []*storage.Value) error { + return storage.ValueDeserializer(r, v, t.plan.Schema.Fields) + }) + defer reader.Close() + + offset := int64(-1) + for { + v, err := reader.NextValue() if err != nil { - log.Warn("compact wrong, fail to download insertLogs", zap.Error(err)) - return err + if err == sio.EOF { + reader.Close() + break + } else { + log.Warn("compact wrong, failed to iter through data", zap.Error(err)) + return err + } } - blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob { - return 
&storage.Blob{Key: paths[i], Value: v} - }) - pkIter, err := storage.NewBinlogDeserializeReader(t.plan.Schema, storage.MakeBlobsReader(blobs)) - if err != nil { - log.Warn("new insert binlogs Itr wrong", zap.Strings("paths", paths), zap.Error(err)) - return err + offset++ + + if entityFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp)) { + continue } - for { - err := pkIter.Next() - if err != nil { - if err == sio.EOF { - pkIter.Close() - break - } else { - log.Warn("compact wrong, failed to iter through data", zap.Error(err)) + row, ok := (*v).Value.(map[typeutil.UniqueID]interface{}) + if !ok { + log.Warn("convert interface to map wrong") + return errors.New("unexpected error") + } + + clusteringKey := row[t.clusteringKeyField.FieldID] + var clusterBuffer *ClusterBuffer + if t.isVectorClusteringKey { + clusterBuffer = t.offsetToBufferFunc(offset, mappingStats.GetCentroidIdMapping()) + } else { + clusterBuffer = t.keyToBufferFunc(clusteringKey) + } + if err := clusterBuffer.Write(*v); err != nil { + return err + } + t.writtenRowNum.Inc() + remained++ + + if (remained+1)%100 == 0 { + currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize() + if currentBufferTotalMemorySize > t.getMemoryBufferHighWatermark() { + // reach flushBinlog trigger threshold + log.Debug("largest buffer need to flush", + zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize)) + if err := t.flushLargestBuffers(ctx); err != nil { return err } } - v := pkIter.Value() - offset++ - - if entityFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) { - continue - } - - row, ok := v.Value.(map[typeutil.UniqueID]interface{}) - if !ok { - log.Warn("transfer interface to map wrong", zap.Strings("paths", paths)) - return errors.New("unexpected error") - } - - clusteringKey := row[t.clusteringKeyField.FieldID] - var clusterBuffer *ClusterBuffer - if t.isVectorClusteringKey { - clusterBuffer = t.offsetToBufferFunc(offset, mappingStats.GetCentroidIdMapping()) - } else { - 
clusterBuffer = t.keyToBufferFunc(clusteringKey) - } - err = t.writeToBuffer(ctx, clusterBuffer, v) - if err != nil { - return err - } - remained++ - - if (remained+1)%100 == 0 { - currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize() - if clusterBuffer.currentSegmentRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.Load().(*SegmentWriter).IsFull() { - // reach segment/binlog max size - flushWriterFunc := func() { - t.clusterBufferLocks.Lock(clusterBuffer.id) - currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load() - // double-check the condition is still met - writer := clusterBuffer.writer.Load().(*SegmentWriter) - if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || writer.IsFull() { - pack, _ := t.refreshBufferWriterWithPack(clusterBuffer) - log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id), - zap.Bool("pack", pack), - zap.Int64("current segment", writer.GetSegmentID()), - zap.Int64("current segment num rows", currentSegmentNumRows), - zap.Int64("writer num", writer.GetRowNum())) - - t.clusterBufferLocks.Unlock(clusterBuffer.id) - // release the lock before sending the signal, avoid long wait caused by a full channel. - t.flushChan <- FlushSignal{ - writer: writer, - pack: pack, - id: clusterBuffer.id, - } - return - } - // release the lock even if the conditions are no longer met. 
- t.clusterBufferLocks.Unlock(clusterBuffer.id) - } - flushWriterFunc() - } else if currentBufferTotalMemorySize > t.getMemoryBufferHighWatermark() && !t.hasSignal.Load() { - // reach flushBinlog trigger threshold - log.Debug("largest buffer need to flush", - zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize)) - t.flushChan <- FlushSignal{} - t.hasSignal.Store(true) - } - - // if the total buffer size is too large, block here, wait for memory release by flushBinlog - if t.getBufferTotalUsedMemorySize() > t.getMemoryBufferBlockFlushThreshold() { - log.Debug("memory is already above the block watermark, pause writing", - zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize)) - loop: - for { - select { - case <-ctx.Done(): - log.Warn("stop waiting for memory buffer release as context done") - return nil - case <-t.done: - log.Warn("stop waiting for memory buffer release as task chan done") - return nil - default: - // currentSize := t.getCurrentBufferWrittenMemorySize() - currentSize := t.getBufferTotalUsedMemorySize() - if currentSize < t.getMemoryBufferHighWatermark() { - log.Debug("memory is already below the high watermark, continue writing", - zap.Int64("currentSize", currentSize)) - break loop - } - time.Sleep(time.Millisecond * 200) - } - } - } - } } } + missing := entityFilter.GetMissingDeleteCount() log.Info("mapping segment end", @@ -747,24 +648,6 @@ func (t *clusteringCompactionTask) mappingSegment( return nil } -func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuffer *ClusterBuffer, value *storage.Value) error { - t.clusterBufferLocks.Lock(clusterBuffer.id) - defer t.clusterBufferLocks.Unlock(clusterBuffer.id) - // prepare - writer := clusterBuffer.writer.Load() - if writer == nil || writer.(*SegmentWriter) == nil { - log.Warn("unexpected behavior, please check", zap.Int("buffer id", clusterBuffer.id)) - return fmt.Errorf("unexpected behavior, please check buffer id: %d", clusterBuffer.id) 
- } - err := writer.(*SegmentWriter).Write(value) - if err != nil { - return err - } - t.writtenRowNum.Inc() - clusterBuffer.currentSegmentRowNum.Inc() - return nil -} - func (t *clusteringCompactionTask) getWorkerPoolSize() int { return int(math.Max(float64(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.GetAsInt()), 1.0)) } @@ -782,90 +665,38 @@ func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 { return int64(float64(t.memoryBufferSize) * 0.7) } -func (t *clusteringCompactionTask) getMemoryBufferBlockFlushThreshold() int64 { - return t.memoryBufferSize -} - -func (t *clusteringCompactionTask) backgroundFlush(ctx context.Context) { - for { - select { - case <-ctx.Done(): - log.Info("clustering compaction task context exit") - return - case <-t.done: - log.Info("clustering compaction task done") - return - case signal := <-t.flushChan: - var err error - if signal.done { - t.doneChan <- struct{}{} - } else if signal.writer == nil { - t.hasSignal.Store(false) - err = t.flushLargestBuffers(ctx) - } else { - future := t.flushPool.Submit(func() (any, error) { - err := t.flushBinlog(ctx, t.clusterBuffers[signal.id], signal.writer, signal.pack) - if err != nil { - return nil, err - } - return struct{}{}, nil - }) - err = conc.AwaitAll(future) - } - if err != nil { - log.Warn("fail to flushBinlog data", zap.Error(err)) - // todo handle error - } - } - } -} - func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error { - // only one flushLargestBuffers or flushAll should do at the same time - getLock := t.flushMutex.TryLock() - if !getLock { - return nil - } - defer t.flushMutex.Unlock() currentMemorySize := t.getBufferTotalUsedMemorySize() if currentMemorySize <= t.getMemoryBufferLowWatermark() { - log.Info("memory low water mark", zap.Int64("memoryBufferSize", t.getBufferTotalUsedMemorySize())) + log.Info("memory low water mark", zap.Int64("memoryBufferSize", currentMemorySize)) return nil } _, span := 
otel.Tracer(typeutil.DataNodeRole).Start(ctx, "flushLargestBuffers") defer span.End() bufferIDs := make([]int, 0) - bufferRowNums := make([]int64, 0) + bufferSizes := make([]int64, 0) for _, buffer := range t.clusterBuffers { bufferIDs = append(bufferIDs, buffer.id) - t.clusterBufferLocks.RLock(buffer.id) - bufferRowNums = append(bufferRowNums, buffer.writer.Load().(*SegmentWriter).GetRowNum()) - t.clusterBufferLocks.RUnlock(buffer.id) + bufferSizes = append(bufferSizes, int64(buffer.GetBufferSize())) } sort.Slice(bufferIDs, func(i, j int) bool { - return bufferRowNums[bufferIDs[i]] > bufferRowNums[bufferIDs[j]] + return bufferSizes[bufferIDs[i]] > bufferSizes[bufferIDs[j]] }) log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize)) futures := make([]*conc.Future[any], 0) for _, bufferId := range bufferIDs { - t.clusterBufferLocks.Lock(bufferId) buffer := t.clusterBuffers[bufferId] - writer := buffer.writer - currentMemorySize -= int64(writer.Load().(*SegmentWriter).WrittenMemorySize()) - if err := t.refreshBufferWriter(buffer); err != nil { - t.clusterBufferLocks.Unlock(bufferId) - return err - } - t.clusterBufferLocks.Unlock(bufferId) + size := buffer.GetBufferSize() + currentMemorySize -= int64(size) log.Info("currentMemorySize after flush buffer binlog", zap.Int64("currentMemorySize", currentMemorySize), zap.Int("bufferID", bufferId), - zap.Uint64("WrittenMemorySize()", writer.Load().(*SegmentWriter).WrittenMemorySize()), - zap.Int64("RowNum", writer.Load().(*SegmentWriter).GetRowNum())) + zap.Uint64("WrittenUncompressed", size)) + future := t.flushPool.Submit(func() (any, error) { - err := t.flushBinlog(ctx, buffer, writer.Load().(*SegmentWriter), false) + err := buffer.Flush() if err != nil { return nil, err } @@ -886,15 +717,12 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro return nil } -func (t *clusteringCompactionTask) flushAll(ctx context.Context) error 
{ - // only one flushLargestBuffers or flushAll should do at the same time - t.flushMutex.Lock() - defer t.flushMutex.Unlock() +func (t *clusteringCompactionTask) flushAll() error { futures := make([]*conc.Future[any], 0) for _, buffer := range t.clusterBuffers { - buffer := buffer + b := buffer // avoid closure mis-capture future := t.flushPool.Submit(func() (any, error) { - err := t.flushBinlog(ctx, buffer, buffer.writer.Load().(*SegmentWriter), true) + err := b.Close() if err != nil { return nil, err } @@ -905,180 +733,6 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error { if err := conc.AwaitAll(futures...); err != nil { return err } - - return nil -} - -func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, segmentID int64) error { - if binlogs, ok := buffer.flushedBinlogs[segmentID]; !ok || len(binlogs) == 0 { - return nil - } - - log := log.Ctx(ctx) - binlogNum := 0 - numRows := buffer.flushedRowNum[segmentID] - insertLogs := make([]*datapb.FieldBinlog, 0) - for _, fieldBinlog := range buffer.flushedBinlogs[segmentID] { - insertLogs = append(insertLogs, fieldBinlog) - binlogNum = len(fieldBinlog.GetBinlogs()) - } - - fieldBinlogPaths := make([][]string, 0) - for idx := 0; idx < binlogNum; idx++ { - var ps []string - for _, fieldID := range []int64{t.primaryKeyField.GetFieldID(), common.RowIDField, common.TimeStampField} { - ps = append(ps, buffer.flushedBinlogs[segmentID][fieldID].GetBinlogs()[idx].GetLogPath()) - } - fieldBinlogPaths = append(fieldBinlogPaths, ps) - } - - statsLogs, err := t.generatePkStats(ctx, segmentID, numRows.Load(), fieldBinlogPaths) - if err != nil { - return err - } - - // pack current flushBinlog data into a segment - seg := &datapb.CompactionSegment{ - PlanID: t.plan.GetPlanID(), - SegmentID: segmentID, - NumOfRows: numRows.Load(), - InsertLogs: insertLogs, - Field2StatslogPaths: []*datapb.FieldBinlog{statsLogs}, - Channel: t.plan.GetChannel(), - } - - if 
len(t.bm25FieldIds) > 0 { - bm25Logs, err := t.generateBM25Stats(ctx, segmentID, buffer.flushedBM25stats[segmentID]) - if err != nil { - return err - } - seg.Bm25Logs = bm25Logs - } - - buffer.uploadedSegments = append(buffer.uploadedSegments, seg) - segmentStats := storage.SegmentStats{ - FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()}, - NumRows: int(numRows.Load()), - } - buffer.uploadedSegmentStats[segmentID] = segmentStats - - for _, binlog := range seg.InsertLogs { - log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), - zap.Int64("segID", segmentID), zap.String("binlog", binlog.String())) - } - for _, statsLog := range seg.Field2StatslogPaths { - log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), - zap.Int64("segID", segmentID), zap.String("binlog", statsLog.String())) - } - - log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), - zap.Int64("segID", seg.GetSegmentID()), - zap.Int64("row num", seg.GetNumOfRows())) - - // clear segment binlogs cache - delete(buffer.flushedBinlogs, segmentID) - - if len(t.bm25FieldIds) > 0 { - delete(buffer.flushedBM25stats, segmentID) - } - return nil -} - -func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter, pack bool) error { - segmentID := writer.GetSegmentID() - _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", segmentID)) - defer span.End() - if writer == nil { - log.Warn("buffer writer is nil, please check", zap.Int("buffer id", buffer.id)) - return fmt.Errorf("buffer: %d writer is nil, please check", buffer.id) - } - defer func() { - // set old writer nil - writer = nil - }() - buffer.flushLock.Lock() - defer buffer.flushLock.Unlock() - writtenMemorySize := int64(writer.WrittenMemorySize()) - writtenRowNum := writer.GetRowNum() - log := log.With(zap.Int("bufferID", buffer.id), - zap.Int64("segmentID", segmentID), - 
zap.Bool("pack", pack), - zap.Int64("writerRowNum", writtenRowNum), - zap.Int64("writtenMemorySize", writtenMemorySize), - zap.Int64("bufferMemorySize", buffer.bufferMemorySize.Load()), - ) - - log.Info("start flush binlog") - if writtenRowNum <= 0 { - log.Debug("writerRowNum is zero, skip flush") - if pack { - return t.packBufferToSegment(ctx, buffer, segmentID) - } - return nil - } - - start := time.Now() - kvs, partialBinlogs, err := serializeWrite(ctx, t.logIDAlloc, writer) - if err != nil { - log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) - return err - } - - if err := t.binlogIO.Upload(ctx, kvs); err != nil { - log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) - return err - } - - if info, ok := buffer.flushedBinlogs[segmentID]; !ok || info == nil { - buffer.flushedBinlogs[segmentID] = make(map[typeutil.UniqueID]*datapb.FieldBinlog) - } - - // if has bm25 failed, cache bm25 stats - if len(t.bm25FieldIds) > 0 { - statsMap, ok := buffer.flushedBM25stats[segmentID] - if !ok || statsMap == nil { - buffer.flushedBM25stats[segmentID] = make(map[int64]*storage.BM25Stats) - statsMap = buffer.flushedBM25stats[segmentID] - } - - for fieldID, newstats := range writer.GetBm25Stats() { - if stats, ok := statsMap[fieldID]; ok { - stats.Merge(newstats) - } else { - statsMap[fieldID] = newstats - } - } - } - - for fID, path := range partialBinlogs { - tmpBinlog, ok := buffer.flushedBinlogs[segmentID][fID] - if !ok { - tmpBinlog = path - } else { - tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) 
- } - buffer.flushedBinlogs[segmentID][fID] = tmpBinlog - } - - curSegFlushedRowNum := buffer.flushedRowNum[segmentID] - curSegFlushedRowNum.Add(writtenRowNum) - buffer.flushedRowNum[segmentID] = curSegFlushedRowNum - - // clean buffer with writer - buffer.bufferMemorySize.Sub(writtenMemorySize) - - t.flushCount.Inc() - if pack { - if err := t.packBufferToSegment(ctx, buffer, segmentID); err != nil { - return err - } - } - - writer = nil - runtime.GC() - debug.FreeOSMemory() - log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()), - zap.Int64("cost", time.Since(start).Milliseconds())) return nil } @@ -1203,7 +857,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( } for { - err := pkIter.Next() + v, err := pkIter.NextValue() if err != nil { if err == sio.EOF { pkIter.Close() @@ -1213,22 +867,21 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( return nil, err } } - v := pkIter.Value() // Filtering expired entity - if expiredFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) { + if expiredFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp)) { continue } // Update timestampFrom, timestampTo - if v.Timestamp < timestampFrom || timestampFrom == -1 { - timestampFrom = v.Timestamp + if (*v).Timestamp < timestampFrom || timestampFrom == -1 { + timestampFrom = (*v).Timestamp } - if v.Timestamp > timestampTo || timestampFrom == -1 { - timestampTo = v.Timestamp + if (*v).Timestamp > timestampTo || timestampFrom == -1 { + timestampTo = (*v).Timestamp } // rowValue := vIter.GetData().(*iterators.InsertRow).GetValue() - row, ok := v.Value.(map[typeutil.UniqueID]interface{}) + row, ok := (*v).Value.(map[typeutil.UniqueID]interface{}) if !ok { log.Warn("transfer interface to map wrong", zap.Strings("path", paths)) return nil, errors.New("unexpected error") @@ -1312,147 +965,6 @@ func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{ return t.switchPolicyForScalarPlan(totalRows, notNullKeys, dict), 
len(keys) > len(notNullKeys) } -func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) { - var segmentID int64 - var err error - var pack bool - if buffer.writer.Load() != nil && buffer.writer.Load().(*SegmentWriter) != nil { - segmentID = buffer.writer.Load().(*SegmentWriter).GetSegmentID() - buffer.bufferMemorySize.Add(int64(buffer.writer.Load().(*SegmentWriter).WrittenMemorySize())) - } - if buffer.writer.Load() == nil || buffer.currentSegmentRowNum.Load() > t.plan.GetMaxSegmentRows() { - pack = true - segmentID, err = t.segIDAlloc.AllocOne() - if err != nil { - return pack, err - } - buffer.currentSegmentRowNum.Store(0) - } - - writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, compactionBatchSize, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds) - if err != nil { - return pack, err - } - - buffer.writer.Store(writer) - return pack, nil -} - -func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) error { - var segmentID int64 - var err error - segmentID = buffer.writer.Load().(*SegmentWriter).GetSegmentID() - buffer.bufferMemorySize.Add(int64(buffer.writer.Load().(*SegmentWriter).WrittenMemorySize())) - - writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, compactionBatchSize, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds) - if err != nil { - return err - } - - buffer.writer.Store(writer) - return nil -} - func (t *clusteringCompactionTask) GetSlotUsage() int64 { return t.plan.GetSlotUsage() } - -func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error { - log := log.Ctx(t.ctx) - for _, buffer := range t.clusterBuffers { - if len(buffer.flushedBinlogs) != 0 { - log.Warn("there are some binlogs have leaked, please check", zap.Int("buffer id", buffer.id), - zap.Int64s("leak segments", lo.Keys(buffer.flushedBinlogs))) - log.Debug("leak binlogs", zap.Any("buffer flushedBinlogs", buffer.flushedBinlogs)) - return 
fmt.Errorf("there are some binlogs have leaked") - } - } - return nil -} - -func (t *clusteringCompactionTask) generateBM25Stats(ctx context.Context, segmentID int64, statsMap map[int64]*storage.BM25Stats) ([]*datapb.FieldBinlog, error) { - binlogs := []*datapb.FieldBinlog{} - kvs := map[string][]byte{} - logID, _, err := t.logIDAlloc.Alloc(uint32(len(statsMap))) - if err != nil { - return nil, err - } - - for fieldID, stats := range statsMap { - key, _ := binlog.BuildLogPath(storage.BM25Binlog, t.collectionID, t.partitionID, segmentID, fieldID, logID) - bytes, err := stats.Serialize() - if err != nil { - log.Warn("failed to seralize bm25 stats", zap.Int64("collection", t.collectionID), - zap.Int64("partition", t.partitionID), zap.Int64("segment", segmentID), zap.Error(err)) - return nil, err - } - - kvs[key] = bytes - - binlogs = append(binlogs, &datapb.FieldBinlog{ - FieldID: fieldID, - Binlogs: []*datapb.Binlog{{ - LogSize: int64(len(bytes)), - MemorySize: int64(len(bytes)), - LogPath: key, - EntriesNum: stats.NumRow(), - }}, - }) - logID++ - } - - if err := t.binlogIO.Upload(ctx, kvs); err != nil { - log.Warn("failed to upload bm25 stats log", - zap.Int64("collection", t.collectionID), - zap.Int64("partition", t.partitionID), - zap.Int64("segment", segmentID), - zap.Error(err)) - return nil, err - } - return binlogs, nil -} - -func (t *clusteringCompactionTask) generatePkStats(ctx context.Context, segmentID int64, - numRows int64, binlogPaths [][]string, -) (*datapb.FieldBinlog, error) { - stats, err := storage.NewPrimaryKeyStats(t.primaryKeyField.GetFieldID(), int64(t.primaryKeyField.GetDataType()), numRows) - if err != nil { - return nil, err - } - - for _, path := range binlogPaths { - bytesArr, err := t.binlogIO.Download(ctx, path) - if err != nil { - log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err)) - return nil, err - } - blobs := make([]*storage.Blob, len(bytesArr)) - for i := range bytesArr { - blobs[i] = 
&storage.Blob{Value: bytesArr[i]} - } - - pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType()) - if err != nil { - log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err)) - return nil, err - } - - for pkIter.HasNext() { - vIter, _ := pkIter.Next() - v, ok := vIter.(*storage.Value) - if !ok { - log.Warn("transfer interface to Value wrong", zap.Strings("path", path)) - return nil, errors.New("unexpected error") - } - stats.Update(v.PK) - } - } - - codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: t.collectionID, Schema: t.plan.GetSchema()}) - sblob, err := codec.SerializePkStats(stats, numRows) - if err != nil { - return nil, err - } - - return uploadStatsBlobs(ctx, t.collectionID, t.partitionID, segmentID, t.primaryKeyField.GetFieldID(), numRows, t.binlogIO, t.logIDAlloc, sblob) -} diff --git a/internal/datanode/compactor/clustering_compactor_test.go b/internal/datanode/compactor/clustering_compactor_test.go index 6a27821b43..e4ded6ba86 100644 --- a/internal/datanode/compactor/clustering_compactor_test.go +++ b/internal/datanode/compactor/clustering_compactor_test.go @@ -18,7 +18,6 @@ package compactor import ( "context" - "fmt" "sync" "testing" "time" @@ -212,9 +211,10 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { s.task.plan.ClusteringKeyField = 100 s.task.plan.PreferSegmentRows = 2048 s.task.plan.MaxSegmentRows = 2048 + s.task.plan.MaxSize = 1024 * 1024 * 1024 // max segment size = 1GB, we won't touch this value s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{ - Begin: time.Now().UnixMilli(), - End: time.Now().UnixMilli() + 1000, + Begin: 1, + End: 101, } // 8+8+8+4+7+4*4=51 @@ -295,9 +295,10 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit( s.task.plan.ClusteringKeyField = 100 s.task.plan.PreferSegmentRows = 3000 s.task.plan.MaxSegmentRows = 3000 + s.task.plan.MaxSize = 1024 * 1024 * 
1024 // max segment size = 1GB, we won't touch this value s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{ - Begin: time.Now().UnixMilli(), - End: time.Now().UnixMilli() + 1000, + Begin: 1, + End: 1000, } // 8+8+8+4+7+4*4=51 @@ -311,7 +312,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit( compactionResult, err := s.task.Compact() s.Require().NoError(err) s.Equal(2, len(s.task.clusterBuffers)) - s.Equal(4, len(compactionResult.GetSegments())) + s.Equal(2, len(compactionResult.GetSegments())) totalBinlogNum := 0 totalRowNum := int64(0) for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() { @@ -330,7 +331,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit( statsRowNum += b.GetEntriesNum() } } - s.Equal(3, totalBinlogNum/len(schema.GetFields())) + s.Equal(5, totalBinlogNum/len(schema.GetFields())) s.Equal(1, statsBinlogNum) s.Equal(totalRowNum, statsRowNum) } @@ -368,9 +369,10 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() { s.task.plan.ClusteringKeyField = 100 s.task.plan.PreferSegmentRows = 2048 s.task.plan.MaxSegmentRows = 2048 + s.task.plan.MaxSize = 1024 * 1024 * 1024 // 1GB s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{ - Begin: time.Now().UnixMilli(), - End: time.Now().UnixMilli() + 1000, + Begin: 1, + End: 1000, } // 8 + 8 + 8 + 7 + 8 = 39 @@ -419,173 +421,6 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() { s.Equal(totalRowNum, bm25RowNum) } -func (s *ClusteringCompactionTaskSuite) TestCheckBuffersAfterCompaction() { - s.Run("no leak", func() { - task := &clusteringCompactionTask{clusterBuffers: []*ClusterBuffer{{}}} - - s.NoError(task.checkBuffersAfterCompaction()) - }) - - s.Run("leak binlog", func() { - task := &clusteringCompactionTask{ - clusterBuffers: []*ClusterBuffer{ - { - flushedBinlogs: map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog{ - 1: { - 101: { - FieldID: 101, - Binlogs: 
[]*datapb.Binlog{{LogID: 1000}}, - }, - }, - }, - }, - }, - } - s.Error(task.checkBuffersAfterCompaction()) - }) -} - -func (s *ClusteringCompactionTaskSuite) TestGenerateBM25Stats() { - s.Run("normal case", func() { - segmentID := int64(1) - task := &clusteringCompactionTask{ - collectionID: 111, - partitionID: 222, - bm25FieldIds: []int64{102}, - logIDAlloc: s.mockAlloc, - binlogIO: s.mockBinlogIO, - } - - statsMap := make(map[int64]*storage.BM25Stats) - statsMap[102] = storage.NewBM25Stats() - statsMap[102].Append(map[uint32]float32{1: 1}) - - binlogs, err := task.generateBM25Stats(context.Background(), segmentID, statsMap) - s.NoError(err) - s.Equal(1, len(binlogs)) - s.Equal(1, len(binlogs[0].Binlogs)) - s.Equal(int64(102), binlogs[0].FieldID) - s.Equal(int64(1), binlogs[0].Binlogs[0].GetEntriesNum()) - }) - - s.Run("alloc ID failed", func() { - segmentID := int64(1) - mockAlloc := allocator.NewMockAllocator(s.T()) - mockAlloc.EXPECT().Alloc(mock.Anything).Return(0, 0, fmt.Errorf("mock error")).Once() - - task := &clusteringCompactionTask{ - collectionID: 111, - partitionID: 222, - bm25FieldIds: []int64{102}, - logIDAlloc: mockAlloc, - } - - statsMap := make(map[int64]*storage.BM25Stats) - statsMap[102] = storage.NewBM25Stats() - statsMap[102].Append(map[uint32]float32{1: 1}) - - _, err := task.generateBM25Stats(context.Background(), segmentID, statsMap) - s.Error(err) - }) - - s.Run("upload failed", func() { - segmentID := int64(1) - mockBinlogIO := mock_util.NewMockBinlogIO(s.T()) - mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")).Once() - - task := &clusteringCompactionTask{ - collectionID: 111, - partitionID: 222, - bm25FieldIds: []int64{102}, - logIDAlloc: s.mockAlloc, - binlogIO: mockBinlogIO, - } - - statsMap := make(map[int64]*storage.BM25Stats) - statsMap[102] = storage.NewBM25Stats() - statsMap[102].Append(map[uint32]float32{1: 1}) - - _, err := task.generateBM25Stats(context.Background(), segmentID, 
statsMap) - s.Error(err) - }) -} - -func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() { - pkField := &schemapb.FieldSchema{ - FieldID: 100, - Name: "pk", - IsPrimaryKey: true, - Description: "", - DataType: schemapb.DataType_Int64, - } - s.Run("num rows zero", func() { - task := &clusteringCompactionTask{ - primaryKeyField: pkField, - } - binlogs, err := task.generatePkStats(context.Background(), 1, 0, nil) - s.Error(err) - s.Nil(binlogs) - }) - - s.Run("download binlogs failed", func() { - s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")) - task := &clusteringCompactionTask{ - binlogIO: s.mockBinlogIO, - primaryKeyField: pkField, - } - binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}}) - s.Error(err) - s.Nil(binlogs) - }) - - s.Run("NewInsertBinlogIterator failed", func() { - s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{[]byte("mock")}, nil) - task := &clusteringCompactionTask{ - binlogIO: s.mockBinlogIO, - primaryKeyField: pkField, - } - binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}}) - s.Error(err) - s.Nil(binlogs) - }) - - s.Run("upload failed", func() { - schema := genCollectionSchema() - segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, SegmentID, PartitionID, CollectionID, []int64{}) - s.Require().NoError(err) - for i := 0; i < 2000; i++ { - v := storage.Value{ - PK: storage.NewInt64PrimaryKey(int64(i)), - Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)), - Value: genRow(int64(i)), - } - err = segWriter.Write(&v) - s.Require().NoError(err) - } - segWriter.FlushAndIsFull() - - kvs, _, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter) - s.NoError(err) - mockBinlogIO := mock_util.NewMockBinlogIO(s.T()) - mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil) - 
mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")) - task := &clusteringCompactionTask{ - collectionID: CollectionID, - partitionID: PartitionID, - plan: &datapb.CompactionPlan{ - Schema: genCollectionSchema(), - }, - binlogIO: mockBinlogIO, - primaryKeyField: pkField, - logIDAlloc: s.mockAlloc, - } - - binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}}) - s.Error(err) - s.Nil(binlogs) - }) -} - func genRow(magic int64) map[int64]interface{} { ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0) return map[int64]interface{}{ diff --git a/internal/datanode/compactor/compactor_common.go b/internal/datanode/compactor/compactor_common.go index 93d4cae3b7..515e0717a5 100644 --- a/internal/datanode/compactor/compactor_common.go +++ b/internal/datanode/compactor/compactor_common.go @@ -18,7 +18,9 @@ package compactor import ( "context" + sio "io" "strconv" + "time" "go.opentelemetry.io/otel" "go.uber.org/zap" @@ -29,11 +31,142 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) const compactionBatchSize = 100 +type EntityFilter struct { + deletedPkTs map[interface{}]typeutil.Timestamp // pk2ts + ttl int64 // nanoseconds + currentTime time.Time + + expiredCount int + deletedCount int +} + +func newEntityFilter(deletedPkTs map[interface{}]typeutil.Timestamp, ttl int64, currTime time.Time) *EntityFilter { + if deletedPkTs == nil { + deletedPkTs = make(map[interface{}]typeutil.Timestamp) + } + return &EntityFilter{ + deletedPkTs: deletedPkTs, + ttl: ttl, + currentTime: currTime, + } +} + +func (filter *EntityFilter) Filtered(pk any, ts typeutil.Timestamp) bool { + if filter.isEntityDeleted(pk, ts) { + filter.deletedCount++ + return true + } + + // Filtering expired entity + if 
filter.isEntityExpired(ts) { + filter.expiredCount++ + return true + } + return false +} + +func (filter *EntityFilter) GetExpiredCount() int { + return filter.expiredCount +} + +func (filter *EntityFilter) GetDeletedCount() int { + return filter.deletedCount +} + +func (filter *EntityFilter) GetDeltalogDeleteCount() int { + return len(filter.deletedPkTs) +} + +func (filter *EntityFilter) GetMissingDeleteCount() int { + diff := filter.GetDeltalogDeleteCount() - filter.GetDeletedCount() + if diff <= 0 { + diff = 0 + } + return diff +} + +func (filter *EntityFilter) isEntityDeleted(pk interface{}, pkTs typeutil.Timestamp) bool { + if deleteTs, ok := filter.deletedPkTs[pk]; ok { + // insert task and delete task has the same ts when upsert + // here should be < instead of <= + // to avoid the upsert data to be deleted after compact + if pkTs < deleteTs { + return true + } + } + return false +} + +func (filter *EntityFilter) isEntityExpired(entityTs typeutil.Timestamp) bool { + // entity expire is not enabled if duration <= 0 + if filter.ttl <= 0 { + return false + } + entityTime, _ := tsoutil.ParseTS(entityTs) + + // this dur can represents 292 million years before or after 1970, enough for milvus + // ttl calculation + dur := filter.currentTime.UnixMilli() - entityTime.UnixMilli() + + // filter.ttl is nanoseconds + return filter.ttl/int64(time.Millisecond) <= dur +} + +func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) { + pk2Ts := make(map[interface{}]typeutil.Timestamp) + + log := log.Ctx(ctx) + if len(paths) == 0 { + log.Debug("compact with no deltalogs, skip merge deltalogs") + return pk2Ts, nil + } + + blobs := make([]*storage.Blob, 0) + binaries, err := io.Download(ctx, paths) + if err != nil { + log.Warn("compact wrong, fail to download deltalogs", + zap.Strings("path", paths), + zap.Error(err)) + return nil, err + } + + for i := range binaries { + blobs = append(blobs, &storage.Blob{Value: 
binaries[i]}) + } + reader, err := storage.CreateDeltalogReader(blobs) + if err != nil { + log.Error("malformed delta file", zap.Error(err)) + return nil, err + } + defer reader.Close() + + for { + dl, err := reader.NextValue() + if err != nil { + if err == sio.EOF { + break + } + log.Error("compact wrong, fail to read deltalogs", zap.Error(err)) + return nil, err + } + + if ts, ok := pk2Ts[(*dl).Pk.GetValue()]; ok && ts > (*dl).Ts { + continue + } + pk2Ts[(*dl).Pk.GetValue()] = (*dl).Ts + } + + log.Info("compact mergeDeltalogs end", zap.Int("delete entries counts", len(pk2Ts))) + + return pk2Ts, nil +} + func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) { _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite") defer span.End() diff --git a/internal/datanode/compactor/l0_compactor.go b/internal/datanode/compactor/l0_compactor.go index 05942c2b0a..8d2144d99a 100644 --- a/internal/datanode/compactor/l0_compactor.go +++ b/internal/datanode/compactor/l0_compactor.go @@ -403,7 +403,7 @@ func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []str dData := &storage.DeleteData{} for { - err := reader.Next() + dl, err := reader.NextValue() if err != nil { if err == sio.EOF { break @@ -412,8 +412,7 @@ func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []str return nil, err } - dl := reader.Value() - dData.Append(dl.Pk, dl.Ts) + dData.Append((*dl).Pk, (*dl).Ts) } return dData, nil diff --git a/internal/datanode/compactor/merge_sort.go b/internal/datanode/compactor/merge_sort.go index 2591a9450b..b25756ed57 100644 --- a/internal/datanode/compactor/merge_sort.go +++ b/internal/datanode/compactor/merge_sort.go @@ -31,7 +31,6 @@ func mergeSortMultipleSegments(ctx context.Context, tr *timerecord.TimeRecorder, currentTime time.Time, collectionTtl int64, - bm25FieldIds []int64, ) 
([]*datapb.CompactionSegment, error) { _ = tr.RecordSpan() @@ -43,7 +42,7 @@ func mergeSortMultipleSegments(ctx context.Context, segIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedSegmentIDs().GetBegin(), plan.GetPreAllocatedSegmentIDs().GetEnd()) logIDAlloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64) compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc) - writer := NewMultiSegmentWriter(binlogIO, compAlloc, plan, maxRows, partitionID, collectionID, bm25FieldIds) + writer := NewMultiSegmentWriter(ctx, binlogIO, compAlloc, plan.GetMaxSize(), plan.GetSchema(), maxRows, partitionID, collectionID, plan.GetChannel(), 4096) pkField, err := typeutil.GetPrimaryFieldSchema(plan.GetSchema()) if err != nil { diff --git a/internal/datanode/compactor/mix_compactor.go b/internal/datanode/compactor/mix_compactor.go index e96c27070f..5a500ea5b6 100644 --- a/internal/datanode/compactor/mix_compactor.go +++ b/internal/datanode/compactor/mix_compactor.go @@ -143,7 +143,7 @@ func (t *mixCompactionTask) mergeSplit( segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd()) logIDAlloc := allocator.NewLocalAllocator(t.plan.GetBeginLogID(), math.MaxInt64) compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc) - mWriter := NewMultiSegmentWriter(t.binlogIO, compAlloc, t.plan, t.maxRows, t.partitionID, t.collectionID, t.bm25FieldIDs) + mWriter := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096) deletedRowCount := int64(0) expiredRowCount := int64(0) @@ -167,6 +167,18 @@ func (t *mixCompactionTask) mergeSplit( return nil, err } res := mWriter.GetCompactionSegments() + if len(res) == 0 { + // append an empty segment + id, err := segIDAlloc.AllocOne() + if err != nil { + return nil, err + } + res = append(res, &datapb.CompactionSegment{ + SegmentID: id, + NumOfRows: 
0, + Channel: t.GetChannelName(), + }) + } totalElapse := t.tr.RecordSpan() log.Info("compact mergeSplit end", @@ -335,7 +347,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { if sortMergeAppicable { log.Info("compact by merge sort") res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO, - t.plan.GetSegmentBinlogs(), t.tr, t.currentTime, t.plan.GetCollectionTtl(), t.bm25FieldIDs) + t.plan.GetSegmentBinlogs(), t.tr, t.currentTime, t.plan.GetCollectionTtl()) if err != nil { log.Warn("compact wrong, fail to merge sort segments", zap.Error(err)) return nil, err diff --git a/internal/datanode/compactor/segment_writer.go b/internal/datanode/compactor/segment_writer.go index abfce54275..acf9b2ccb1 100644 --- a/internal/datanode/compactor/segment_writer.go +++ b/internal/datanode/compactor/segment_writer.go @@ -45,10 +45,11 @@ import ( // Not concurrent safe. type MultiSegmentWriter struct { + ctx context.Context binlogIO io.BinlogIO allocator *compactionAlloactor - writer storage.BinlogRecordWriter + writer *storage.BinlogValueWriter currentSegmentID typeutil.UniqueID maxRows int64 @@ -61,14 +62,12 @@ type MultiSegmentWriter struct { partitionID int64 collectionID int64 channel string + batchSize int res []*datapb.CompactionSegment // DONOT leave it empty of all segments are deleted, just return a segment with zero meta for datacoord - bm25Fields []int64 } -var _ storage.RecordWriter = &MultiSegmentWriter{} - type compactionAlloactor struct { segmentAlloc allocator.Interface logIDAlloc allocator.Interface @@ -85,27 +84,22 @@ func (alloc *compactionAlloactor) allocSegmentID() (typeutil.UniqueID, error) { return alloc.segmentAlloc.AllocOne() } -func (alloc *compactionAlloactor) getLogIDAllocator() allocator.Interface { - return alloc.logIDAlloc -} - -func NewMultiSegmentWriter(binlogIO io.BinlogIO, allocator *compactionAlloactor, plan *datapb.CompactionPlan, - maxRows int64, 
partitionID, collectionID int64, bm25Fields []int64, +func NewMultiSegmentWriter(ctx context.Context, binlogIO io.BinlogIO, allocator *compactionAlloactor, segmentSize int64, + schema *schemapb.CollectionSchema, + maxRows int64, partitionID, collectionID int64, channel string, batchSize int, ) *MultiSegmentWriter { return &MultiSegmentWriter{ - binlogIO: binlogIO, - allocator: allocator, - - maxRows: maxRows, // For bloomfilter only - segmentSize: plan.GetMaxSize(), - - schema: plan.GetSchema(), + ctx: ctx, + binlogIO: binlogIO, + allocator: allocator, + maxRows: maxRows, // For bloomfilter only + segmentSize: segmentSize, + schema: schema, partitionID: partitionID, collectionID: collectionID, - channel: plan.GetChannel(), - - res: make([]*datapb.CompactionSegment, 0), - bm25Fields: bm25Fields, + channel: channel, + batchSize: batchSize, + res: make([]*datapb.CompactionSegment, 0), } } @@ -128,10 +122,12 @@ func (w *MultiSegmentWriter) closeWriter() error { w.res = append(w.res, result) - log.Info("Segment writer flushed a segment", + log.Info("created new segment", zap.Int64("segmentID", w.currentSegmentID), zap.String("channel", w.channel), - zap.Int64("totalRows", w.writer.GetRowNum())) + zap.Int64("totalRows", w.writer.GetRowNum()), + zap.Uint64("totalSize", w.writer.GetWrittenUncompressed()), + zap.Int64("expected segment size", w.segmentSize)) } return nil } @@ -147,11 +143,10 @@ func (w *MultiSegmentWriter) rotateWriter() error { } w.currentSegmentID = newSegmentID - ctx := context.TODO() chunkSize := paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() rootPath := binlog.GetRootPath() - writer, err := storage.NewBinlogRecordWriter(ctx, w.collectionID, w.partitionID, newSegmentID, + rw, err := storage.NewBinlogRecordWriter(w.ctx, w.collectionID, w.partitionID, newSegmentID, w.schema, w.allocator.logIDAlloc, chunkSize, rootPath, w.maxRows, storage.WithUploader(func(ctx context.Context, kvs map[string][]byte) error { return w.binlogIO.Upload(ctx, kvs) @@ 
-159,7 +154,8 @@ func (w *MultiSegmentWriter) rotateWriter() error { if err != nil { return err } - w.writer = writer + + w.writer = storage.NewBinlogValueWriter(rw, w.batchSize) return nil } @@ -170,6 +166,13 @@ func (w *MultiSegmentWriter) GetWrittenUncompressed() uint64 { return w.writer.GetWrittenUncompressed() } +func (w *MultiSegmentWriter) GetBufferUncompressed() uint64 { + if w.writer == nil { + return 0 + } + return w.writer.GetBufferUncompressed() +} + func (w *MultiSegmentWriter) GetCompactionSegments() []*datapb.CompactionSegment { return w.res } @@ -184,23 +187,31 @@ func (w *MultiSegmentWriter) Write(r storage.Record) error { return w.writer.Write(r) } -// DONOT return an empty list if every insert of the segment is deleted, -// append an empty segment instead -func (w *MultiSegmentWriter) Close() error { - if w.writer == nil && len(w.res) == 0 { - // append an empty segment - id, err := w.allocator.segmentAlloc.AllocOne() - if err != nil { +func (w *MultiSegmentWriter) WriteValue(v *storage.Value) error { + if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) { + if err := w.rotateWriter(); err != nil { return err } - w.res = append(w.res, &datapb.CompactionSegment{ - SegmentID: id, - NumOfRows: 0, - Channel: w.channel, - }) + } + + return w.writer.WriteValue(v) +} + +func (w *MultiSegmentWriter) FlushChunk() error { + if w.writer == nil { return nil } - return w.closeWriter() + if err := w.writer.Flush(); err != nil { + return err + } + return w.writer.FlushChunk() +} + +func (w *MultiSegmentWriter) Close() error { + if w.writer != nil { + return w.closeWriter() + } + return nil } func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter { @@ -276,7 +287,7 @@ func (w *SegmentDeltaWriter) Finish() (*storage.Blob, *writebuffer.TimeRange, er } type SegmentWriter struct { - writer *storage.SerializeWriter[*storage.Value] + writer *storage.BinlogSerializeWriter closers []func() (*storage.Blob, 
error) tsFrom typeutil.Timestamp tsTo typeutil.Timestamp @@ -316,7 +327,7 @@ func (w *SegmentWriter) GetPkID() int64 { } func (w *SegmentWriter) WrittenMemorySize() uint64 { - return w.writer.WrittenMemorySize() + return w.writer.GetWrittenUncompressed() } func (w *SegmentWriter) WriteRecord(r storage.Record) error { @@ -395,7 +406,7 @@ func (w *SegmentWriter) WriteRecord(r storage.Record) error { rec := storage.NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(rows)), field2Col) defer rec.Release() - return w.writer.WriteRecord(rec) + return w.writer.Write(rec) } func (w *SegmentWriter) Write(v *storage.Value) error { @@ -422,7 +433,7 @@ func (w *SegmentWriter) Write(v *storage.Value) error { } w.rowCount.Inc() - return w.writer.Write(v) + return w.writer.WriteValue(v) } func (w *SegmentWriter) Finish() (*storage.Blob, error) { @@ -454,25 +465,25 @@ func (w *SegmentWriter) GetBm25StatsBlob() (map[int64]*storage.Blob, error) { } func (w *SegmentWriter) IsFull() bool { - return w.writer.WrittenMemorySize() > w.maxBinlogSize + return w.writer.GetWrittenUncompressed() > w.maxBinlogSize } func (w *SegmentWriter) FlushAndIsFull() bool { w.writer.Flush() - return w.writer.WrittenMemorySize() > w.maxBinlogSize + return w.writer.GetWrittenUncompressed() > w.maxBinlogSize } func (w *SegmentWriter) IsFullWithBinlogMaxSize(binLogMaxSize uint64) bool { - return w.writer.WrittenMemorySize() > binLogMaxSize + return w.writer.GetWrittenUncompressed() > binLogMaxSize } func (w *SegmentWriter) IsEmpty() bool { - return w.writer.WrittenMemorySize() == 0 + return w.writer.GetWrittenUncompressed() == 0 } func (w *SegmentWriter) FlushAndIsEmpty() bool { w.writer.Flush() - return w.writer.WrittenMemorySize() == 0 + return w.writer.GetWrittenUncompressed() == 0 } func (w *SegmentWriter) GetTimeRange() *writebuffer.TimeRange { @@ -499,11 +510,11 @@ func (w *SegmentWriter) SerializeYield() ([]*storage.Blob, *writebuffer.TimeRang } func (w *SegmentWriter) 
GetTotalSize() int64 { - return w.syncedSize.Load() + int64(w.writer.WrittenMemorySize()) + return w.syncedSize.Load() + int64(w.writer.GetWrittenUncompressed()) } func (w *SegmentWriter) clear() { - w.syncedSize.Add(int64(w.writer.WrittenMemorySize())) + w.syncedSize.Add(int64(w.writer.GetWrittenUncompressed())) writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch, w.batchSize) w.writer = writer @@ -512,6 +523,7 @@ func (w *SegmentWriter) clear() { w.tsTo = 0 } +// deprecated: use NewMultiSegmentWriter instead func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize int, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) { writer, closers, err := newBinlogWriter(collID, partID, segID, sch, batchSize) if err != nil { @@ -555,7 +567,7 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize } func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema, batchSize int, -) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*storage.Blob, error), err error) { +) (writer *storage.BinlogSerializeWriter, closers []func() (*storage.Blob, error), err error) { fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema.Fields) closers = make([]func() (*storage.Blob, error), 0, len(fieldWriters)) for _, w := range fieldWriters { diff --git a/internal/flushcommon/syncmgr/storage_serializer.go b/internal/flushcommon/syncmgr/storage_serializer.go index abcc2cdd26..7fdc9ed76b 100644 --- a/internal/flushcommon/syncmgr/storage_serializer.go +++ b/internal/flushcommon/syncmgr/storage_serializer.go @@ -197,7 +197,7 @@ func (s *storageV1Serializer) serializeDeltalog(pack *SyncPack) (*storage.Blob, for i := 0; i < len(pack.deltaData.Pks); i++ { deleteLog := storage.NewDeleteLog(pack.deltaData.Pks[i], pack.deltaData.Tss[i]) - err = writer.Write(deleteLog) + err = writer.WriteValue(deleteLog) if err != nil { return nil, err } 
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index b91e1ba158..d9a7427666 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -1246,15 +1246,14 @@ func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment, } defer reader.Close() for { - err := reader.Next() + dl, err := reader.NextValue() if err != nil { if err == io.EOF { break } return err } - dl := reader.Value() - err = deltaData.Append(dl.Pk, dl.Ts) + err = deltaData.Append((*dl).Pk, (*dl).Ts) if err != nil { return err } diff --git a/internal/storage/rw_test.go b/internal/storage/rw_test.go index 585a6e73b0..5544032f00 100644 --- a/internal/storage/rw_test.go +++ b/internal/storage/rw_test.go @@ -138,11 +138,9 @@ func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() { defer reader.Close() for i := 1; i <= rows; i++ { - err = reader.Next() + value, err := reader.NextValue() s.NoError(err) - - value := reader.Value() - rec, err := ValueSerializer([]*Value{value}, s.schema.Fields) + rec, err := ValueSerializer([]*Value{*value}, s.schema.Fields) s.NoError(err) err = w.Write(rec) s.NoError(err) @@ -174,6 +172,7 @@ func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() { } r, err := NewBinlogRecordReader(s.ctx, binlogs, s.schema, rOption...) 
s.NoError(err) + defer r.Close() for i := 0; i < rows/readBatchSize+1; i++ { rec, err := r.Next() s.NoError(err) @@ -304,11 +303,10 @@ func (s *PackedBinlogRecordSuite) TestAllocIDExhausedError() { defer reader.Close() for i := 0; i < size; i++ { - err = reader.Next() + value, err := reader.NextValue() s.NoError(err) - value := reader.Value() - rec, err := ValueSerializer([]*Value{value}, s.schema.Fields) + rec, err := ValueSerializer([]*Value{*value}, s.schema.Fields) s.NoError(err) err = w.Write(rec) s.Error(err) diff --git a/internal/storage/serde.go b/internal/storage/serde.go index 923150713c..f58093a623 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -21,14 +21,12 @@ import ( "io" "math" "strconv" - "sync" "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/array" "github.com/apache/arrow/go/v17/parquet" "github.com/apache/arrow/go/v17/parquet/compress" "github.com/apache/arrow/go/v17/parquet/pqarrow" - "github.com/cockroachdb/errors" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -519,7 +517,12 @@ func getFieldWriterProps(field *schemapb.FieldSchema) *parquet.WriterProperties ) } -type DeserializeReader[T any] struct { +type DeserializeReader[T any] interface { + NextValue() (*T, error) + Close() error +} + +type DeserializeReaderImpl[T any] struct { rr RecordReader deserializer Deserializer[T] rec Record @@ -528,11 +531,11 @@ type DeserializeReader[T any] struct { } // Iterate to next value, return error or EOF if no more value. 
-func (deser *DeserializeReader[T]) Next() error { +func (deser *DeserializeReaderImpl[T]) NextValue() (*T, error) { if deser.rec == nil || deser.pos >= deser.rec.Len()-1 { r, err := deser.rr.Next() if err != nil { - return err + return nil, err } deser.pos = 0 deser.rec = r @@ -540,33 +543,21 @@ func (deser *DeserializeReader[T]) Next() error { deser.values = make([]T, deser.rec.Len()) if err := deser.deserializer(deser.rec, deser.values); err != nil { - return err + return nil, err } } else { deser.pos++ } - return nil + return &deser.values[deser.pos], nil } -func (deser *DeserializeReader[T]) Value() T { - return deser.values[deser.pos] +func (deser *DeserializeReaderImpl[T]) Close() error { + return deser.rr.Close() } -func (deser *DeserializeReader[T]) Close() error { - if deser.rec != nil { - deser.rec.Release() - } - if deser.rr != nil { - if err := deser.rr.Close(); err != nil { - return err - } - } - return nil -} - -func NewDeserializeReader[T any](rr RecordReader, deserializer Deserializer[T]) *DeserializeReader[T] { - return &DeserializeReader[T]{ +func NewDeserializeReader[T any](rr RecordReader, deserializer Deserializer[T]) *DeserializeReaderImpl[T] { + return &DeserializeReaderImpl[T]{ rr: rr, deserializer: deserializer, } @@ -828,19 +819,22 @@ func newMultiFieldRecordWriter(fieldIds []FieldID, fields []arrow.Field, writer }, nil } -type SerializeWriter[T any] struct { +type SerializeWriter[T any] interface { + WriteValue(value T) error + Flush() error + Close() error +} + +type SerializeWriterImpl[T any] struct { rw RecordWriter serializer Serializer[T] batchSize int - mu sync.Mutex buffer []T pos int } -func (sw *SerializeWriter[T]) Flush() error { - sw.mu.Lock() - defer sw.mu.Unlock() +func (sw *SerializeWriterImpl[T]) Flush() error { if sw.pos == 0 { return nil } @@ -857,7 +851,7 @@ func (sw *SerializeWriter[T]) Flush() error { return nil } -func (sw *SerializeWriter[T]) Write(value T) error { +func (sw *SerializeWriterImpl[T]) WriteValue(value 
T) error { if sw.buffer == nil { sw.buffer = make([]T, sw.batchSize) } @@ -871,36 +865,15 @@ func (sw *SerializeWriter[T]) Write(value T) error { return nil } -func (sw *SerializeWriter[T]) WriteRecord(r Record) error { - sw.mu.Lock() - defer sw.mu.Unlock() - if len(sw.buffer) != 0 { - return errors.New("serialize buffer is not empty") - } - - if err := sw.rw.Write(r); err != nil { - return err - } - - return nil -} - -func (sw *SerializeWriter[T]) WrittenMemorySize() uint64 { - sw.mu.Lock() - defer sw.mu.Unlock() - return sw.rw.GetWrittenUncompressed() -} - -func (sw *SerializeWriter[T]) Close() error { +func (sw *SerializeWriterImpl[T]) Close() error { if err := sw.Flush(); err != nil { return err } - sw.rw.Close() - return nil + return sw.rw.Close() } -func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T], batchSize int) *SerializeWriter[T] { - return &SerializeWriter[T]{ +func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T], batchSize int) *SerializeWriterImpl[T] { + return &SerializeWriterImpl[T]{ rw: rw, serializer: serializer, batchSize: batchSize, diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go index a125d5393f..e6c2798884 100644 --- a/internal/storage/serde_events.go +++ b/internal/storage/serde_events.go @@ -34,7 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/pkg/v2/common" - "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" @@ -275,7 +274,7 @@ func ValueDeserializer(r Record, v []*Value, fieldSchema []*schemapb.FieldSchema return nil } -func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*DeserializeReader[*Value], error) { +func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, 
blobsReader ChunkedBlobsReader) (*DeserializeReaderImpl[*Value], error) { reader, err := newCompositeBinlogRecordReader(schema, blobsReader) if err != nil { return nil, err @@ -286,7 +285,7 @@ func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader C }), nil } -func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) { +func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) { reader, err := newCompositeBinlogRecordReader(nil, MakeBlobsReader(blobs)) if err != nil { return nil, err @@ -445,6 +444,9 @@ type BinlogRecordWriter interface { bm25StatsLog map[FieldID]*datapb.FieldBinlog, ) GetRowNum() int64 + FlushChunk() error + GetBufferUncompressed() uint64 + Schema() *schemapb.CollectionSchema } type ChunkedBlobsWriter func([]*Blob) error @@ -474,6 +476,8 @@ type CompositeBinlogRecordWriter struct { fieldBinlogs map[FieldID]*datapb.FieldBinlog statsLog *datapb.FieldBinlog bm25StatsLog map[FieldID]*datapb.FieldBinlog + + flushedUncompressed uint64 } var _ BinlogRecordWriter = (*CompositeBinlogRecordWriter)(nil) @@ -527,7 +531,7 @@ func (c *CompositeBinlogRecordWriter) Write(r Record) error { // flush if size exceeds chunk size if c.rw.GetWrittenUncompressed() >= c.chunkSize { - return c.flushChunk() + return c.FlushChunk() } return nil @@ -565,18 +569,25 @@ func (c *CompositeBinlogRecordWriter) Close() error { } if c.rw != nil { // if rw is not nil, it means there is data to be flushed - if err := c.flushChunk(); err != nil { + if err := c.FlushChunk(); err != nil { return err } } return nil } -func (c *CompositeBinlogRecordWriter) GetWrittenUncompressed() uint64 { - return 0 +func (c *CompositeBinlogRecordWriter) GetBufferUncompressed() uint64 { + if c.rw == nil { + return 0 + } + return c.rw.GetWrittenUncompressed() } -func (c *CompositeBinlogRecordWriter) flushChunk() error { +func (c *CompositeBinlogRecordWriter) GetWrittenUncompressed() uint64 { + return c.flushedUncompressed + 
c.GetBufferUncompressed() +} + +func (c *CompositeBinlogRecordWriter) FlushChunk() error { if c.fieldWriters == nil { return nil } @@ -621,11 +632,17 @@ func (c *CompositeBinlogRecordWriter) flushChunk() error { }) } + c.flushedUncompressed += c.rw.GetWrittenUncompressed() + // reset writers c.resetWriters() return nil } +func (c *CompositeBinlogRecordWriter) Schema() *schemapb.CollectionSchema { + return c.schema +} + func (c *CompositeBinlogRecordWriter) writeStats() error { if c.pkstats == nil { return nil @@ -731,7 +748,6 @@ func newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID UniqueI ) (*CompositeBinlogRecordWriter, error) { pkField, err := typeutil.GetPrimaryFieldSchema(schema) if err != nil { - log.Warn("failed to get pk field from schema") return nil, err } stats, err := NewPrimaryKeyStats(pkField.GetFieldID(), int64(pkField.GetDataType()), maxRowNum) @@ -764,21 +780,39 @@ func newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID UniqueI }, nil } -func NewChunkedBinlogSerializeWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema, - blobsWriter ChunkedBlobsWriter, allocator allocator.Interface, chunkSize uint64, rootPath string, maxRowNum int64, batchSize int, -) (*SerializeWriter[*Value], error) { - rw, err := newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID, schema, blobsWriter, allocator, chunkSize, rootPath, maxRowNum) - if err != nil { - return nil, err +// BinlogValueWriter is a BinlogRecordWriter with SerializeWriter[*Value] mixin. 
+type BinlogValueWriter struct { + BinlogRecordWriter + SerializeWriter[*Value] +} + +func (b *BinlogValueWriter) Close() error { + return b.SerializeWriter.Close() +} + +func NewBinlogValueWriter(rw BinlogRecordWriter, batchSize int, +) *BinlogValueWriter { + return &BinlogValueWriter{ + BinlogRecordWriter: rw, + SerializeWriter: NewSerializeRecordWriter[*Value](rw, func(v []*Value) (Record, error) { + return ValueSerializer(v, rw.Schema().Fields) + }, batchSize), } - return NewSerializeRecordWriter[*Value](rw, func(v []*Value) (Record, error) { - return ValueSerializer(v, schema.Fields) - }, batchSize), nil +} + +// Deprecated: use NewBinlogValueWriter instead. +type BinlogSerializeWriter struct { + RecordWriter + SerializeWriter[*Value] +} + +func (b *BinlogSerializeWriter) Close() error { + return b.SerializeWriter.Close() } func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID, eventWriters map[FieldID]*BinlogStreamWriter, batchSize int, -) (*SerializeWriter[*Value], error) { +) (*BinlogSerializeWriter, error) { rws := make(map[FieldID]RecordWriter, len(eventWriters)) for fid := range eventWriters { w := eventWriters[fid] @@ -789,9 +823,12 @@ func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, se rws[fid] = rw } compositeRecordWriter := NewCompositeRecordWriter(rws) - return NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, error) { - return ValueSerializer(v, schema.Fields) - }, batchSize), nil + return &BinlogSerializeWriter{ + RecordWriter: compositeRecordWriter, + SerializeWriter: NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, error) { + return ValueSerializer(v, schema.Fields) + }, batchSize), + }, nil } type DeltalogStreamWriter struct { @@ -878,7 +915,7 @@ func newDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *Del } } -func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int)
(*SerializeWriter[*DeleteLog], error) { +func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int) (*SerializeWriterImpl[*DeleteLog], error) { rws := make(map[FieldID]RecordWriter, 1) rw, err := eventWriter.GetRecordWriter() if err != nil { @@ -1101,7 +1138,7 @@ func (dsw *MultiFieldDeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) err return nil } -func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, batchSize int) (*SerializeWriter[*DeleteLog], error) { +func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, batchSize int) (*SerializeWriterImpl[*DeleteLog], error) { rw, err := eventWriter.GetRecordWriter() if err != nil { return nil, err @@ -1155,7 +1192,7 @@ func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, ba }, batchSize), nil } -func newDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) { +func newDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) { reader, err := newSimpleArrowRecordReader(blobs) if err != nil { return nil, err @@ -1198,7 +1235,7 @@ func newDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], // NewDeltalogDeserializeReader is the entry point for the delta log reader. // It includes NewDeltalogOneFieldReader, which uses the existing log format with only one column in a log file, // and NewDeltalogMultiFieldReader, which uses the new format and supports multiple fields in a log file. 
-func newDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) { +func newDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) { if supportMultiFieldFormat(blobs) { return newDeltalogMultiFieldReader(blobs) } @@ -1220,11 +1257,12 @@ func supportMultiFieldFormat(blobs []*Blob) bool { return false } -func CreateDeltalogReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) { +func CreateDeltalogReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) { return newDeltalogDeserializeReader(blobs) } -func CreateDeltalogWriter(collectionID, partitionID, segmentID UniqueID, pkType schemapb.DataType, batchSize int) (*SerializeWriter[*DeleteLog], func() (*Blob, error), error) { +func CreateDeltalogWriter(collectionID, partitionID, segmentID UniqueID, pkType schemapb.DataType, batchSize int, +) (*SerializeWriterImpl[*DeleteLog], func() (*Blob, error), error) { format := paramtable.Get().DataNodeCfg.DeltalogFormat.GetValue() if format == "json" { eventWriter := newDeltalogStreamWriter(collectionID, partitionID, segmentID) diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go index 177f309482..3257eb823c 100644 --- a/internal/storage/serde_events_test.go +++ b/internal/storage/serde_events_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" ) @@ -48,7 +49,7 @@ func TestBinlogDeserializeReader(t *testing.T) { }) assert.NoError(t, err) defer reader.Close() - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) }) @@ -61,14 +62,13 @@ func TestBinlogDeserializeReader(t *testing.T) { defer reader.Close() for i := 1; i <= size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err) - value := 
reader.Value() - assertTestData(t, i, value) + assertTestData(t, i, *value) } - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) }) @@ -81,14 +81,13 @@ func TestBinlogDeserializeReader(t *testing.T) { defer reader.Close() for i := 1; i <= size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err) - value := reader.Value() - assertTestAddedFieldData(t, i, value) + assertTestAddedFieldData(t, i, *value) } - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) }) } @@ -142,13 +141,55 @@ func TestBinlogStreamWriter(t *testing.T) { } func TestBinlogSerializeWriter(t *testing.T) { + t.Run("test write value", func(t *testing.T) { + size := 100 + blobs, err := generateTestData(size) + assert.NoError(t, err) + reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs)) + assert.NoError(t, err) + defer reader.Close() + + schema := generateTestSchema() + alloc := allocator.NewLocalAllocator(1, 92) // 90 for 18 fields * 5 chunks, 1 for 1 stats file + chunkSize := uint64(64) // 64B + rw, err := newCompositeBinlogRecordWriter(0, 0, 0, schema, + func(b []*Blob) error { + log.Debug("write blobs", zap.Int("files", len(b))) + return nil + }, + alloc, chunkSize, "root", 10000) + assert.NoError(t, err) + writer := NewBinlogValueWriter(rw, 20) + assert.NoError(t, err) + + for i := 1; i <= size; i++ { + value, err := reader.NextValue() + assert.NoError(t, err) + + assertTestData(t, i, *value) + err = writer.WriteValue(*value) + assert.NoError(t, err) + } + + _, err = reader.NextValue() + assert.Equal(t, io.EOF, err) + err = writer.Close() + assert.NoError(t, err) + + logs, _, _ := writer.GetLogs() + assert.Equal(t, 18, len(logs)) + assert.Equal(t, 5, len(logs[0].Binlogs)) + }) +} + +func TestBinlogValueWriter(t *testing.T) { t.Run("test empty data", func(t *testing.T) { reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) { return nil, io.EOF }) 
assert.NoError(t, err) defer reader.Close() - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) }) @@ -167,12 +208,11 @@ func TestBinlogSerializeWriter(t *testing.T) { assert.NoError(t, err) for i := 1; i <= size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err) - value := reader.Value() - assertTestData(t, i, value) - err := writer.Write(value) + assertTestData(t, i, *value) + err = writer.WriteValue(*value) assert.NoError(t, err) } @@ -181,11 +221,11 @@ func TestBinlogSerializeWriter(t *testing.T) { assert.Equal(t, !f.IsPrimaryKey, props.DictionaryEnabled()) } - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) err = writer.Close() assert.NoError(t, err) - assert.True(t, writer.WrittenMemorySize() >= 429) + assert.True(t, writer.GetWrittenUncompressed() >= 429) // Read from the written data newblobs := make([]*Blob, len(writers)) @@ -208,11 +248,10 @@ func TestBinlogSerializeWriter(t *testing.T) { assert.NoError(t, err) defer reader.Close() for i := 1; i <= size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err, i) - value := reader.Value() - assertTestData(t, i, value) + assertTestData(t, i, *value) } }) } @@ -245,13 +284,13 @@ func TestSize(t *testing.T) { }, }, } - err := writer.Write(value) + err := writer.WriteValue(value) assert.NoError(t, err) } err = writer.Close() assert.NoError(t, err) - memSize := writer.WrittenMemorySize() + memSize := writer.GetWrittenUncompressed() assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should greater than data size t.Log("writtern memory size", memSize) }) @@ -283,13 +322,13 @@ func TestSize(t *testing.T) { }, }, } - err := writer.Write(value) + err := writer.WriteValue(value) assert.NoError(t, err) } err = writer.Close() assert.NoError(t, err) - memSize := writer.WrittenMemorySize() + memSize := writer.GetWrittenUncompressed() assert.Greater(t, memSize, uint64(8*4*size)) // 
written memory size should greater than data size t.Log("writtern memory size", memSize) }) @@ -358,7 +397,7 @@ func BenchmarkSerializeWriter(b *testing.B) { writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, s) assert.NoError(b, err) for _, v := range values { - _ = writer.Write(v) + _ = writer.WriteValue(v) assert.NoError(b, err) } writer.Close() @@ -391,7 +430,7 @@ func TestNull(t *testing.T) { IsDeleted: false, Value: m, } - writer.Write(value) + writer.WriteValue(value) err = writer.Close() assert.NoError(t, err) @@ -408,11 +447,10 @@ func TestNull(t *testing.T) { reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs)) assert.NoError(t, err) defer reader.Close() - err = reader.Next() + v, err := reader.NextValue() assert.NoError(t, err) - readValue := reader.Value() - assert.Equal(t, value, readValue) + assert.Equal(t, value, *v) }) } @@ -441,7 +479,7 @@ func TestDeltalogDeserializeReader(t *testing.T) { reader, err := newDeltalogDeserializeReader(nil) assert.NoError(t, err) defer reader.Close() - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) }) @@ -454,14 +492,13 @@ func TestDeltalogDeserializeReader(t *testing.T) { defer reader.Close() for i := 0; i < size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err) - value := reader.Value() - assertTestDeltalogData(t, i, value) + assertTestDeltalogData(t, i, *value) } - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) }) } @@ -471,7 +508,7 @@ func TestDeltalogSerializeWriter(t *testing.T) { reader, err := newDeltalogDeserializeReader(nil) assert.NoError(t, err) defer reader.Close() - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) }) @@ -489,16 +526,15 @@ func TestDeltalogSerializeWriter(t *testing.T) { assert.NoError(t, err) for i := 0; i < size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err) - value := 
reader.Value() - assertTestDeltalogData(t, i, value) - err := writer.Write(value) + assertTestDeltalogData(t, i, *value) + err = writer.WriteValue(*value) assert.NoError(t, err) } - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) err = writer.Close() assert.NoError(t, err) @@ -512,11 +548,10 @@ func TestDeltalogSerializeWriter(t *testing.T) { assert.NoError(t, err) defer reader.Close() for i := 0; i < size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err, i) - value := reader.Value() - assertTestDeltalogData(t, i, value) + assertTestDeltalogData(t, i, *value) } }) } @@ -569,13 +604,12 @@ func TestDeltalogPkTsSeparateFormat(t *testing.T) { assert.NoError(t, err) defer reader.Close() for i := 0; i < size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err) - value := reader.Value() - tc.assertPk(t, i, value) + tc.assertPk(t, i, *value) } - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, io.EOF, err) }) } @@ -614,7 +648,7 @@ func BenchmarkDeltalogFormatWriter(b *testing.B) { var value *DeleteLog for j := 0; j < size; j++ { value = NewDeleteLog(NewInt64PrimaryKey(int64(j)), uint64(j+1)) - writer.Write(value) + writer.WriteValue(value) } writer.Close() eventWriter.Finalize() @@ -646,7 +680,7 @@ func writeDeltalogNewFormat(size int, pkType schemapb.DataType, batchSize int) ( case schemapb.DataType_VarChar: value = NewDeleteLog(NewVarCharPrimaryKey(strconv.Itoa(i)), uint64(i+1)) } - if err = writer.Write(value); err != nil { + if err = writer.WriteValue(value); err != nil { return nil, err } } @@ -667,8 +701,7 @@ func readDeltaLog(size int, blob *Blob) error { } defer reader.Close() for j := 0; j < size; j++ { - err = reader.Next() - _ = reader.Value() + _, err = reader.NextValue() if err != nil { return err } diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go index a76ad0184d..49bd6b8096 100644 --- 
a/internal/storage/serde_events_v2.go +++ b/internal/storage/serde_events_v2.go @@ -117,7 +117,7 @@ func newPackedRecordReader(paths [][]string, schema *schemapb.CollectionSchema, func NewPackedDeserializeReader(paths [][]string, schema *schemapb.CollectionSchema, bufferSize int64, -) (*DeserializeReader[*Value], error) { +) (*DeserializeReaderImpl[*Value], error) { reader, err := newPackedRecordReader(paths, schema, bufferSize) if err != nil { return nil, err @@ -260,7 +260,9 @@ func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int6 }, nil } -func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int) (*SerializeWriter[*Value], error) { +func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, + multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int, +) (*SerializeWriterImpl[*Value], error) { arrowSchema, err := ConvertToArrowSchema(schema.Fields) if err != nil { return nil, merr.WrapErrServiceInternal( @@ -523,6 +525,18 @@ func (pw *PackedBinlogRecordWriter) GetRowNum() int64 { return pw.rowNum } +func (pw *PackedBinlogRecordWriter) FlushChunk() error { + return nil // do nothing +} + +func (pw *PackedBinlogRecordWriter) Schema() *schemapb.CollectionSchema { + return pw.schema +} + +func (pw *PackedBinlogRecordWriter) GetBufferUncompressed() uint64 { + return uint64(pw.multiPartUploadSize) +} + func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema, blobsWriter ChunkedBlobsWriter, allocator allocator.Interface, chunkSize uint64, rootPath string, maxRowNum int64, bufferSize, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, ) (*PackedBinlogRecordWriter, error) { diff --git a/internal/storage/serde_events_v2_test.go b/internal/storage/serde_events_v2_test.go index 
c41088648e..41e1e6245b 100644 --- a/internal/storage/serde_events_v2_test.go +++ b/internal/storage/serde_events_v2_test.go @@ -53,12 +53,11 @@ func TestPackedSerde(t *testing.T) { assert.NoError(t, err) for i := 1; i <= size; i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err) - value := reader.Value() - assertTestData(t, i, value) - err := writer.Write(value) + assertTestData(t, i, *value) + err = writer.WriteValue(*value) assert.NoError(t, err) } err = writer.Close() @@ -76,12 +75,11 @@ func TestPackedSerde(t *testing.T) { defer reader.Close() for i := 0; i < size*len(paths); i++ { - err = reader.Next() + value, err := reader.NextValue() assert.NoError(t, err) - value := reader.Value() - assertTestData(t, i%10+1, value) + assertTestData(t, i%10+1, *value) } - err = reader.Next() + _, err = reader.NextValue() assert.Equal(t, err, io.EOF) }) } diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go index 399cb6745f..2a4822ca0c 100644 --- a/internal/storage/serde_test.go +++ b/internal/storage/serde_test.go @@ -131,11 +131,10 @@ func BenchmarkDeserializeReader(b *testing.B) { assert.NoError(b, err) defer reader.Close() for i := 0; i < len; i++ { - err = reader.Next() - _ = reader.Value() + _, err = reader.NextValue() assert.NoError(b, err) } - err = reader.Next() + _, err = reader.NextValue() assert.Equal(b, io.EOF, err) } } diff --git a/internal/storage/sort.go b/internal/storage/sort.go index de1258c2a9..4365fc04e8 100644 --- a/internal/storage/sort.go +++ b/internal/storage/sort.go @@ -234,11 +234,11 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader, var pq *PriorityQueue[index] switch recs[0].Column(pkFieldId).(type) { case *array.Int64: - pq = NewPriorityQueue[index](func(x, y *index) bool { + pq = NewPriorityQueue(func(x, y *index) bool { return recs[x.ri].Column(pkFieldId).(*array.Int64).Value(x.i) < recs[y.ri].Column(pkFieldId).(*array.Int64).Value(y.i) }) case *array.String: - pq = 
NewPriorityQueue[index](func(x, y *index) bool { + pq = NewPriorityQueue(func(x, y *index) bool { return recs[x.ri].Column(pkFieldId).(*array.String).Value(x.i) < recs[y.ri].Column(pkFieldId).(*array.String).Value(y.i) }) } diff --git a/internal/storage/stats.go b/internal/storage/stats.go index 0488bf54d8..e24969168a 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -204,7 +204,7 @@ func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey) { func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) { if rowNum <= 0 { - return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum) + return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num %d", rowNum) } bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() diff --git a/internal/storagev2/packed/packed_reader.go b/internal/storagev2/packed/packed_reader.go index a3fa48cdbd..98fdab7c60 100644 --- a/internal/storagev2/packed/packed_reader.go +++ b/internal/storagev2/packed/packed_reader.go @@ -86,9 +86,13 @@ func (pr *PackedReader) ReadNext() (arrow.Record, error) { } func (pr *PackedReader) Close() error { + if pr.cPackedReader == nil { + return nil + } status := C.CloseReader(pr.cPackedReader) if err := ConsumeCStatusIntoError(&status); err != nil { return err } + pr.cPackedReader = nil return nil } diff --git a/internal/util/importutilv2/binlog/l0_reader.go b/internal/util/importutilv2/binlog/l0_reader.go index a873a034f0..2a87717cbc 100644 --- a/internal/util/importutilv2/binlog/l0_reader.go +++ b/internal/util/importutilv2/binlog/l0_reader.go @@ -100,7 +100,7 @@ func (r *l0Reader) Read() (*storage.DeleteData, error) { defer reader.Close() for { - err := reader.Next() + dl, err := reader.NextValue() if err != nil { if err == io.EOF { break @@ -109,8 +109,7 @@ func (r *l0Reader) Read() (*storage.DeleteData, error) { return nil, err } - dl := reader.Value() - deleteData.Append(dl.Pk, dl.Ts) + deleteData.Append((*dl).Pk, (*dl).Ts) 
} r.readIdx++ diff --git a/tests/integration/compaction/clustering_compaction_null_data_test.go b/tests/integration/compaction/clustering_compaction_null_data_test.go index ea5082ffa4..c6a4c3d185 100644 --- a/tests/integration/compaction/clustering_compaction_null_data_test.go +++ b/tests/integration/compaction/clustering_compaction_null_data_test.go @@ -247,7 +247,6 @@ func (s *ClusteringCompactionNullDataSuite) TestClusteringCompactionNullData() { s.NoError(err) s.Equal(segsInfoResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) for _, segInfo := range segsInfoResp.GetInfos() { - s.LessOrEqual(segInfo.GetNumOfRows(), int64(1024*1024/128)) totalRows += segInfo.GetNumOfRows() }