enhance: API integration with storage v2 in clustering-compactions (#40133)

See #39173

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Ted Xu 2025-03-13 14:12:06 +08:00 committed by GitHub
parent 972e47043a
commit df4285c9ef
25 changed files with 603 additions and 1047 deletions
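
Most of the mechanical churn in this diff comes from one storage v2 API change: the two-step DeserializeReader calls (reader.Next() followed by reader.Value()) are replaced by a single reader.NextValue() that returns the value pointer and an error, and the value-level writers move from Write to WriteValue. Below is a minimal self-contained sketch (not from this patch) of the consumption loop the call sites converge on; DeleteEntry and sliceReader are illustrative stand-ins, not Milvus storage types.

package main

import (
	"fmt"
	"io"
)

// DeleteEntry stands in for storage.DeleteLog (Pk + Ts) in this sketch.
type DeleteEntry struct {
	Pk int64
	Ts uint64
}

// valueReader mirrors the new DeserializeReader shape: NextValue advances the
// cursor and returns the next value, or io.EOF once the input is exhausted.
type valueReader interface {
	NextValue() (*DeleteEntry, error)
	Close() error
}

type sliceReader struct {
	entries []DeleteEntry
	pos     int
}

func (r *sliceReader) NextValue() (*DeleteEntry, error) {
	if r.pos >= len(r.entries) {
		return nil, io.EOF
	}
	v := &r.entries[r.pos]
	r.pos++
	return v, nil
}

func (r *sliceReader) Close() error { return nil }

func main() {
	var reader valueReader = &sliceReader{
		entries: []DeleteEntry{{Pk: 1, Ts: 10}, {Pk: 1, Ts: 5}, {Pk: 2, Ts: 20}},
	}
	defer reader.Close()

	// Loop shape used across the diff: one NextValue call replaces the old
	// reader.Next() / reader.Value() pair; io.EOF terminates the loop.
	pk2Ts := make(map[int64]uint64)
	for {
		dl, err := reader.NextValue()
		if err != nil {
			if err == io.EOF {
				break
			}
			panic(err)
		}
		if ts, ok := pk2Ts[dl.Pk]; ok && ts > dl.Ts {
			continue // keep the newest delete timestamp per primary key
		}
		pk2Ts[dl.Pk] = dl.Ts
	}
	fmt.Println("delete entries:", len(pk2Ts)) // prints 2
}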

View File

@ -57,7 +57,7 @@ func ComposeDeleteFromDeltalogs(ctx context.Context, io io.BinlogIO, paths []str
defer reader.Close()
for {
err := reader.Next()
dl, err := reader.NextValue()
if err != nil {
if err == sio.EOF {
break
@ -66,11 +66,10 @@ func ComposeDeleteFromDeltalogs(ctx context.Context, io io.BinlogIO, paths []str
return nil, err
}
dl := reader.Value()
if ts, ok := pk2Ts[dl.Pk.GetValue()]; ok && ts > dl.Ts {
if ts, ok := pk2Ts[(*dl).Pk.GetValue()]; ok && ts > (*dl).Ts {
continue
}
pk2Ts[dl.Pk.GetValue()] = dl.Ts
pk2Ts[(*dl).Pk.GetValue()] = (*dl).Ts
}
log.Info("compose delete end", zap.Int("delete entries counts", len(pk2Ts)))

View File

@ -106,7 +106,6 @@ func (t *clusteringCompactionTask) Process() bool {
if currentState != lastState {
ts := time.Now().Unix()
lastStateDuration := ts - t.GetTaskProto().GetLastStateStartTime()
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
Observe(float64(lastStateDuration * 1000))
@ -208,6 +207,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: taskProto.GetPreAllocatedSegmentIDs(),
SlotUsage: t.GetSlotUsage(),
MaxSize: taskProto.GetMaxSize(),
}
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

View File

@ -504,6 +504,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
Begin: start,
End: end,
},
MaxSize: expectedSegmentSize,
}
err = m.compactionHandler.enqueueCompaction(task)
if err != nil {

View File

@ -24,12 +24,12 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/session"

File diff suppressed because it is too large

View File

@ -18,7 +18,6 @@ package compactor
import (
"context"
"fmt"
"sync"
"testing"
"time"
@ -212,9 +211,10 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 2048
s.task.plan.MaxSegmentRows = 2048
s.task.plan.MaxSize = 1024 * 1024 * 1024 // max segment size = 1GB, we won't touch this value
s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{
Begin: time.Now().UnixMilli(),
End: time.Now().UnixMilli() + 1000,
Begin: 1,
End: 101,
}
// 8+8+8+4+7+4*4=51
@ -295,9 +295,10 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 3000
s.task.plan.MaxSegmentRows = 3000
s.task.plan.MaxSize = 1024 * 1024 * 1024 // max segment size = 1GB, we won't touch this value
s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{
Begin: time.Now().UnixMilli(),
End: time.Now().UnixMilli() + 1000,
Begin: 1,
End: 1000,
}
// 8+8+8+4+7+4*4=51
@ -311,7 +312,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(2, len(s.task.clusterBuffers))
s.Equal(4, len(compactionResult.GetSegments()))
s.Equal(2, len(compactionResult.GetSegments()))
totalBinlogNum := 0
totalRowNum := int64(0)
for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() {
@ -330,7 +331,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
statsRowNum += b.GetEntriesNum()
}
}
s.Equal(3, totalBinlogNum/len(schema.GetFields()))
s.Equal(5, totalBinlogNum/len(schema.GetFields()))
s.Equal(1, statsBinlogNum)
s.Equal(totalRowNum, statsRowNum)
}
@ -368,9 +369,10 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 2048
s.task.plan.MaxSegmentRows = 2048
s.task.plan.MaxSize = 1024 * 1024 * 1024 // 1GB
s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{
Begin: time.Now().UnixMilli(),
End: time.Now().UnixMilli() + 1000,
Begin: 1,
End: 1000,
}
// 8 + 8 + 8 + 7 + 8 = 39
@ -419,173 +421,6 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
s.Equal(totalRowNum, bm25RowNum)
}
func (s *ClusteringCompactionTaskSuite) TestCheckBuffersAfterCompaction() {
s.Run("no leak", func() {
task := &clusteringCompactionTask{clusterBuffers: []*ClusterBuffer{{}}}
s.NoError(task.checkBuffersAfterCompaction())
})
s.Run("leak binlog", func() {
task := &clusteringCompactionTask{
clusterBuffers: []*ClusterBuffer{
{
flushedBinlogs: map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog{
1: {
101: {
FieldID: 101,
Binlogs: []*datapb.Binlog{{LogID: 1000}},
},
},
},
},
},
}
s.Error(task.checkBuffersAfterCompaction())
})
}
func (s *ClusteringCompactionTaskSuite) TestGenerateBM25Stats() {
s.Run("normal case", func() {
segmentID := int64(1)
task := &clusteringCompactionTask{
collectionID: 111,
partitionID: 222,
bm25FieldIds: []int64{102},
logIDAlloc: s.mockAlloc,
binlogIO: s.mockBinlogIO,
}
statsMap := make(map[int64]*storage.BM25Stats)
statsMap[102] = storage.NewBM25Stats()
statsMap[102].Append(map[uint32]float32{1: 1})
binlogs, err := task.generateBM25Stats(context.Background(), segmentID, statsMap)
s.NoError(err)
s.Equal(1, len(binlogs))
s.Equal(1, len(binlogs[0].Binlogs))
s.Equal(int64(102), binlogs[0].FieldID)
s.Equal(int64(1), binlogs[0].Binlogs[0].GetEntriesNum())
})
s.Run("alloc ID failed", func() {
segmentID := int64(1)
mockAlloc := allocator.NewMockAllocator(s.T())
mockAlloc.EXPECT().Alloc(mock.Anything).Return(0, 0, fmt.Errorf("mock error")).Once()
task := &clusteringCompactionTask{
collectionID: 111,
partitionID: 222,
bm25FieldIds: []int64{102},
logIDAlloc: mockAlloc,
}
statsMap := make(map[int64]*storage.BM25Stats)
statsMap[102] = storage.NewBM25Stats()
statsMap[102].Append(map[uint32]float32{1: 1})
_, err := task.generateBM25Stats(context.Background(), segmentID, statsMap)
s.Error(err)
})
s.Run("upload failed", func() {
segmentID := int64(1)
mockBinlogIO := mock_util.NewMockBinlogIO(s.T())
mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")).Once()
task := &clusteringCompactionTask{
collectionID: 111,
partitionID: 222,
bm25FieldIds: []int64{102},
logIDAlloc: s.mockAlloc,
binlogIO: mockBinlogIO,
}
statsMap := make(map[int64]*storage.BM25Stats)
statsMap[102] = storage.NewBM25Stats()
statsMap[102].Append(map[uint32]float32{1: 1})
_, err := task.generateBM25Stats(context.Background(), segmentID, statsMap)
s.Error(err)
})
}
func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() {
pkField := &schemapb.FieldSchema{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
}
s.Run("num rows zero", func() {
task := &clusteringCompactionTask{
primaryKeyField: pkField,
}
binlogs, err := task.generatePkStats(context.Background(), 1, 0, nil)
s.Error(err)
s.Nil(binlogs)
})
s.Run("download binlogs failed", func() {
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
task := &clusteringCompactionTask{
binlogIO: s.mockBinlogIO,
primaryKeyField: pkField,
}
binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}})
s.Error(err)
s.Nil(binlogs)
})
s.Run("NewInsertBinlogIterator failed", func() {
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{[]byte("mock")}, nil)
task := &clusteringCompactionTask{
binlogIO: s.mockBinlogIO,
primaryKeyField: pkField,
}
binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}})
s.Error(err)
s.Nil(binlogs)
})
s.Run("upload failed", func() {
schema := genCollectionSchema()
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, SegmentID, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)
for i := 0; i < 2000; i++ {
v := storage.Value{
PK: storage.NewInt64PrimaryKey(int64(i)),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
Value: genRow(int64(i)),
}
err = segWriter.Write(&v)
s.Require().NoError(err)
}
segWriter.FlushAndIsFull()
kvs, _, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
s.NoError(err)
mockBinlogIO := mock_util.NewMockBinlogIO(s.T())
mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error"))
task := &clusteringCompactionTask{
collectionID: CollectionID,
partitionID: PartitionID,
plan: &datapb.CompactionPlan{
Schema: genCollectionSchema(),
},
binlogIO: mockBinlogIO,
primaryKeyField: pkField,
logIDAlloc: s.mockAlloc,
}
binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}})
s.Error(err)
s.Nil(binlogs)
})
}
func genRow(magic int64) map[int64]interface{} {
ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
return map[int64]interface{}{

View File

@ -18,7 +18,9 @@ package compactor
import (
"context"
sio "io"
"strconv"
"time"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
@ -29,11 +31,142 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
const compactionBatchSize = 100
type EntityFilter struct {
deletedPkTs map[interface{}]typeutil.Timestamp // pk2ts
ttl int64 // nanoseconds
currentTime time.Time
expiredCount int
deletedCount int
}
func newEntityFilter(deletedPkTs map[interface{}]typeutil.Timestamp, ttl int64, currTime time.Time) *EntityFilter {
if deletedPkTs == nil {
deletedPkTs = make(map[interface{}]typeutil.Timestamp)
}
return &EntityFilter{
deletedPkTs: deletedPkTs,
ttl: ttl,
currentTime: currTime,
}
}
func (filter *EntityFilter) Filtered(pk any, ts typeutil.Timestamp) bool {
if filter.isEntityDeleted(pk, ts) {
filter.deletedCount++
return true
}
// Filtering expired entity
if filter.isEntityExpired(ts) {
filter.expiredCount++
return true
}
return false
}
func (filter *EntityFilter) GetExpiredCount() int {
return filter.expiredCount
}
func (filter *EntityFilter) GetDeletedCount() int {
return filter.deletedCount
}
func (filter *EntityFilter) GetDeltalogDeleteCount() int {
return len(filter.deletedPkTs)
}
func (filter *EntityFilter) GetMissingDeleteCount() int {
diff := filter.GetDeltalogDeleteCount() - filter.GetDeletedCount()
if diff <= 0 {
diff = 0
}
return diff
}
func (filter *EntityFilter) isEntityDeleted(pk interface{}, pkTs typeutil.Timestamp) bool {
if deleteTs, ok := filter.deletedPkTs[pk]; ok {
// the insert task and the delete task have the same ts when upserting,
// so use < instead of <= here
// to avoid the upserted data being deleted after compaction
if pkTs < deleteTs {
return true
}
}
return false
}
func (filter *EntityFilter) isEntityExpired(entityTs typeutil.Timestamp) bool {
// entity expiration is not enabled if ttl <= 0
if filter.ttl <= 0 {
return false
}
entityTime, _ := tsoutil.ParseTS(entityTs)
// this duration can represent 292 million years before or after 1970, enough for milvus
// ttl calculation
dur := filter.currentTime.UnixMilli() - entityTime.UnixMilli()
// filter.ttl is nanoseconds
return filter.ttl/int64(time.Millisecond) <= dur
}
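// Illustrative sketch, not part of this commit: roughly how an EntityFilter is
// meant to be driven from a value loop, assuming a reader exposing the new
// NextValue() API and a MultiSegmentWriter exposing WriteValue(). The deletedPkTs
// map fed into newEntityFilter would come from mergeDeltalogs below.
func applyEntityFilterSketch(reader storage.DeserializeReader[*storage.Value],
	writer *MultiSegmentWriter, filter *EntityFilter,
) error {
	for {
		v, err := reader.NextValue()
		if err != nil {
			if err == sio.EOF {
				return nil
			}
			return err
		}
		// Drop entities covered by deltalog deletes or expired by TTL; the filter
		// keeps the deleted/expired counters for the compaction result.
		if filter.Filtered((*v).PK.GetValue(), typeutil.Timestamp((*v).Timestamp)) {
			continue
		}
		if err := writer.WriteValue(*v); err != nil {
			return err
		}
	}
}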
func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) {
pk2Ts := make(map[interface{}]typeutil.Timestamp)
log := log.Ctx(ctx)
if len(paths) == 0 {
log.Debug("compact with no deltalogs, skip merge deltalogs")
return pk2Ts, nil
}
blobs := make([]*storage.Blob, 0)
binaries, err := io.Download(ctx, paths)
if err != nil {
log.Warn("compact wrong, fail to download deltalogs",
zap.Strings("path", paths),
zap.Error(err))
return nil, err
}
for i := range binaries {
blobs = append(blobs, &storage.Blob{Value: binaries[i]})
}
reader, err := storage.CreateDeltalogReader(blobs)
if err != nil {
log.Error("malformed delta file", zap.Error(err))
return nil, err
}
defer reader.Close()
for {
dl, err := reader.NextValue()
if err != nil {
if err == sio.EOF {
break
}
log.Error("compact wrong, fail to read deltalogs", zap.Error(err))
return nil, err
}
if ts, ok := pk2Ts[(*dl).Pk.GetValue()]; ok && ts > (*dl).Ts {
continue
}
pk2Ts[(*dl).Pk.GetValue()] = (*dl).Ts
}
log.Info("compact mergeDeltalogs end", zap.Int("delete entries counts", len(pk2Ts)))
return pk2Ts, nil
}
func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite")
defer span.End()

View File

@ -403,7 +403,7 @@ func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []str
dData := &storage.DeleteData{}
for {
err := reader.Next()
dl, err := reader.NextValue()
if err != nil {
if err == sio.EOF {
break
@ -412,8 +412,7 @@ func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []str
return nil, err
}
dl := reader.Value()
dData.Append(dl.Pk, dl.Ts)
dData.Append((*dl).Pk, (*dl).Ts)
}
return dData, nil

View File

@ -31,7 +31,6 @@ func mergeSortMultipleSegments(ctx context.Context,
tr *timerecord.TimeRecorder,
currentTime time.Time,
collectionTtl int64,
bm25FieldIds []int64,
) ([]*datapb.CompactionSegment, error) {
_ = tr.RecordSpan()
@ -43,7 +42,7 @@ func mergeSortMultipleSegments(ctx context.Context,
segIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedSegmentIDs().GetBegin(), plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64)
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
writer := NewMultiSegmentWriter(binlogIO, compAlloc, plan, maxRows, partitionID, collectionID, bm25FieldIds)
writer := NewMultiSegmentWriter(ctx, binlogIO, compAlloc, plan.GetMaxSize(), plan.GetSchema(), maxRows, partitionID, collectionID, plan.GetChannel(), 4096)
pkField, err := typeutil.GetPrimaryFieldSchema(plan.GetSchema())
if err != nil {

View File

@ -143,7 +143,7 @@ func (t *mixCompactionTask) mergeSplit(
segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(t.plan.GetBeginLogID(), math.MaxInt64)
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter := NewMultiSegmentWriter(t.binlogIO, compAlloc, t.plan, t.maxRows, t.partitionID, t.collectionID, t.bm25FieldIDs)
mWriter := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096)
deletedRowCount := int64(0)
expiredRowCount := int64(0)
@ -167,6 +167,18 @@ func (t *mixCompactionTask) mergeSplit(
return nil, err
}
res := mWriter.GetCompactionSegments()
if len(res) == 0 {
// append an empty segment
id, err := segIDAlloc.AllocOne()
if err != nil {
return nil, err
}
res = append(res, &datapb.CompactionSegment{
SegmentID: id,
NumOfRows: 0,
Channel: t.GetChannelName(),
})
}
totalElapse := t.tr.RecordSpan()
log.Info("compact mergeSplit end",
@ -335,7 +347,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
if sortMergeAppicable {
log.Info("compact by merge sort")
res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,
t.plan.GetSegmentBinlogs(), t.tr, t.currentTime, t.plan.GetCollectionTtl(), t.bm25FieldIDs)
t.plan.GetSegmentBinlogs(), t.tr, t.currentTime, t.plan.GetCollectionTtl())
if err != nil {
log.Warn("compact wrong, fail to merge sort segments", zap.Error(err))
return nil, err

View File

@ -45,10 +45,11 @@ import (
// Not concurrent safe.
type MultiSegmentWriter struct {
ctx context.Context
binlogIO io.BinlogIO
allocator *compactionAlloactor
writer storage.BinlogRecordWriter
writer *storage.BinlogValueWriter
currentSegmentID typeutil.UniqueID
maxRows int64
@ -61,14 +62,12 @@ type MultiSegmentWriter struct {
partitionID int64
collectionID int64
channel string
batchSize int
res []*datapb.CompactionSegment
// DO NOT leave it empty if all segments are deleted; just return a segment with zero meta for datacoord
bm25Fields []int64
}
var _ storage.RecordWriter = &MultiSegmentWriter{}
type compactionAlloactor struct {
segmentAlloc allocator.Interface
logIDAlloc allocator.Interface
@ -85,27 +84,22 @@ func (alloc *compactionAlloactor) allocSegmentID() (typeutil.UniqueID, error) {
return alloc.segmentAlloc.AllocOne()
}
func (alloc *compactionAlloactor) getLogIDAllocator() allocator.Interface {
return alloc.logIDAlloc
}
func NewMultiSegmentWriter(binlogIO io.BinlogIO, allocator *compactionAlloactor, plan *datapb.CompactionPlan,
maxRows int64, partitionID, collectionID int64, bm25Fields []int64,
func NewMultiSegmentWriter(ctx context.Context, binlogIO io.BinlogIO, allocator *compactionAlloactor, segmentSize int64,
schema *schemapb.CollectionSchema,
maxRows int64, partitionID, collectionID int64, channel string, batchSize int,
) *MultiSegmentWriter {
return &MultiSegmentWriter{
binlogIO: binlogIO,
allocator: allocator,
maxRows: maxRows, // For bloomfilter only
segmentSize: plan.GetMaxSize(),
schema: plan.GetSchema(),
ctx: ctx,
binlogIO: binlogIO,
allocator: allocator,
maxRows: maxRows, // For bloomfilter only
segmentSize: segmentSize,
schema: schema,
partitionID: partitionID,
collectionID: collectionID,
channel: plan.GetChannel(),
res: make([]*datapb.CompactionSegment, 0),
bm25Fields: bm25Fields,
channel: channel,
batchSize: batchSize,
res: make([]*datapb.CompactionSegment, 0),
}
}
@ -128,10 +122,12 @@ func (w *MultiSegmentWriter) closeWriter() error {
w.res = append(w.res, result)
log.Info("Segment writer flushed a segment",
log.Info("created new segment",
zap.Int64("segmentID", w.currentSegmentID),
zap.String("channel", w.channel),
zap.Int64("totalRows", w.writer.GetRowNum()))
zap.Int64("totalRows", w.writer.GetRowNum()),
zap.Uint64("totalSize", w.writer.GetWrittenUncompressed()),
zap.Int64("expected segment size", w.segmentSize))
}
return nil
}
@ -147,11 +143,10 @@ func (w *MultiSegmentWriter) rotateWriter() error {
}
w.currentSegmentID = newSegmentID
ctx := context.TODO()
chunkSize := paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
rootPath := binlog.GetRootPath()
writer, err := storage.NewBinlogRecordWriter(ctx, w.collectionID, w.partitionID, newSegmentID,
rw, err := storage.NewBinlogRecordWriter(w.ctx, w.collectionID, w.partitionID, newSegmentID,
w.schema, w.allocator.logIDAlloc, chunkSize, rootPath, w.maxRows,
storage.WithUploader(func(ctx context.Context, kvs map[string][]byte) error {
return w.binlogIO.Upload(ctx, kvs)
@ -159,7 +154,8 @@ func (w *MultiSegmentWriter) rotateWriter() error {
if err != nil {
return err
}
w.writer = writer
w.writer = storage.NewBinlogValueWriter(rw, w.batchSize)
return nil
}
@ -170,6 +166,13 @@ func (w *MultiSegmentWriter) GetWrittenUncompressed() uint64 {
return w.writer.GetWrittenUncompressed()
}
func (w *MultiSegmentWriter) GetBufferUncompressed() uint64 {
if w.writer == nil {
return 0
}
return w.writer.GetBufferUncompressed()
}
func (w *MultiSegmentWriter) GetCompactionSegments() []*datapb.CompactionSegment {
return w.res
}
@ -184,23 +187,31 @@ func (w *MultiSegmentWriter) Write(r storage.Record) error {
return w.writer.Write(r)
}
// DO NOT return an empty list if every insert of the segment is deleted;
// append an empty segment instead
func (w *MultiSegmentWriter) Close() error {
if w.writer == nil && len(w.res) == 0 {
// append an empty segment
id, err := w.allocator.segmentAlloc.AllocOne()
if err != nil {
func (w *MultiSegmentWriter) WriteValue(v *storage.Value) error {
if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) {
if err := w.rotateWriter(); err != nil {
return err
}
w.res = append(w.res, &datapb.CompactionSegment{
SegmentID: id,
NumOfRows: 0,
Channel: w.channel,
})
}
return w.writer.WriteValue(v)
}
func (w *MultiSegmentWriter) FlushChunk() error {
if w.writer == nil {
return nil
}
return w.closeWriter()
if err := w.writer.Flush(); err != nil {
return err
}
return w.writer.FlushChunk()
}
func (w *MultiSegmentWriter) Close() error {
if w.writer != nil {
return w.closeWriter()
}
return nil
}
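// Illustrative sketch, not part of this commit: roughly the construction and
// consumption pattern the mix and merge-sort compactors use with the new
// NewMultiSegmentWriter signature. Rotation to a new segment happens inside
// WriteValue once the current segment reaches plan.GetMaxSize() bytes of
// uncompressed data; Close flushes and records the last segment.
func writeValuesSketch(ctx context.Context, binlogIO io.BinlogIO, alloc *compactionAlloactor,
	plan *datapb.CompactionPlan, maxRows, partitionID, collectionID int64,
	values []*storage.Value,
) ([]*datapb.CompactionSegment, error) {
	w := NewMultiSegmentWriter(ctx, binlogIO, alloc, plan.GetMaxSize(), plan.GetSchema(),
		maxRows, partitionID, collectionID, plan.GetChannel(), 4096)
	for _, v := range values {
		if err := w.WriteValue(v); err != nil {
			return nil, err
		}
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return w.GetCompactionSegments(), nil
}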
func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter {
@ -276,7 +287,7 @@ func (w *SegmentDeltaWriter) Finish() (*storage.Blob, *writebuffer.TimeRange, er
}
type SegmentWriter struct {
writer *storage.SerializeWriter[*storage.Value]
writer *storage.BinlogSerializeWriter
closers []func() (*storage.Blob, error)
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
@ -316,7 +327,7 @@ func (w *SegmentWriter) GetPkID() int64 {
}
func (w *SegmentWriter) WrittenMemorySize() uint64 {
return w.writer.WrittenMemorySize()
return w.writer.GetWrittenUncompressed()
}
func (w *SegmentWriter) WriteRecord(r storage.Record) error {
@ -395,7 +406,7 @@ func (w *SegmentWriter) WriteRecord(r storage.Record) error {
rec := storage.NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(rows)), field2Col)
defer rec.Release()
return w.writer.WriteRecord(rec)
return w.writer.Write(rec)
}
func (w *SegmentWriter) Write(v *storage.Value) error {
@ -422,7 +433,7 @@ func (w *SegmentWriter) Write(v *storage.Value) error {
}
w.rowCount.Inc()
return w.writer.Write(v)
return w.writer.WriteValue(v)
}
func (w *SegmentWriter) Finish() (*storage.Blob, error) {
@ -454,25 +465,25 @@ func (w *SegmentWriter) GetBm25StatsBlob() (map[int64]*storage.Blob, error) {
}
func (w *SegmentWriter) IsFull() bool {
return w.writer.WrittenMemorySize() > w.maxBinlogSize
return w.writer.GetWrittenUncompressed() > w.maxBinlogSize
}
func (w *SegmentWriter) FlushAndIsFull() bool {
w.writer.Flush()
return w.writer.WrittenMemorySize() > w.maxBinlogSize
return w.writer.GetWrittenUncompressed() > w.maxBinlogSize
}
func (w *SegmentWriter) IsFullWithBinlogMaxSize(binLogMaxSize uint64) bool {
return w.writer.WrittenMemorySize() > binLogMaxSize
return w.writer.GetWrittenUncompressed() > binLogMaxSize
}
func (w *SegmentWriter) IsEmpty() bool {
return w.writer.WrittenMemorySize() == 0
return w.writer.GetWrittenUncompressed() == 0
}
func (w *SegmentWriter) FlushAndIsEmpty() bool {
w.writer.Flush()
return w.writer.WrittenMemorySize() == 0
return w.writer.GetWrittenUncompressed() == 0
}
func (w *SegmentWriter) GetTimeRange() *writebuffer.TimeRange {
@ -499,11 +510,11 @@ func (w *SegmentWriter) SerializeYield() ([]*storage.Blob, *writebuffer.TimeRang
}
func (w *SegmentWriter) GetTotalSize() int64 {
return w.syncedSize.Load() + int64(w.writer.WrittenMemorySize())
return w.syncedSize.Load() + int64(w.writer.GetWrittenUncompressed())
}
func (w *SegmentWriter) clear() {
w.syncedSize.Add(int64(w.writer.WrittenMemorySize()))
w.syncedSize.Add(int64(w.writer.GetWrittenUncompressed()))
writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch, w.batchSize)
w.writer = writer
@ -512,6 +523,7 @@ func (w *SegmentWriter) clear() {
w.tsTo = 0
}
// Deprecated: use NewMultiSegmentWriter instead
func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize int, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) {
writer, closers, err := newBinlogWriter(collID, partID, segID, sch, batchSize)
if err != nil {
@ -555,7 +567,7 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize
}
func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema, batchSize int,
) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*storage.Blob, error), err error) {
) (writer *storage.BinlogSerializeWriter, closers []func() (*storage.Blob, error), err error) {
fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema.Fields)
closers = make([]func() (*storage.Blob, error), 0, len(fieldWriters))
for _, w := range fieldWriters {

View File

@ -197,7 +197,7 @@ func (s *storageV1Serializer) serializeDeltalog(pack *SyncPack) (*storage.Blob,
for i := 0; i < len(pack.deltaData.Pks); i++ {
deleteLog := storage.NewDeleteLog(pack.deltaData.Pks[i], pack.deltaData.Tss[i])
err = writer.Write(deleteLog)
err = writer.WriteValue(deleteLog)
if err != nil {
return nil, err
}

View File

@ -1246,15 +1246,14 @@ func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment,
}
defer reader.Close()
for {
err := reader.Next()
dl, err := reader.NextValue()
if err != nil {
if err == io.EOF {
break
}
return err
}
dl := reader.Value()
err = deltaData.Append(dl.Pk, dl.Ts)
err = deltaData.Append((*dl).Pk, (*dl).Ts)
if err != nil {
return err
}

View File

@ -138,11 +138,9 @@ func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() {
defer reader.Close()
for i := 1; i <= rows; i++ {
err = reader.Next()
value, err := reader.NextValue()
s.NoError(err)
value := reader.Value()
rec, err := ValueSerializer([]*Value{value}, s.schema.Fields)
rec, err := ValueSerializer([]*Value{*value}, s.schema.Fields)
s.NoError(err)
err = w.Write(rec)
s.NoError(err)
@ -174,6 +172,7 @@ func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() {
}
r, err := NewBinlogRecordReader(s.ctx, binlogs, s.schema, rOption...)
s.NoError(err)
defer r.Close()
for i := 0; i < rows/readBatchSize+1; i++ {
rec, err := r.Next()
s.NoError(err)
@ -304,11 +303,10 @@ func (s *PackedBinlogRecordSuite) TestAllocIDExhausedError() {
defer reader.Close()
for i := 0; i < size; i++ {
err = reader.Next()
value, err := reader.NextValue()
s.NoError(err)
value := reader.Value()
rec, err := ValueSerializer([]*Value{value}, s.schema.Fields)
rec, err := ValueSerializer([]*Value{*value}, s.schema.Fields)
s.NoError(err)
err = w.Write(rec)
s.Error(err)

View File

@ -21,14 +21,12 @@ import (
"io"
"math"
"strconv"
"sync"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/parquet"
"github.com/apache/arrow/go/v17/parquet/compress"
"github.com/apache/arrow/go/v17/parquet/pqarrow"
"github.com/cockroachdb/errors"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -519,7 +517,12 @@ func getFieldWriterProps(field *schemapb.FieldSchema) *parquet.WriterProperties
)
}
type DeserializeReader[T any] struct {
type DeserializeReader[T any] interface {
NextValue() (*T, error)
Close() error
}
type DeserializeReaderImpl[T any] struct {
rr RecordReader
deserializer Deserializer[T]
rec Record
@ -528,11 +531,11 @@ type DeserializeReader[T any] struct {
}
// Iterate to the next value; returns an error, or EOF if there are no more values.
func (deser *DeserializeReader[T]) Next() error {
func (deser *DeserializeReaderImpl[T]) NextValue() (*T, error) {
if deser.rec == nil || deser.pos >= deser.rec.Len()-1 {
r, err := deser.rr.Next()
if err != nil {
return err
return nil, err
}
deser.pos = 0
deser.rec = r
@ -540,33 +543,21 @@ func (deser *DeserializeReader[T]) Next() error {
deser.values = make([]T, deser.rec.Len())
if err := deser.deserializer(deser.rec, deser.values); err != nil {
return err
return nil, err
}
} else {
deser.pos++
}
return nil
return &deser.values[deser.pos], nil
}
func (deser *DeserializeReader[T]) Value() T {
return deser.values[deser.pos]
func (deser *DeserializeReaderImpl[T]) Close() error {
return deser.rr.Close()
}
func (deser *DeserializeReader[T]) Close() error {
if deser.rec != nil {
deser.rec.Release()
}
if deser.rr != nil {
if err := deser.rr.Close(); err != nil {
return err
}
}
return nil
}
func NewDeserializeReader[T any](rr RecordReader, deserializer Deserializer[T]) *DeserializeReader[T] {
return &DeserializeReader[T]{
func NewDeserializeReader[T any](rr RecordReader, deserializer Deserializer[T]) *DeserializeReaderImpl[T] {
return &DeserializeReaderImpl[T]{
rr: rr,
deserializer: deserializer,
}
@ -828,19 +819,22 @@ func newMultiFieldRecordWriter(fieldIds []FieldID, fields []arrow.Field, writer
}, nil
}
type SerializeWriter[T any] struct {
type SerializeWriter[T any] interface {
WriteValue(value T) error
Flush() error
Close() error
}
type SerializeWriterImpl[T any] struct {
rw RecordWriter
serializer Serializer[T]
batchSize int
mu sync.Mutex
buffer []T
pos int
}
func (sw *SerializeWriter[T]) Flush() error {
sw.mu.Lock()
defer sw.mu.Unlock()
func (sw *SerializeWriterImpl[T]) Flush() error {
if sw.pos == 0 {
return nil
}
@ -857,7 +851,7 @@ func (sw *SerializeWriter[T]) Flush() error {
return nil
}
func (sw *SerializeWriter[T]) Write(value T) error {
func (sw *SerializeWriterImpl[T]) WriteValue(value T) error {
if sw.buffer == nil {
sw.buffer = make([]T, sw.batchSize)
}
@ -871,36 +865,15 @@ func (sw *SerializeWriter[T]) Write(value T) error {
return nil
}
func (sw *SerializeWriter[T]) WriteRecord(r Record) error {
sw.mu.Lock()
defer sw.mu.Unlock()
if len(sw.buffer) != 0 {
return errors.New("serialize buffer is not empty")
}
if err := sw.rw.Write(r); err != nil {
return err
}
return nil
}
func (sw *SerializeWriter[T]) WrittenMemorySize() uint64 {
sw.mu.Lock()
defer sw.mu.Unlock()
return sw.rw.GetWrittenUncompressed()
}
func (sw *SerializeWriter[T]) Close() error {
func (sw *SerializeWriterImpl[T]) Close() error {
if err := sw.Flush(); err != nil {
return err
}
sw.rw.Close()
return nil
return sw.rw.Close()
}
func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T], batchSize int) *SerializeWriter[T] {
return &SerializeWriter[T]{
func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T], batchSize int) *SerializeWriterImpl[T] {
return &SerializeWriterImpl[T]{
rw: rw,
serializer: serializer,
batchSize: batchSize,

View File

@ -34,7 +34,6 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
@ -275,7 +274,7 @@ func ValueDeserializer(r Record, v []*Value, fieldSchema []*schemapb.FieldSchema
return nil
}
func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*DeserializeReader[*Value], error) {
func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*DeserializeReaderImpl[*Value], error) {
reader, err := newCompositeBinlogRecordReader(schema, blobsReader)
if err != nil {
return nil, err
@ -286,7 +285,7 @@ func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader C
}), nil
}
func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
reader, err := newCompositeBinlogRecordReader(nil, MakeBlobsReader(blobs))
if err != nil {
return nil, err
@ -445,6 +444,9 @@ type BinlogRecordWriter interface {
bm25StatsLog map[FieldID]*datapb.FieldBinlog,
)
GetRowNum() int64
FlushChunk() error
GetBufferUncompressed() uint64
Schema() *schemapb.CollectionSchema
}
type ChunkedBlobsWriter func([]*Blob) error
@ -474,6 +476,8 @@ type CompositeBinlogRecordWriter struct {
fieldBinlogs map[FieldID]*datapb.FieldBinlog
statsLog *datapb.FieldBinlog
bm25StatsLog map[FieldID]*datapb.FieldBinlog
flushedUncompressed uint64
}
var _ BinlogRecordWriter = (*CompositeBinlogRecordWriter)(nil)
@ -527,7 +531,7 @@ func (c *CompositeBinlogRecordWriter) Write(r Record) error {
// flush if size exceeds chunk size
if c.rw.GetWrittenUncompressed() >= c.chunkSize {
return c.flushChunk()
return c.FlushChunk()
}
return nil
@ -565,18 +569,25 @@ func (c *CompositeBinlogRecordWriter) Close() error {
}
if c.rw != nil {
// if rw is not nil, it means there is data to be flushed
if err := c.flushChunk(); err != nil {
if err := c.FlushChunk(); err != nil {
return err
}
}
return nil
}
func (c *CompositeBinlogRecordWriter) GetWrittenUncompressed() uint64 {
return 0
func (c *CompositeBinlogRecordWriter) GetBufferUncompressed() uint64 {
if c.rw == nil {
return 0
}
return c.rw.GetWrittenUncompressed()
}
func (c *CompositeBinlogRecordWriter) flushChunk() error {
func (c *CompositeBinlogRecordWriter) GetWrittenUncompressed() uint64 {
return c.flushedUncompressed + c.GetBufferUncompressed()
}
func (c *CompositeBinlogRecordWriter) FlushChunk() error {
if c.fieldWriters == nil {
return nil
}
@ -621,11 +632,17 @@ func (c *CompositeBinlogRecordWriter) flushChunk() error {
})
}
c.flushedUncompressed += c.rw.GetWrittenUncompressed()
// reset writers
c.resetWriters()
return nil
}
func (c *CompositeBinlogRecordWriter) Schema() *schemapb.CollectionSchema {
return c.schema
}
func (c *CompositeBinlogRecordWriter) writeStats() error {
if c.pkstats == nil {
return nil
@ -731,7 +748,6 @@ func newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID UniqueI
) (*CompositeBinlogRecordWriter, error) {
pkField, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
log.Warn("failed to get pk field from schema")
return nil, err
}
stats, err := NewPrimaryKeyStats(pkField.GetFieldID(), int64(pkField.GetDataType()), maxRowNum)
@ -764,21 +780,39 @@ func newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID UniqueI
}, nil
}
func NewChunkedBinlogSerializeWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema,
blobsWriter ChunkedBlobsWriter, allocator allocator.Interface, chunkSize uint64, rootPath string, maxRowNum int64, batchSize int,
) (*SerializeWriter[*Value], error) {
rw, err := newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID, schema, blobsWriter, allocator, chunkSize, rootPath, maxRowNum)
if err != nil {
return nil, err
// BinlogValueWriter is a BinlogRecordWriter with a SerializeWriter[*Value] mixin.
type BinlogValueWriter struct {
BinlogRecordWriter
SerializeWriter[*Value]
}
func (b *BinlogValueWriter) Close() error {
return b.SerializeWriter.Close()
}
func NewBinlogValueWriter(rw BinlogRecordWriter, batchSize int,
) *BinlogValueWriter {
return &BinlogValueWriter{
BinlogRecordWriter: rw,
SerializeWriter: NewSerializeRecordWriter[*Value](rw, func(v []*Value) (Record, error) {
return ValueSerializer(v, rw.Schema().Fields)
}, batchSize),
}
return NewSerializeRecordWriter[*Value](rw, func(v []*Value) (Record, error) {
return ValueSerializer(v, schema.Fields)
}, batchSize), nil
}
// Deprecated: use NewBinlogValueWriter instead
type BinlogSerializeWriter struct {
RecordWriter
SerializeWriter[*Value]
}
func (b *BinlogSerializeWriter) Close() error {
return b.SerializeWriter.Close()
}
func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID,
eventWriters map[FieldID]*BinlogStreamWriter, batchSize int,
) (*SerializeWriter[*Value], error) {
) (*BinlogSerializeWriter, error) {
rws := make(map[FieldID]RecordWriter, len(eventWriters))
for fid := range eventWriters {
w := eventWriters[fid]
@ -789,9 +823,12 @@ func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, se
rws[fid] = rw
}
compositeRecordWriter := NewCompositeRecordWriter(rws)
return NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, error) {
return ValueSerializer(v, schema.Fields)
}, batchSize), nil
return &BinlogSerializeWriter{
RecordWriter: compositeRecordWriter,
SerializeWriter: NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, error) {
return ValueSerializer(v, schema.Fields)
}, batchSize),
}, nil
}
type DeltalogStreamWriter struct {
@ -878,7 +915,7 @@ func newDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *Del
}
}
func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int) (*SerializeWriter[*DeleteLog], error) {
func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int) (*SerializeWriterImpl[*DeleteLog], error) {
rws := make(map[FieldID]RecordWriter, 1)
rw, err := eventWriter.GetRecordWriter()
if err != nil {
@ -1101,7 +1138,7 @@ func (dsw *MultiFieldDeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) err
return nil
}
func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, batchSize int) (*SerializeWriter[*DeleteLog], error) {
func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, batchSize int) (*SerializeWriterImpl[*DeleteLog], error) {
rw, err := eventWriter.GetRecordWriter()
if err != nil {
return nil, err
@ -1155,7 +1192,7 @@ func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, ba
}, batchSize), nil
}
func newDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
func newDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
reader, err := newSimpleArrowRecordReader(blobs)
if err != nil {
return nil, err
@ -1198,7 +1235,7 @@ func newDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog],
// NewDeltalogDeserializeReader is the entry point for the delta log reader.
// It includes NewDeltalogOneFieldReader, which uses the existing log format with only one column in a log file,
// and NewDeltalogMultiFieldReader, which uses the new format and supports multiple fields in a log file.
func newDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
func newDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
if supportMultiFieldFormat(blobs) {
return newDeltalogMultiFieldReader(blobs)
}
@ -1220,11 +1257,12 @@ func supportMultiFieldFormat(blobs []*Blob) bool {
return false
}
func CreateDeltalogReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
func CreateDeltalogReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
return newDeltalogDeserializeReader(blobs)
}
func CreateDeltalogWriter(collectionID, partitionID, segmentID UniqueID, pkType schemapb.DataType, batchSize int) (*SerializeWriter[*DeleteLog], func() (*Blob, error), error) {
func CreateDeltalogWriter(collectionID, partitionID, segmentID UniqueID, pkType schemapb.DataType, batchSize int,
) (*SerializeWriterImpl[*DeleteLog], func() (*Blob, error), error) {
format := paramtable.Get().DataNodeCfg.DeltalogFormat.GetValue()
if format == "json" {
eventWriter := newDeltalogStreamWriter(collectionID, partitionID, segmentID)

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
)
@ -48,7 +49,7 @@ func TestBinlogDeserializeReader(t *testing.T) {
})
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
})
@ -61,14 +62,13 @@ func TestBinlogDeserializeReader(t *testing.T) {
defer reader.Close()
for i := 1; i <= size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err)
value := reader.Value()
assertTestData(t, i, value)
assertTestData(t, i, *value)
}
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
})
@ -81,14 +81,13 @@ func TestBinlogDeserializeReader(t *testing.T) {
defer reader.Close()
for i := 1; i <= size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err)
value := reader.Value()
assertTestAddedFieldData(t, i, value)
assertTestAddedFieldData(t, i, *value)
}
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
})
}
@ -142,13 +141,55 @@ func TestBinlogStreamWriter(t *testing.T) {
}
func TestBinlogSerializeWriter(t *testing.T) {
t.Run("test write value", func(t *testing.T) {
size := 100
blobs, err := generateTestData(size)
assert.NoError(t, err)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
defer reader.Close()
schema := generateTestSchema()
alloc := allocator.NewLocalAllocator(1, 92) // 90 for 18 fields * 5 chunks, 1 for 1 stats file
chunkSize := uint64(64) // 64B
rw, err := newCompositeBinlogRecordWriter(0, 0, 0, schema,
func(b []*Blob) error {
log.Debug("write blobs", zap.Int("files", len(b)))
return nil
},
alloc, chunkSize, "root", 10000)
assert.NoError(t, err)
writer := NewBinlogValueWriter(rw, 20)
assert.NoError(t, err)
for i := 1; i <= size; i++ {
value, err := reader.NextValue()
assert.NoError(t, err)
assertTestData(t, i, *value)
err = writer.WriteValue(*value)
assert.NoError(t, err)
}
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
err = writer.Close()
assert.NoError(t, err)
logs, _, _ := writer.GetLogs()
assert.Equal(t, 18, len(logs))
assert.Equal(t, 5, len(logs[0].Binlogs))
})
}
func TestBinlogValueWriter(t *testing.T) {
t.Run("test empty data", func(t *testing.T) {
reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
return nil, io.EOF
})
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
})
@ -167,12 +208,11 @@ func TestBinlogSerializeWriter(t *testing.T) {
assert.NoError(t, err)
for i := 1; i <= size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err)
value := reader.Value()
assertTestData(t, i, value)
err := writer.Write(value)
assertTestData(t, i, *value)
err = writer.WriteValue(*value)
assert.NoError(t, err)
}
@ -181,11 +221,11 @@ func TestBinlogSerializeWriter(t *testing.T) {
assert.Equal(t, !f.IsPrimaryKey, props.DictionaryEnabled())
}
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
err = writer.Close()
assert.NoError(t, err)
assert.True(t, writer.WrittenMemorySize() >= 429)
assert.True(t, writer.GetWrittenUncompressed() >= 429)
// Read from the written data
newblobs := make([]*Blob, len(writers))
@ -208,11 +248,10 @@ func TestBinlogSerializeWriter(t *testing.T) {
assert.NoError(t, err)
defer reader.Close()
for i := 1; i <= size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err, i)
value := reader.Value()
assertTestData(t, i, value)
assertTestData(t, i, *value)
}
})
}
@ -245,13 +284,13 @@ func TestSize(t *testing.T) {
},
},
}
err := writer.Write(value)
err := writer.WriteValue(value)
assert.NoError(t, err)
}
err = writer.Close()
assert.NoError(t, err)
memSize := writer.WrittenMemorySize()
memSize := writer.GetWrittenUncompressed()
assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should be greater than the data size
t.Log("written memory size", memSize)
})
@ -283,13 +322,13 @@ func TestSize(t *testing.T) {
},
},
}
err := writer.Write(value)
err := writer.WriteValue(value)
assert.NoError(t, err)
}
err = writer.Close()
assert.NoError(t, err)
memSize := writer.WrittenMemorySize()
memSize := writer.GetWrittenUncompressed()
assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should be greater than the data size
t.Log("written memory size", memSize)
})
@ -358,7 +397,7 @@ func BenchmarkSerializeWriter(b *testing.B) {
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, s)
assert.NoError(b, err)
for _, v := range values {
_ = writer.Write(v)
_ = writer.WriteValue(v)
assert.NoError(b, err)
}
writer.Close()
@ -391,7 +430,7 @@ func TestNull(t *testing.T) {
IsDeleted: false,
Value: m,
}
writer.Write(value)
writer.WriteValue(value)
err = writer.Close()
assert.NoError(t, err)
@ -408,11 +447,10 @@ func TestNull(t *testing.T) {
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()
v, err := reader.NextValue()
assert.NoError(t, err)
readValue := reader.Value()
assert.Equal(t, value, readValue)
assert.Equal(t, value, *v)
})
}
@ -441,7 +479,7 @@ func TestDeltalogDeserializeReader(t *testing.T) {
reader, err := newDeltalogDeserializeReader(nil)
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
})
@ -454,14 +492,13 @@ func TestDeltalogDeserializeReader(t *testing.T) {
defer reader.Close()
for i := 0; i < size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err)
value := reader.Value()
assertTestDeltalogData(t, i, value)
assertTestDeltalogData(t, i, *value)
}
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
})
}
@ -471,7 +508,7 @@ func TestDeltalogSerializeWriter(t *testing.T) {
reader, err := newDeltalogDeserializeReader(nil)
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
})
@ -489,16 +526,15 @@ func TestDeltalogSerializeWriter(t *testing.T) {
assert.NoError(t, err)
for i := 0; i < size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err)
value := reader.Value()
assertTestDeltalogData(t, i, value)
err := writer.Write(value)
assertTestDeltalogData(t, i, *value)
err = writer.WriteValue(*value)
assert.NoError(t, err)
}
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
err = writer.Close()
assert.NoError(t, err)
@ -512,11 +548,10 @@ func TestDeltalogSerializeWriter(t *testing.T) {
assert.NoError(t, err)
defer reader.Close()
for i := 0; i < size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err, i)
value := reader.Value()
assertTestDeltalogData(t, i, value)
assertTestDeltalogData(t, i, *value)
}
})
}
@ -569,13 +604,12 @@ func TestDeltalogPkTsSeparateFormat(t *testing.T) {
assert.NoError(t, err)
defer reader.Close()
for i := 0; i < size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err)
value := reader.Value()
tc.assertPk(t, i, value)
tc.assertPk(t, i, *value)
}
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, io.EOF, err)
})
}
@ -614,7 +648,7 @@ func BenchmarkDeltalogFormatWriter(b *testing.B) {
var value *DeleteLog
for j := 0; j < size; j++ {
value = NewDeleteLog(NewInt64PrimaryKey(int64(j)), uint64(j+1))
writer.Write(value)
writer.WriteValue(value)
}
writer.Close()
eventWriter.Finalize()
@ -646,7 +680,7 @@ func writeDeltalogNewFormat(size int, pkType schemapb.DataType, batchSize int) (
case schemapb.DataType_VarChar:
value = NewDeleteLog(NewVarCharPrimaryKey(strconv.Itoa(i)), uint64(i+1))
}
if err = writer.Write(value); err != nil {
if err = writer.WriteValue(value); err != nil {
return nil, err
}
}
@ -667,8 +701,7 @@ func readDeltaLog(size int, blob *Blob) error {
}
defer reader.Close()
for j := 0; j < size; j++ {
err = reader.Next()
_ = reader.Value()
_, err = reader.NextValue()
if err != nil {
return err
}

View File

@ -117,7 +117,7 @@ func newPackedRecordReader(paths [][]string, schema *schemapb.CollectionSchema,
func NewPackedDeserializeReader(paths [][]string, schema *schemapb.CollectionSchema,
bufferSize int64,
) (*DeserializeReader[*Value], error) {
) (*DeserializeReaderImpl[*Value], error) {
reader, err := newPackedRecordReader(paths, schema, bufferSize)
if err != nil {
return nil, err
@ -260,7 +260,9 @@ func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int6
}, nil
}
func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int) (*SerializeWriter[*Value], error) {
func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64,
multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int,
) (*SerializeWriterImpl[*Value], error) {
arrowSchema, err := ConvertToArrowSchema(schema.Fields)
if err != nil {
return nil, merr.WrapErrServiceInternal(
@ -523,6 +525,18 @@ func (pw *PackedBinlogRecordWriter) GetRowNum() int64 {
return pw.rowNum
}
func (pw *PackedBinlogRecordWriter) FlushChunk() error {
return nil // do nothing
}
func (pw *PackedBinlogRecordWriter) Schema() *schemapb.CollectionSchema {
return pw.schema
}
func (pw *PackedBinlogRecordWriter) GetBufferUncompressed() uint64 {
return uint64(pw.multiPartUploadSize)
}
func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema,
blobsWriter ChunkedBlobsWriter, allocator allocator.Interface, chunkSize uint64, rootPath string, maxRowNum int64, bufferSize, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup,
) (*PackedBinlogRecordWriter, error) {

View File

@ -53,12 +53,11 @@ func TestPackedSerde(t *testing.T) {
assert.NoError(t, err)
for i := 1; i <= size; i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err)
value := reader.Value()
assertTestData(t, i, value)
err := writer.Write(value)
assertTestData(t, i, *value)
err = writer.WriteValue(*value)
assert.NoError(t, err)
}
err = writer.Close()
@ -76,12 +75,11 @@ func TestPackedSerde(t *testing.T) {
defer reader.Close()
for i := 0; i < size*len(paths); i++ {
err = reader.Next()
value, err := reader.NextValue()
assert.NoError(t, err)
value := reader.Value()
assertTestData(t, i%10+1, value)
assertTestData(t, i%10+1, *value)
}
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(t, err, io.EOF)
})
}

View File

@ -131,11 +131,10 @@ func BenchmarkDeserializeReader(b *testing.B) {
assert.NoError(b, err)
defer reader.Close()
for i := 0; i < len; i++ {
err = reader.Next()
_ = reader.Value()
_, err = reader.NextValue()
assert.NoError(b, err)
}
err = reader.Next()
_, err = reader.NextValue()
assert.Equal(b, io.EOF, err)
}
}

View File

@ -234,11 +234,11 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
var pq *PriorityQueue[index]
switch recs[0].Column(pkFieldId).(type) {
case *array.Int64:
pq = NewPriorityQueue[index](func(x, y *index) bool {
pq = NewPriorityQueue(func(x, y *index) bool {
return recs[x.ri].Column(pkFieldId).(*array.Int64).Value(x.i) < recs[y.ri].Column(pkFieldId).(*array.Int64).Value(y.i)
})
case *array.String:
pq = NewPriorityQueue[index](func(x, y *index) bool {
pq = NewPriorityQueue(func(x, y *index) bool {
return recs[x.ri].Column(pkFieldId).(*array.String).Value(x.i) < recs[y.ri].Column(pkFieldId).(*array.String).Value(y.i)
})
}

View File

@ -204,7 +204,7 @@ func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey) {
func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) {
if rowNum <= 0 {
return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum)
return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num %d", rowNum)
}
bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue()

View File

@ -86,9 +86,13 @@ func (pr *PackedReader) ReadNext() (arrow.Record, error) {
}
func (pr *PackedReader) Close() error {
if pr.cPackedReader == nil {
return nil
}
status := C.CloseReader(pr.cPackedReader)
if err := ConsumeCStatusIntoError(&status); err != nil {
return err
}
pr.cPackedReader = nil
return nil
}

View File

@ -100,7 +100,7 @@ func (r *l0Reader) Read() (*storage.DeleteData, error) {
defer reader.Close()
for {
err := reader.Next()
dl, err := reader.NextValue()
if err != nil {
if err == io.EOF {
break
@ -109,8 +109,7 @@ func (r *l0Reader) Read() (*storage.DeleteData, error) {
return nil, err
}
dl := reader.Value()
deleteData.Append(dl.Pk, dl.Ts)
deleteData.Append((*dl).Pk, (*dl).Ts)
}
r.readIdx++

View File

@ -247,7 +247,6 @@ func (s *ClusteringCompactionNullDataSuite) TestClusteringCompactionNullData() {
s.NoError(err)
s.Equal(segsInfoResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
for _, segInfo := range segsInfoResp.GetInfos() {
s.LessOrEqual(segInfo.GetNumOfRows(), int64(1024*1024/128))
totalRows += segInfo.GetNumOfRows()
}