From c84bdcea499fa47fd5ffba0ae4d23a6cb48ae591 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Mon, 29 May 2023 10:21:28 +0800 Subject: [PATCH] merge stats log when segment flushing or compacting (#23570) Signed-off-by: aoiasd --- internal/datanode/binlog_io.go | 253 +++++---------- internal/datanode/binlog_io_test.go | 300 +++++++----------- internal/datanode/channel_meta.go | 95 +++--- internal/datanode/channel_meta_test.go | 76 ++++- internal/datanode/compactor.go | 108 +++++-- internal/datanode/compactor_test.go | 175 +++++++--- internal/datanode/data_sync_service.go | 10 +- .../datanode/flow_graph_insert_buffer_node.go | 10 +- internal/datanode/flush_manager.go | 160 +++++++--- internal/datanode/flush_manager_test.go | 266 +++++++++++----- internal/datanode/meta_util.go | 12 +- internal/datanode/mock_test.go | 12 + internal/datanode/segment.go | 25 ++ internal/datanode/services.go | 43 +-- internal/datanode/util.go | 7 + internal/indexnode/chunkmgr_mock.go | 2 +- internal/metastore/kv/datacoord/kv_catalog.go | 17 +- internal/querynodev2/segments/mock_data.go | 13 +- .../querynodev2/segments/segment_loader.go | 46 ++- internal/storage/binlog_iterator_test.go | 2 +- internal/storage/data_codec.go | 145 ++++++--- internal/storage/data_codec_test.go | 21 +- internal/storage/print_binlog_test.go | 4 +- internal/storage/stats.go | 152 ++++++--- internal/storage/stats_test.go | 10 +- internal/storage/types.go | 15 + internal/storage/vector_chunk_manager_test.go | 2 +- 27 files changed, 1251 insertions(+), 730 deletions(-) diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index ad5e67fba8..9fde9a4ade 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -54,8 +54,8 @@ type uploader interface { // // errUploadToBlobStorage is returned if ctx is canceled from outside while a uploading is inprogress. // Beware of the ctx here, if no timeout or cancel is applied to this ctx, this uploading may retry forever. 
- upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error) - uploadInsertLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) + uploadInsertLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, error) + uploadStatsLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, stats *storage.PrimaryKeyStats, totRows int64, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) uploadDeltaLog(ctx context.Context, segID, partID UniqueID, dData *DeleteData, meta *etcdpb.CollectionMeta) ([]*datapb.FieldBinlog, error) } @@ -131,108 +131,6 @@ func (b *binlogIO) uploadSegmentFiles( return nil } -type segPaths struct { - inPaths []*datapb.FieldBinlog - statsPaths []*datapb.FieldBinlog - deltaInfo []*datapb.FieldBinlog -} - -func (b *binlogIO) upload( - ctx context.Context, - segID, partID UniqueID, - iDatas []*InsertData, - dData *DeleteData, - meta *etcdpb.CollectionMeta) (*segPaths, error) { - - var ( - p = &segPaths{} // The returns - kvs = make(map[string][]byte) // Key values to store in minIO - - insertField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its FieldBinlog - statsField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its statsBinlog - ) - - for _, iData := range iDatas { - tf, ok := iData.Data[common.TimeStampField] - if !ok || tf.RowNum() == 0 { - log.Warn("binlog io uploading empty insert data", - zap.Int64("segmentID", segID), - zap.Int64("collectionID", meta.GetID()), - ) - continue - } - - blobs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta) - if err != nil { - log.Warn("generate insert blobs wrong", - zap.Int64("collectionID", meta.GetID()), - zap.Int64("segmentID", segID), - zap.Error(err)) - return nil, err - } - - for k, v := range blobs { - kvs[k] = v - } - - for fID, path := range inpaths { - tmpBinlog, ok := insertField2Path[fID] - if !ok { - tmpBinlog = path - } else { - tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) - } - insertField2Path[fID] = tmpBinlog - } - - for fID, path := range statspaths { - tmpBinlog, ok := statsField2Path[fID] - if !ok { - tmpBinlog = path - } else { - tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) 
- } - statsField2Path[fID] = tmpBinlog - } - } - - for _, path := range insertField2Path { - p.inPaths = append(p.inPaths, path) - } - - for _, path := range statsField2Path { - p.statsPaths = append(p.statsPaths, path) - } - - // If there are delta binlogs - if dData.RowCount > 0 { - k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID) - if err != nil { - log.Warn("generate delta blobs wrong", - zap.Int64("collectionID", meta.GetID()), - zap.Int64("segmentID", segID), - zap.Error(err)) - return nil, err - } - - kvs[k] = v - p.deltaInfo = append(p.deltaInfo, &datapb.FieldBinlog{ - FieldID: 0, // TODO: Not useful on deltalogs, FieldID shall be ID of primary key field - Binlogs: []*datapb.Binlog{{ - EntriesNum: dData.RowCount, - LogPath: k, - LogSize: int64(len(v)), - }}, - }) - } - - err := b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs) - if err != nil { - return nil, err - } - return p, nil -} - // genDeltaBlobs returns key, value func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueID) (string, []byte, error) { dCodec := storage.NewDeleteCodec() @@ -253,32 +151,26 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI return key, blob.GetValue(), nil } -// genInsertBlobs returns kvs, insert-paths, stats-paths -func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) { - inCodec := storage.NewInsertCodecWithSchema(meta) - inlogs, statslogs, err := inCodec.Serialize(partID, segID, data) +// genInsertBlobs returns insert-paths and save blob to kvs +func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) { + inlogs, err := iCodec.Serialize(partID, segID, data) if err != nil { - return nil, nil, nil, err + return nil, err } - var ( - kvs = make(map[string][]byte, len(inlogs)+len(statslogs)) - inpaths = make(map[UniqueID]*datapb.FieldBinlog) - statspaths = make(map[UniqueID]*datapb.FieldBinlog) - ) - + inpaths := make(map[UniqueID]*datapb.FieldBinlog) notifyGenIdx := make(chan struct{}) defer close(notifyGenIdx) - generator, err := b.GetGenerator(len(inlogs)+len(statslogs), notifyGenIdx) + generator, err := b.GetGenerator(len(inlogs), notifyGenIdx) if err != nil { - return nil, nil, nil, err + return nil, err } for _, blob := range inlogs { // Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64) - k := metautil.JoinIDPath(meta.GetID(), partID, segID, fID, <-generator) + k := metautil.JoinIDPath(iCodec.Schema.GetID(), partID, segID, fID, <-generator) key := path.Join(b.ChunkManager.RootPath(), common.SegmentInsertLogPath, k) value := blob.GetValue() @@ -291,24 +183,76 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta } } - for _, blob := range statslogs { - // Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt - fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64) + return inpaths, nil +} - k := metautil.JoinIDPath(meta.GetID(), partID, segID, fID, <-generator) - key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k) +// genStatBlobs return stats log paths and save blob to kvs +func (b *binlogIO) genStatBlobs(stats *storage.PrimaryKeyStats, partID, segID 
UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte, totRows int64) (map[UniqueID]*datapb.FieldBinlog, error) { + statBlob, err := iCodec.SerializePkStats(stats, totRows) + if err != nil { + return nil, err + } + statPaths := make(map[UniqueID]*datapb.FieldBinlog) - value := blob.GetValue() - fileLen := len(value) + idx, err := b.AllocOne() + if err != nil { + return nil, err + } - kvs[key] = value - statspaths[fID] = &datapb.FieldBinlog{ - FieldID: fID, - Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: blob.RowNum}}, + fID, _ := strconv.ParseInt(statBlob.GetKey(), 10, 64) + k := metautil.JoinIDPath(iCodec.Schema.GetID(), partID, segID, fID, idx) + key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k) + + value := statBlob.GetValue() + fileLen := len(value) + + kvs[key] = value + + statPaths[fID] = &datapb.FieldBinlog{ + FieldID: fID, + Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: totRows}}, + } + return statPaths, nil +} + +// update stats log +// also update with insert data if not nil +func (b *binlogIO) uploadStatsLog( + ctx context.Context, + segID UniqueID, + partID UniqueID, + iData *InsertData, + stats *storage.PrimaryKeyStats, + totRows int64, + meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) { + var inPaths map[int64]*datapb.FieldBinlog + var err error + + iCodec := storage.NewInsertCodecWithSchema(meta) + kvs := make(map[string][]byte) + + if !iData.IsEmpty() { + inPaths, err = b.genInsertBlobs(iData, partID, segID, iCodec, kvs) + if err != nil { + log.Warn("generate insert blobs wrong", + zap.Int64("collectionID", iCodec.Schema.GetID()), + zap.Int64("segmentID", segID), + zap.Error(err)) + return nil, nil, err } } - return kvs, inpaths, statspaths, nil + statPaths, err := b.genStatBlobs(stats, partID, segID, iCodec, kvs, totRows) + if err != nil { + return nil, nil, err + } + + err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs) + if err != nil { + return nil, nil, err + } + + return inPaths, statPaths, nil } func (b *binlogIO) uploadInsertLog( @@ -316,55 +260,30 @@ func (b *binlogIO) uploadInsertLog( segID UniqueID, partID UniqueID, iData *InsertData, - meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) { - var ( - insertField2Path = make(map[UniqueID]*datapb.FieldBinlog) - statsField2Path = make(map[UniqueID]*datapb.FieldBinlog) - ) + meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, error) { - tf, ok := iData.Data[common.TimeStampField] - if !ok || tf.RowNum() == 0 { + iCodec := storage.NewInsertCodecWithSchema(meta) + kvs := make(map[string][]byte) + + if iData.IsEmpty() { log.Warn("binlog io uploading empty insert data", zap.Int64("segmentID", segID), - zap.Int64("collectionID", meta.GetID()), + zap.Int64("collectionID", iCodec.Schema.GetID()), ) - return nil, nil, nil + return nil, nil } - kvs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta) + inpaths, err := b.genInsertBlobs(iData, partID, segID, iCodec, kvs) if err != nil { - log.Warn("generate insert blobs wrong", - zap.Int64("collectionID", meta.GetID()), - zap.Int64("segmentID", segID), - zap.Error(err)) - return nil, nil, err - } - - for fID, path := range inpaths { - tmpBinlog, ok := insertField2Path[fID] - if !ok { - tmpBinlog = path - } else { - tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) 
- } - insertField2Path[fID] = tmpBinlog - } - - for fID, path := range statspaths { - tmpBinlog, ok := statsField2Path[fID] - if !ok { - tmpBinlog = path - } else { - tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) - } - statsField2Path[fID] = tmpBinlog + return nil, err } err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs) if err != nil { - return nil, nil, err + return nil, err } - return insertField2Path, statsField2Path, nil + + return inpaths, nil } func (b *binlogIO) uploadDeltaLog( diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go index 6e6a4c4cb3..346edf69d3 100644 --- a/internal/datanode/binlog_io_test.go +++ b/internal/datanode/binlog_io_test.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -53,169 +52,6 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - t.Run("Test upload", func(t *testing.T) { - f := &MetaFactory{} - meta := f.GetCollectionMeta(UniqueID(10001), "uploads", schemapb.DataType_Int64) - - //pkFieldID := int64(106) - iData := genInsertData() - pk := newInt64PrimaryKey(888) - dData := &DeleteData{ - RowCount: 1, - Pks: []primaryKey{pk}, - Tss: []uint64{666666}, - } - t.Run("Test upload one iData", func(t *testing.T) { - alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) - alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - - b := &binlogIO{cm, alloc} - p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) - assert.NoError(t, err) - assert.Equal(t, 12, len(p.inPaths)) - assert.Equal(t, 1, len(p.statsPaths)) - assert.Equal(t, 1, len(p.inPaths[0].GetBinlogs())) - assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs())) - assert.NotNil(t, p.deltaInfo) - }) - - t.Run("Test upload two iData", func(t *testing.T) { - alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) - alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - - b := &binlogIO{cm, alloc} - p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, dData, meta) - assert.NoError(t, err) - assert.Equal(t, 12, len(p.inPaths)) - assert.Equal(t, 1, len(p.statsPaths)) - assert.Equal(t, 2, len(p.inPaths[0].GetBinlogs())) - assert.Equal(t, 2, len(p.statsPaths[0].GetBinlogs())) - assert.NotNil(t, p.deltaInfo) - - }) - - t.Run("Test uploadInsertLog", func(t *testing.T) { - alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) - - b := &binlogIO{cm, alloc} - - in, stats, err := b.uploadInsertLog(ctx, 1, 10, iData, meta) - assert.NoError(t, err) - assert.Equal(t, 12, len(in)) - assert.Equal(t, 1, len(in[0].GetBinlogs())) - assert.Equal(t, 1, len(stats)) - }) - t.Run("Test uploadDeltaLog", func(t *testing.T) { - alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - - b := &binlogIO{cm, alloc} - deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta) - assert.NoError(t, err) - assert.NotNil(t, deltas) - assert.Equal(t, 1, len(deltas[0].GetBinlogs())) - }) 
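// A minimal sketch of how callers combine the split upload API that replaces the
// removed upload(): uploadStatsLog writes insert + stats binlogs, uploadDeltaLog writes
// delete binlogs. b, ctx, meta, iData, dData, segID, partID, pkFieldID and numRows are
// placeholder names for this sketch, not identifiers from the patch.
stats := storage.NewPrimaryKeyStats(pkFieldID, int64(schemapb.DataType_Int64), numRows)
inPaths, statsPaths, err := b.uploadStatsLog(ctx, segID, partID, iData, stats, numRows, meta)
if err != nil {
	return err
}
deltaPaths, err := b.uploadDeltaLog(ctx, segID, partID, dData, meta)
if err != nil {
	return err
}
// inPaths, statsPaths and deltaPaths then populate datapb.CompactionSegmentBinlogs,
// as the compactor tests further below do via lo.Values.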
- - t.Run("Test context Done", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) - alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - - b := &binlogIO{cm, alloc} - - p, err := b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) - assert.EqualError(t, err, errUploadToBlobStorage.Error()) - assert.Nil(t, p) - - in, _, err := b.uploadInsertLog(ctx, 1, 10, iData, meta) - assert.EqualError(t, err, errUploadToBlobStorage.Error()) - assert.Nil(t, in) - - deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta) - assert.EqualError(t, err, errUploadToBlobStorage.Error()) - assert.Nil(t, deltas) - }) - }) - - t.Run("Test upload error", func(t *testing.T) { - f := &MetaFactory{} - meta := f.GetCollectionMeta(UniqueID(10001), "uploads", schemapb.DataType_Int64) - dData := &DeleteData{ - Pks: []primaryKey{}, - Tss: []uint64{}, - } - - t.Run("Test upload empty insertData", func(t *testing.T) { - alloc := allocator.NewMockAllocator(t) - b := &binlogIO{cm, alloc} - - iData := genEmptyInsertData() - p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) - assert.NoError(t, err) - assert.Empty(t, p.inPaths) - assert.Empty(t, p.statsPaths) - assert.Empty(t, p.deltaInfo) - - iData = &InsertData{Data: make(map[int64]storage.FieldData)} - p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) - assert.NoError(t, err) - assert.Empty(t, p.inPaths) - assert.Empty(t, p.statsPaths) - assert.Empty(t, p.deltaInfo) - }) - - t.Run("Test deleta data not match", func(t *testing.T) { - alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) - b := &binlogIO{cm, alloc} - iData := genInsertData() - dData := &DeleteData{ - Pks: []primaryKey{}, - Tss: []uint64{1}, - RowCount: 1, - } - p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) - assert.Error(t, err) - assert.Empty(t, p) - }) - - t.Run("Test multisave error", func(t *testing.T) { - mkc := &mockCm{errMultiSave: true} - alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) - alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - var ( - b = &binlogIO{mkc, alloc} - iData = genInsertData() - pk = newInt64PrimaryKey(1) - dData = &DeleteData{ - Pks: []primaryKey{pk}, - Tss: []uint64{1}, - RowCount: 1, - } - ) - ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond) - p, err := b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) - assert.Error(t, err) - assert.Empty(t, p) - - in, _, err := b.uploadInsertLog(ctx, 1, 10, iData, meta) - assert.Error(t, err) - assert.Empty(t, in) - - deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta) - assert.Error(t, err) - assert.Empty(t, deltas) - cancel() - }) - }) - t.Run("Test download", func(t *testing.T) { alloc := allocator.NewMockAllocator(t) b := &binlogIO{cm, alloc} @@ -271,6 +107,57 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { assert.Empty(t, blobs) cancel() }) + + t.Run("Test upload stats log err", func(t *testing.T) { + f := &MetaFactory{} + meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64) + + t.Run("gen insert blob failed", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, 
mock.Anything).Call.Return(nil, fmt.Errorf("mock err")) + b := binlogIO{cm, alloc} + _, _, err := b.uploadStatsLog(context.Background(), 1, 10, genInsertData(), genTestStat(meta), 10, meta) + assert.Error(t, err) + }) + }) + + t.Run("Test upload insert log err", func(t *testing.T) { + f := &MetaFactory{} + meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64) + + t.Run("empty insert", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + b := binlogIO{cm, alloc} + + paths, err := b.uploadInsertLog(context.Background(), 1, 10, genEmptyInsertData(), meta) + assert.NoError(t, err) + assert.Nil(t, paths) + }) + + t.Run("gen insert blob failed", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + b := binlogIO{cm, alloc} + + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err")) + + _, err := b.uploadInsertLog(context.Background(), 1, 10, genInsertData(), meta) + assert.Error(t, err) + }) + + t.Run("upload failed", func(t *testing.T) { + mkc := &mockCm{errMultiLoad: true, errMultiSave: true} + alloc := allocator.NewMockAllocator(t) + b := binlogIO{mkc, alloc} + + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + _, err := b.uploadInsertLog(ctx, 1, 10, genInsertData(), meta) + assert.Error(t, err) + }) + }) } func prepareBlob(cm storage.ChunkManager, key string) ([]byte, string, error) { @@ -372,23 +259,18 @@ func TestBinlogIOInnerMethods(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType) - helper, err := typeutil.CreateSchemaHelper(meta.Schema) - assert.NoError(t, err) - primaryKeyFieldSchema, err := helper.GetPrimaryKeyField() - assert.NoError(t, err) - primaryKeyFieldID := primaryKeyFieldSchema.GetFieldID() + iCodec := storage.NewInsertCodecWithSchema(meta) - kvs, pin, pstats, err := b.genInsertBlobs(genInsertData(), 10, 1, meta) + kvs := make(map[string][]byte) + pin, err := b.genInsertBlobs(genInsertData(), 10, 1, iCodec, kvs) assert.NoError(t, err) - assert.Equal(t, 1, len(pstats)) assert.Equal(t, 12, len(pin)) - assert.Equal(t, 13, len(kvs)) + assert.Equal(t, 12, len(kvs)) log.Debug("test paths", zap.Any("kvs no.", len(kvs)), - zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()), - zap.String("stats paths field0", pstats[primaryKeyFieldID].GetBinlogs()[0].GetLogPath())) + zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath())) }) } }) @@ -400,36 +282,88 @@ func TestBinlogIOInnerMethods(t *testing.T) { defer cm.RemoveWithPrefix(ctx, cm.RootPath()) t.Run("serialize error", func(t *testing.T) { + iCodec := storage.NewInsertCodecWithSchema(nil) + bin := &binlogIO{cm, allocator.NewMockAllocator(t)} - kvs, pin, pstats, err := bin.genInsertBlobs(genEmptyInsertData(), 10, 1, nil) + kvs := make(map[string][]byte) + pin, err := bin.genInsertBlobs(genEmptyInsertData(), 10, 1, iCodec, kvs) assert.Error(t, err) assert.Empty(t, kvs) assert.Empty(t, pin) - assert.Empty(t, pstats) }) t.Run("GetGenerator error", func(t *testing.T) { f := &MetaFactory{} meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64) + iCodec := storage.NewInsertCodecWithSchema(meta) alloc := allocator.NewMockAllocator(t) alloc.EXPECT().GetGenerator(mock.Anything, 
mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error")) bin := &binlogIO{cm, alloc} - kvs, pin, pstats, err := bin.genInsertBlobs(genInsertData(), 10, 1, meta) + kvs := make(map[string][]byte) + + pin, err := bin.genInsertBlobs(genInsertData(), 10, 1, iCodec, kvs) assert.Error(t, err) assert.Empty(t, kvs) assert.Empty(t, pin) - assert.Empty(t, pstats) + }) + }) + + t.Run("Test genStatsBlob", func(t *testing.T) { + f := &MetaFactory{} + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocOne().Return(0, nil) + + b := binlogIO{cm, alloc} + + tests := []struct { + pkType schemapb.DataType + description string + expectError bool + }{ + {schemapb.DataType_Int64, "int64PrimaryField", false}, + {schemapb.DataType_VarChar, "varCharPrimaryField", false}, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_stat_blobs", test.pkType) + iCodec := storage.NewInsertCodecWithSchema(meta) + + kvs := make(map[string][]byte) + stat, err := b.genStatBlobs(genTestStat(meta), 10, 1, iCodec, kvs, 0) + + assert.NoError(t, err) + assert.Equal(t, 1, len(stat)) + assert.Equal(t, 1, len(kvs)) + }) + } + }) + + t.Run("Test genStatsBlob error", func(t *testing.T) { + f := &MetaFactory{} + alloc := allocator.NewMockAllocator(t) + b := binlogIO{cm, alloc} + + t.Run("serialize error", func(t *testing.T) { + meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_stat_blobs_error", schemapb.DataType_Int64) + iCodec := storage.NewInsertCodecWithSchema(meta) + + kvs := make(map[string][]byte) + _, err := b.genStatBlobs(nil, 10, 1, iCodec, kvs, 0) + assert.Error(t, err) }) }) } type mockCm struct { storage.ChunkManager - errMultiLoad bool - errMultiSave bool + errMultiLoad bool + errMultiSave bool + MultiReadReturn [][]byte + ReadReturn []byte } var _ storage.ChunkManager = (*mockCm)(nil) @@ -450,13 +384,17 @@ func (mk *mockCm) MultiWrite(ctx context.Context, contents map[string][]byte) er } func (mk *mockCm) Read(ctx context.Context, filePath string) ([]byte, error) { - return nil, nil + return mk.ReadReturn, nil } func (mk *mockCm) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error) { if mk.errMultiLoad { return nil, errors.New("mockKv multiload error") } + + if mk.MultiReadReturn != nil { + return mk.MultiReadReturn, nil + } return [][]byte{[]byte("a")}, nil } diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go index deeca0d91e..25323c9977 100644 --- a/internal/datanode/channel_meta.go +++ b/internal/datanode/channel_meta.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math" + "path" "runtime" "sync" "time" @@ -63,25 +64,28 @@ type Channel interface { getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error) getChannelName(segID UniqueID) string + addSegment(req addSegmentReq) error + getSegment(segID UniqueID) *Segment + removeSegments(segID ...UniqueID) + hasSegment(segID UniqueID, countFlushed bool) bool + + InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error + RollPKstats(segID UniqueID, stats *storage.PrimaryKeyStats) + listAllSegmentIDs() []UniqueID listNotFlushedSegmentIDs() []UniqueID - addSegment(req addSegmentReq) error listPartitionSegments(partID UniqueID) []UniqueID filterSegments(partitionID UniqueID) []*Segment listNewSegmentsStartPositions() []*datapb.SegmentStartPosition transferNewSegments(segmentIDs []UniqueID) updateSegmentPKRange(segID UniqueID, ids storage.FieldData) 
mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error - hasSegment(segID UniqueID, countFlushed bool) bool - removeSegments(segID ...UniqueID) listCompactedSegmentIDs() map[UniqueID][]UniqueID listSegmentIDsToSync(ts Timestamp) []UniqueID setSegmentLastSyncTs(segID UniqueID, ts Timestamp) updateSegmentRowNumber(segID UniqueID, numRows int64) updateSegmentMemorySize(segID UniqueID, memorySize int64) - InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error - RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats) getSegmentStatisticsUpdates(segID UniqueID) (*commonpb.SegmentStats, error) segmentFlushed(segID UniqueID) @@ -130,6 +134,7 @@ type addSegmentReq struct { numOfRows int64 startPos, endPos *msgpb.MsgPosition statsBinLogs []*datapb.FieldBinlog + binLogs []*datapb.FieldBinlog recoverTs Timestamp importing bool } @@ -195,23 +200,6 @@ func (c *ChannelMeta) getChannelName(segID UniqueID) string { return c.channelName } -// maxRowCountPerSegment returns max row count for a segment based on estimation of row size. -func (c *ChannelMeta) maxRowCountPerSegment(ts Timestamp) (int64, error) { - log := log.With(zap.Int64("collectionID", c.collectionID), zap.Uint64("timpstamp", ts)) - schema, err := c.getCollectionSchema(c.collectionID, ts) - if err != nil { - log.Warn("failed to get collection schema", zap.Error(err)) - return 0, err - } - sizePerRecord, err := typeutil.EstimateSizePerRecord(schema) - if err != nil { - log.Warn("failed to estimate size per record", zap.Error(err)) - return 0, err - } - threshold := Params.DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024 - return int64(threshold / float64(sizePerRecord)), nil -} - // addSegment adds the segment to current channel. Segments can be added as *new*, *normal* or *flushed*. // Make sure to verify `channel.hasSegment(segID)` == false before calling `channel.addSegment()`. 
func (c *ChannelMeta) addSegment(req addSegmentReq) error { @@ -258,6 +246,14 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error { return nil } +func (c *ChannelMeta) getSegment(segID UniqueID) *Segment { + seg, ok := c.segments[segID] + if !ok { + return nil + } + return seg +} + func (c *ChannelMeta) listCompactedSegmentIDs() map[UniqueID][]UniqueID { c.segMu.RLock() defer c.segMu.RUnlock() @@ -397,13 +393,26 @@ func (c *ChannelMeta) loadStats(ctx context.Context, segmentID int64, collection } // filter stats binlog files which is pk field stats log - var bloomFilterFiles []string + bloomFilterFiles := []string{} + logType := storage.DefaultStatsType + for _, binlog := range statsBinlogs { if binlog.FieldID != pkField { continue } + Loop: for _, log := range binlog.GetBinlogs() { - bloomFilterFiles = append(bloomFilterFiles, log.GetLogPath()) + _, logidx := path.Split(log.GetLogPath()) + // if special status log exist + // only load one file + switch logidx { + case storage.CompoundStatsType.LogIdx(): + bloomFilterFiles = []string{log.GetLogPath()} + logType = storage.CompoundStatsType + break Loop + default: + bloomFilterFiles = append(bloomFilterFiles, log.GetLogPath()) + } } } @@ -424,11 +433,21 @@ func (c *ChannelMeta) loadStats(ctx context.Context, segmentID int64, collection blobs = append(blobs, &Blob{Value: values[i]}) } - stats, err := storage.DeserializeStats(blobs) - if err != nil { - log.Warn("failed to deserialize bloom filter files", zap.Error(err)) - return nil, merr.WrapErrParameterInvalid("valid statslog", "corrupted statslog", err.Error()) + var stats []*storage.PrimaryKeyStats + if logType == storage.CompoundStatsType { + stats, err = storage.DeserializeStatsList(blobs[0]) + if err != nil { + log.Warn("failed to deserialize stats list", zap.Error(err)) + return nil, err + } + } else { + stats, err = storage.DeserializeStats(blobs) + if err != nil { + log.Warn("failed to deserialize stats", zap.Error(err)) + return nil, err + } } + var size uint result := make([]*storage.PkStatistics, 0, len(stats)) for _, stat := range stats { @@ -455,23 +474,27 @@ func (c *ChannelMeta) initPKstats(ctx context.Context, s *Segment, statsBinlogs return nil } -func (c *ChannelMeta) RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats) { +func (c *ChannelMeta) RollPKstats(segID UniqueID, stat *storage.PrimaryKeyStats) { + if stat == nil { + log.Warn("sync but no any pk stats", zap.Int64("segmentID", segID)) + return + } + c.segMu.Lock() defer c.segMu.Unlock() seg, ok := c.segments[segID] log.Info("roll pk stats", zap.Int64("segment id", segID)) if ok && seg.notFlushed() { - for _, stat := range stats { - pkStat := &storage.PkStatistics{ - PkFilter: stat.BF, - MinPK: stat.MinPk, - MaxPK: stat.MaxPk, - } - seg.historyStats = append(seg.historyStats, pkStat) + pkStat := &storage.PkStatistics{ + PkFilter: stat.BF, + MinPK: stat.MinPk, + MaxPK: stat.MaxPk, } + seg.historyStats = append(seg.historyStats, pkStat) seg.currentStat = nil return } + // should not happen at all if ok { log.Warn("only growing segment should roll PK stats", zap.Int64("segment", segID), zap.Any("type", seg.sType)) diff --git a/internal/datanode/channel_meta_test.go b/internal/datanode/channel_meta_test.go index b0996c59e0..83eeedb022 100644 --- a/internal/datanode/channel_meta_test.go +++ b/internal/datanode/channel_meta_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "math/rand" + "path" "testing" "time" @@ -38,6 +39,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" 
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/metautil" ) var channelMetaNodeTestDir = "/tmp/milvus_test/channel_meta" @@ -700,6 +702,68 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) { } +func TestChannelMeta_loadStats(t *testing.T) { + f := &MetaFactory{} + rc := &RootCoordFactory{ + pkType: schemapb.DataType_Int64, + } + + t.Run("list with merged stats log", func(t *testing.T) { + meta := f.GetCollectionMeta(UniqueID(10001), "test_load_stats", schemapb.DataType_Int64) + // load normal stats log + seg1 := &Segment{ + segmentID: 1, + partitionID: 2, + } + + // load flushed stats log + seg2 := &Segment{ + segmentID: 2, + partitionID: 2, + } + + //gen pk stats bytes + stats := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10) + iCodec := storage.NewInsertCodecWithSchema(meta) + + cm := &mockCm{} + + channel := newChannel("channel", 1, meta.Schema, rc, cm) + channel.segments[seg1.segmentID] = seg1 + channel.segments[seg2.segmentID] = seg2 + + // load normal stats log + blob, err := iCodec.SerializePkStats(stats, 10) + assert.NoError(t, err) + cm.MultiReadReturn = [][]byte{blob.Value} + + _, err = channel.loadStats( + context.Background(), seg1.segmentID, 1, + []*datapb.FieldBinlog{{ + FieldID: 106, + Binlogs: []*datapb.Binlog{{ + /////// + LogPath: path.Join(common.SegmentStatslogPath, metautil.JoinIDPath(1, 2, 1, 106, 10)), + }}}}, 0) + assert.NoError(t, err) + + // load flushed stats log + blob, err = iCodec.SerializePkStatsList([]*storage.PrimaryKeyStats{stats}, 10) + assert.NoError(t, err) + cm.MultiReadReturn = [][]byte{blob.Value} + + _, err = channel.loadStats( + context.Background(), seg2.segmentID, 1, + []*datapb.FieldBinlog{{ + FieldID: 106, + Binlogs: []*datapb.Binlog{{ + /////// + LogPath: path.Join(common.SegmentStatslogPath, metautil.JoinIDPath(1, 2, 2, 106), storage.CompoundStatsType.LogIdx()), + }}}}, 0) + assert.NoError(t, err) + }) +} + func TestChannelMeta_UpdatePKRange(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -978,18 +1042,6 @@ func (s *ChannelMetaSuite) TestHasSegment() { } } -func (s *ChannelMetaSuite) getSegmentByID(id UniqueID) (*Segment, bool) { - s.channel.segMu.RLock() - defer s.channel.segMu.RUnlock() - - seg, ok := s.channel.segments[id] - if ok && seg.isValid() { - return seg, true - } - - return nil, false -} - func TestChannelMetaSuite(t *testing.T) { suite.Run(t, new(ChannelMetaSuite)) } diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 8a88e36aa6..2f2e124c11 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -27,6 +27,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/tsoutil" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -136,6 +137,20 @@ func (t *compactionTask) getChannelName() string { return t.plan.GetChannel() } +// return num rows of all segment compaction from +func (t *compactionTask) getNumRows() (int64, error) { + numRows := int64(0) + for _, binlog := range t.plan.SegmentBinlogs { + seg := t.Channel.getSegment(binlog.GetSegmentID()) + if seg == nil { + return 0, merr.WrapErrSegmentNotFound(binlog.GetSegmentID(), "get compaction segments num rows failed") + } + numRows += seg.numRows + } + + return numRows, nil +} + func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) ( map[interface{}]Timestamp, 
*DelDataBuf, error) { log := log.With(zap.Int64("planID", t.getPlanID())) @@ -196,13 +211,51 @@ func nano2Milli(nano time.Duration) float64 { return float64(nano) / float64(time.Millisecond) } +func (t *compactionTask) uploadRemainLog( + ctxTimeout context.Context, + targetSegID UniqueID, + partID UniqueID, + meta *etcdpb.CollectionMeta, + stats *storage.PrimaryKeyStats, + totRows int64, + fID2Content map[UniqueID][]interface{}, + fID2Type map[UniqueID]schemapb.DataType) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) { + var iData *InsertData + + // remain insert data + if len(fID2Content) != 0 { + iData = &InsertData{Data: make(map[storage.FieldID]storage.FieldData)} + for fID, content := range fID2Content { + tp, ok := fID2Type[fID] + if !ok { + log.Warn("no field ID in this schema", zap.Int64("fieldID", fID)) + return nil, nil, errors.New("Unexpected error") + } + + fData, err := interface2FieldData(tp, content, int64(len(content))) + if err != nil { + log.Warn("transfer interface to FieldData wrong", zap.Error(err)) + return nil, nil, err + } + iData.Data[fID] = fData + } + } + + inPaths, statPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, iData, stats, totRows, meta) + if err != nil { + return nil, nil, err + } + + return inPaths, statPaths, nil +} + func (t *compactionTask) uploadSingleInsertLog( ctxTimeout context.Context, targetSegID UniqueID, partID UniqueID, meta *etcdpb.CollectionMeta, fID2Content map[UniqueID][]interface{}, - fID2Type map[UniqueID]schemapb.DataType) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) { + fID2Type map[UniqueID]schemapb.DataType) (map[UniqueID]*datapb.FieldBinlog, error) { iData := &InsertData{ Data: make(map[storage.FieldID]storage.FieldData)} @@ -210,23 +263,23 @@ func (t *compactionTask) uploadSingleInsertLog( tp, ok := fID2Type[fID] if !ok { log.Warn("no field ID in this schema", zap.Int64("fieldID", fID)) - return nil, nil, errors.New("Unexpected error") + return nil, errors.New("Unexpected error") } fData, err := interface2FieldData(tp, content, int64(len(content))) if err != nil { log.Warn("transfer interface to FieldData wrong", zap.Error(err)) - return nil, nil, err + return nil, err } iData.Data[fID] = fData } - inPaths, statPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, iData, meta) + inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, iData, meta) if err != nil { - return nil, nil, err + return nil, err } - return inPaths, statPaths, nil + return inPaths, nil } func (t *compactionTask) merge( @@ -245,10 +298,6 @@ func (t *compactionTask) merge( numRows int64 // the number of rows uploaded expired int64 // the number of expired entity - // statslog generation - pkID UniqueID - pkType schemapb.DataType - fID2Type = make(map[UniqueID]schemapb.DataType) fID2Content = make(map[UniqueID][]interface{}) @@ -295,14 +344,22 @@ func (t *compactionTask) merge( } // get pkID, pkType, dim + var pkField *schemapb.FieldSchema for _, fs := range meta.GetSchema().GetFields() { fID2Type[fs.GetFieldID()] = fs.GetDataType() if fs.GetIsPrimaryKey() && fs.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(fs.GetDataType()) { - pkID = fs.GetFieldID() - pkType = fs.GetDataType() + pkField = fs } } + if pkField == nil { + log.Warn("failed to get pk filed from schema") + return nil, nil, 0, fmt.Errorf("no pk field in schema") + } + + pkID := pkField.GetFieldID() + pkType := pkField.GetDataType() + // estimate Rows per binlog // TODO should not convert size to 
row because we already know the size, this is especially important on varchar types. size, err := typeutil.EstimateSizePerRecord(meta.GetSchema()) @@ -324,6 +381,13 @@ func (t *compactionTask) merge( downloadTimeCost := time.Duration(0) uploadInsertTimeCost := time.Duration(0) + oldRowNums, err := t.getNumRows() + if err != nil { + return nil, nil, 0, err + } + + stats := storage.NewPrimaryKeyStats(pkID, int64(pkType), oldRowNums) + for _, path := range unMergedInsertlogs { downloadStart := time.Now() data, err := t.download(ctxTimeout, path) @@ -369,18 +433,19 @@ func (t *compactionTask) merge( } fID2Content[fID] = append(fID2Content[fID], vInter) } + //update pk to new stats log + stats.Update(v.PK) currentRows++ if currentRows >= maxRowsPerBinlog { uploadInsertStart := time.Now() - inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type) + inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type) if err != nil { log.Warn("failed to upload single insert log", zap.Error(err)) return nil, nil, 0, err } uploadInsertTimeCost += time.Since(uploadInsertStart) addInsertFieldPath(inPaths) - addStatFieldPath(statsPaths) fID2Content = make(map[int64][]interface{}) currentRows = 0 @@ -389,20 +454,21 @@ func (t *compactionTask) merge( } } } - if currentRows != 0 { - uploadInsertStart := time.Now() - inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type) + + // upload stats log and remain insert rows + if numRows != 0 || currentRows != 0 { + uploadStart := time.Now() + inPaths, statsPaths, err := t.uploadRemainLog(ctxTimeout, targetSegID, partID, meta, + stats, numRows+int64(currentRows), fID2Content, fID2Type) if err != nil { - log.Warn("failed to upload single insert log", zap.Error(err)) return nil, nil, 0, err } - uploadInsertTimeCost += time.Since(uploadInsertStart) + uploadInsertTimeCost += time.Since(uploadStart) addInsertFieldPath(inPaths) addStatFieldPath(statsPaths) - numRows += int64(currentRows) - numBinlogs++ + numBinlogs += len(inPaths) } for _, path := range insertField2Path { diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 05f2c9dde3..1e89747da0 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -280,8 +281,11 @@ func TestCompactionTaskInnerMethods(t *testing.T) { Schema: meta.GetSchema(), }, nil) channel := newChannel("a", collectionID, meta.GetSchema(), rc, nil) + channel.segments[1] = &Segment{numRows: 10} + alloc := allocator.NewMockAllocator(t) alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + alloc.EXPECT().AllocOne().Return(0, nil) t.Run("Merge without expiration", func(t *testing.T) { mockbIO := &binlogIO{cm, alloc} @@ -289,7 +293,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { iData := genInsertDataWithExpiredTS() var allPaths [][]string - inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) + inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -307,7 +311,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) { 1: 10000, } - ct := 
&compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)} + ct := &compactionTask{ + Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1), + plan: &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 1}}, + }} inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm) assert.NoError(t, err) assert.Equal(t, int64(2), numOfRow) @@ -326,7 +335,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) var allPaths [][]string - inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) + inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -342,13 +351,18 @@ func TestCompactionTaskInnerMethods(t *testing.T) { dm := map[interface{}]Timestamp{} - ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)} + ct := &compactionTask{ + Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1), + plan: &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 1}}, + }} inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm) assert.NoError(t, err) assert.Equal(t, int64(2), numOfRow) assert.Equal(t, 2, len(inPaths[0].GetBinlogs())) assert.Equal(t, 1, len(statsPaths)) - assert.Equal(t, 2, len(statsPaths[0].GetBinlogs())) + assert.Equal(t, 1, len(statsPaths[0].GetBinlogs())) }) t.Run("Merge with expiration", func(t *testing.T) { @@ -358,7 +372,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) var allPaths [][]string - inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) + inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -383,6 +397,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) { uploader: mockbIO, plan: &datapb.CompactionPlan{ CollectionTtl: 864000, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 1}}, }, done: make(chan struct{}, 1), } @@ -400,7 +416,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) var allPaths [][]string - inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) + inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -418,7 +434,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) { 1: 10000, } - ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)} + ct := &compactionTask{ + Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1), + plan: &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 1}}, + }} _, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{ Schema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ {DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ 
@@ -436,7 +457,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) var allPaths [][]string - inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) + inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -537,6 +558,81 @@ func TestCompactionTaskInnerMethods(t *testing.T) { assert.Equal(t, false, res) }) }) + + t.Run("Test getNumRows error", func(t *testing.T) { + rc := &RootCoordFactory{ + pkType: schemapb.DataType_Int64, + } + cm := &mockCm{} + + ct := &compactionTask{ + Channel: newChannel("channel", 1, nil, rc, cm), + plan: &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 1, + }}, + }, + done: make(chan struct{}, 1), + } + + //segment not in channel + _, err := ct.getNumRows() + assert.Error(t, err) + }) + + t.Run("Test uploadRemainLog error", func(t *testing.T) { + f := &MetaFactory{} + + t.Run("field not in field to type", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + ct := &compactionTask{ + done: make(chan struct{}, 1), + } + meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64) + fid2C := make(map[int64][]interface{}) + fid2T := make(map[int64]schemapb.DataType) + fid2C[1] = nil + _, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, nil, 0, fid2C, fid2T) + assert.Error(t, err) + }) + + t.Run("transfer interface wrong", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + ct := &compactionTask{ + done: make(chan struct{}, 1), + } + meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64) + fid2C := make(map[int64][]interface{}) + fid2T := make(map[int64]schemapb.DataType) + fid2C[1] = nil + _, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, nil, 0, fid2C, fid2T) + assert.Error(t, err) + }) + + t.Run("upload failed", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) + + meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64) + stats := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10) + + ct := &compactionTask{ + uploader: &binlogIO{&mockCm{errMultiSave: true}, alloc}, + done: make(chan struct{}, 1), + } + + _, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil, nil) + assert.Error(t, err) + }) + }) } func getInt64DeltaBlobs(segID UniqueID, pks []UniqueID, tss []Timestamp) ([]*Blob, error) { @@ -555,13 +651,6 @@ func getInt64DeltaBlobs(segID UniqueID, pks []UniqueID, tss []Timestamp) ([]*Blo return []*Blob{blob}, err } -func getInsertBlobs(segID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) ([]*Blob, error) { - iCodec := storage.NewInsertCodecWithSchema(meta) - - iblobs, _, err := iCodec.Serialize(10, segID, iData) - return iblobs, err -} - func TestCompactorInterfaceMethods(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -677,28 +766,34 @@ func TestCompactorInterfaceMethods(t *testing.T) { RowCount: 1, } - cpaths1, err := mockbIO.upload(context.TODO(), c.segID1, c.parID, []*InsertData{iData1}, 
dData1, meta) + stats1 := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1) + iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), c.segID1, c.parID, iData1, stats1, 2, meta) require.NoError(t, err) - require.Equal(t, 12, len(cpaths1.inPaths)) + dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID1, c.parID, dData1, meta) + require.NoError(t, err) + require.Equal(t, 12, len(iPaths1)) - cpaths2, err := mockbIO.upload(context.TODO(), c.segID2, c.parID, []*InsertData{iData2}, dData2, meta) + stats2 := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1) + iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), c.segID2, c.parID, iData2, stats2, 2, meta) require.NoError(t, err) - require.Equal(t, 12, len(cpaths2.inPaths)) + dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID2, c.parID, dData2, meta) + require.NoError(t, err) + require.Equal(t, 12, len(iPaths2)) plan := &datapb.CompactionPlan{ PlanID: 10080, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { SegmentID: c.segID1, - FieldBinlogs: cpaths1.inPaths, - Field2StatslogPaths: cpaths1.statsPaths, - Deltalogs: cpaths1.deltaInfo, + FieldBinlogs: lo.Values(iPaths1), + Field2StatslogPaths: lo.Values(sPaths1), + Deltalogs: dPaths1, }, { SegmentID: c.segID2, - FieldBinlogs: cpaths2.inPaths, - Field2StatslogPaths: cpaths2.statsPaths, - Deltalogs: cpaths2.deltaInfo, + FieldBinlogs: lo.Values(iPaths2), + Field2StatslogPaths: lo.Values(sPaths2), + Deltalogs: dPaths2, }, }, StartTime: 0, @@ -814,28 +909,34 @@ func TestCompactorInterfaceMethods(t *testing.T) { RowCount: 0, } - cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta) + stats1 := storage.NewPrimaryKeyStats(1, int64(rc.pkType), 1) + iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), segID1, partID, iData1, stats1, 1, meta) require.NoError(t, err) - require.Equal(t, 12, len(cpaths1.inPaths)) + dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), segID1, partID, dData1, meta) + require.NoError(t, err) + require.Equal(t, 12, len(iPaths1)) - cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta) + stats2 := storage.NewPrimaryKeyStats(1, int64(rc.pkType), 1) + iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), segID2, partID, iData2, stats2, 1, meta) require.NoError(t, err) - require.Equal(t, 12, len(cpaths2.inPaths)) + dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), segID2, partID, dData2, meta) + require.NoError(t, err) + require.Equal(t, 12, len(iPaths2)) plan := &datapb.CompactionPlan{ PlanID: 20080, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { SegmentID: segID1, - FieldBinlogs: cpaths1.inPaths, - Field2StatslogPaths: cpaths1.statsPaths, - Deltalogs: cpaths1.deltaInfo, + FieldBinlogs: lo.Values(iPaths1), + Field2StatslogPaths: lo.Values(sPaths1), + Deltalogs: dPaths1, }, { SegmentID: segID2, - FieldBinlogs: cpaths2.inPaths, - Field2StatslogPaths: cpaths2.statsPaths, - Deltalogs: cpaths2.deltaInfo, + FieldBinlogs: lo.Values(iPaths2), + Field2StatslogPaths: lo.Values(sPaths2), + Deltalogs: dPaths2, }, }, StartTime: 0, @@ -876,7 +977,7 @@ type mockFlushManager struct { var _ flushManager = (*mockFlushManager)(nil) -func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) ([]*Blob, error) { +func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) 
(*storage.PrimaryKeyStats, error) { if mfm.returnError { return nil, fmt.Errorf("mock error") } diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 419ebadfd9..b5527359a0 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -217,7 +217,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick continue } - log.Info("recover growing segments form checkpoints", + log.Info("recover growing segments from checkpoints", zap.String("vChannelName", us.GetInsertChannel()), zap.Int64("segmentID", us.GetID()), zap.Int64("numRows", us.GetNumOfRows()), @@ -233,6 +233,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick partitionID: segment.PartitionID, numOfRows: segment.GetNumOfRows(), statsBinLogs: segment.Statslogs, + binLogs: segment.GetBinlogs(), endPos: segment.GetDmlPosition(), recoverTs: vchanInfo.GetSeekPosition().GetTimestamp()}); err != nil { return nil, err @@ -265,10 +266,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick if err := dsService.channel.addSegment(addSegmentReq{ segType: datapb.SegmentType_Flushed, segID: segment.GetID(), - collID: segment.CollectionID, - partitionID: segment.PartitionID, + collID: segment.GetCollectionID(), + partitionID: segment.GetPartitionID(), numOfRows: segment.GetNumOfRows(), - statsBinLogs: segment.Statslogs, + statsBinLogs: segment.GetStatslogs(), + binLogs: segment.GetBinlogs(), recoverTs: vchanInfo.GetSeekPosition().GetTimestamp(), }); err != nil { return nil, err diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 774447a8c9..d58af65297 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -470,10 +470,11 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, zap.String("channel", ibNode.channelName), ) // use the flushed pk stats to take current stat - var pkStats []*storage.PrimaryKeyStats + var pkStats *storage.PrimaryKeyStats // TODO, this has to be async flush, no need to block here. 
err := retry.Do(ibNode.ctx, func() error { - statBlobs, err := ibNode.flushManager.flushBufferData(task.buffer, + var err error + pkStats, err = ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, @@ -481,11 +482,6 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, if err != nil { return err } - pkStats, err = storage.DeserializeStats(statBlobs) - if err != nil { - log.Warn("failed to deserialize bloom filter files", zap.Error(err)) - return err - } return nil }, getFlowGraphRetryOpt()) if err != nil { diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 5b40158e48..c155feb6e2 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -38,6 +38,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" @@ -47,7 +48,7 @@ import ( // flushManager defines a flush manager signature type flushManager interface { // notify flush manager insert buffer data - flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) ([]*Blob, error) + flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error) // notify flush manager del buffer data flushDelData(data *DelDataBuf, segmentID UniqueID, pos *msgpb.MsgPosition) error // injectFlush injects compaction or other blocking task before flush sync @@ -340,56 +341,129 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush m.getFlushQueue(segmentID).enqueueDelFlush(task, deltaLogs, pos) } -// flushBufferData notifies flush manager insert buffer data. -// This method will be retired on errors. Final errors will be propagated upstream and logged. 
-func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) ([]*Blob, error) { - tr := timerecord.NewTimeRecorder("flushDuration") - // empty flush +func (m *rendezvousFlushManager) serializeBinLog(segmentID, partID int64, data *BufferData, inCodec *storage.InsertCodec) ([]*Blob, map[int64]int, error) { + fieldMemorySize := make(map[int64]int) + if data == nil || data.buffer == nil { - //m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{}, - // map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos) - m.handleInsertTask(segmentID, &flushBufferInsertTask{}, map[UniqueID]*datapb.Binlog{}, map[UniqueID]*datapb.Binlog{}, - flushed, dropped, pos) - return nil, nil + return []*Blob{}, fieldMemorySize, nil } - collID, partID, meta, err := m.getSegmentMeta(segmentID, pos) - if err != nil { - return nil, err - } // get memory size of buffer data - fieldMemorySize := make(map[int64]int) for fieldID, fieldData := range data.buffer.Data { fieldMemorySize[fieldID] = fieldData.GetMemorySize() } // encode data and convert output data - inCodec := storage.NewInsertCodecWithSchema(meta) + blobs, err := inCodec.Serialize(partID, segmentID, data.buffer) + if err != nil { + return nil, nil, err + } + return blobs, fieldMemorySize, nil +} - binLogs, statsBinlogs, err := inCodec.Serialize(partID, segmentID, data.buffer) +func (m *rendezvousFlushManager) serializePkStatsLog(segmentID int64, flushed bool, data *BufferData, inCodec *storage.InsertCodec) (*Blob, *storage.PrimaryKeyStats, error) { + var err error + var stats *storage.PrimaryKeyStats + + pkField := getPKField(inCodec.Schema) + if pkField == nil { + log.Error("No pk filed in schema", zap.Int64("segmentID", segmentID), zap.Int64("collectionID", inCodec.Schema.GetID())) + return nil, nil, fmt.Errorf("no primary key in meta") + } + + var insertData storage.FieldData + rowNum := int64(0) + if data != nil && data.buffer != nil { + insertData = data.buffer.Data[pkField.FieldID] + rowNum = int64(insertData.RowNum()) + if insertData.RowNum() > 0 { + // gen stats of buffer insert data + stats = storage.NewPrimaryKeyStats(pkField.FieldID, int64(pkField.DataType), rowNum) + stats.UpdateByMsgs(insertData) + } + } + + // get all stats log as a list, serialize to blob + // if flushed + if flushed { + seg := m.getSegment(segmentID) + if seg == nil { + return nil, nil, merr.WrapErrSegmentNotFound(segmentID) + } + + statsList, oldRowNum := seg.getHistoricalStats(pkField) + if stats != nil { + statsList = append(statsList, stats) + } + + blob, err := inCodec.SerializePkStatsList(statsList, oldRowNum+rowNum) + if err != nil { + return nil, nil, err + } + return blob, stats, nil + } + + if rowNum == 0 { + return nil, nil, nil + } + + // only serialize stats gen from new insert data + // if not flush + blob, err := inCodec.SerializePkStats(stats, rowNum) + if err != nil { + return nil, nil, err + } + + return blob, stats, nil +} + +// flushBufferData notifies flush manager insert buffer data. +// This method will be retired on errors. Final errors will be propagated upstream and logged. 
+func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error) { + field2Insert := make(map[UniqueID]*datapb.Binlog) + field2Stats := make(map[UniqueID]*datapb.Binlog) + kvs := make(map[string][]byte) + + tr := timerecord.NewTimeRecorder("flushDuration") + // get segment info + collID, partID, meta, err := m.getSegmentMeta(segmentID, pos) if err != nil { return nil, err } + inCodec := storage.NewInsertCodecWithSchema(meta) + // build bin log blob + binLogBlobs, fieldMemorySize, err := m.serializeBinLog(segmentID, partID, data, inCodec) + if err != nil { + return nil, err + } + + // build stats log blob + pkStatsBlob, stats, err := m.serializePkStatsLog(segmentID, flushed, data, inCodec) + if err != nil { + return nil, err + } + + // allocate + // alloc for stats log if have new stats log and not flushing + var logidx int64 + allocNum := uint32(len(binLogBlobs) + boolToInt(!flushed && pkStatsBlob != nil)) + if allocNum != 0 { + logidx, _, err = m.Alloc(allocNum) + if err != nil { + return nil, err + } + } // binlogs - start, _, err := m.Alloc(uint32(len(binLogs) + len(statsBinlogs))) - if err != nil { - return nil, err - } - - field2Insert := make(map[UniqueID]*datapb.Binlog, len(binLogs)) - kvs := make(map[string][]byte, len(binLogs)) - for idx, blob := range binLogs { + for _, blob := range binLogBlobs { + defer func() { logidx++ }() fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) if err != nil { log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) return nil, err } - logidx := start + int64(idx) - k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx) - // [rootPath]/[insert_log]/key key := path.Join(m.ChunkManager.RootPath(), common.SegmentInsertLogPath, k) kvs[key] = blob.Value[:] @@ -402,27 +476,32 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni } } - field2Stats := make(map[UniqueID]*datapb.Binlog) - // write stats binlog - for idx, blob := range statsBinlogs { - fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) + // pk stats binlog + if pkStatsBlob != nil { + fieldID, err := strconv.ParseInt(pkStatsBlob.GetKey(), 10, 64) if err != nil { log.Error("Flush failed ... 
cannot parse string to fieldID ..", zap.Error(err)) return nil, err } - logidx := start + UniqueID(len(binLogs)+idx) + // use storage.FlushedStatsLogIdx as logidx if flushed + // else use last idx we allocated + var key string + if flushed { + k := metautil.JoinIDPath(collID, partID, segmentID, fieldID) + key = path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k, storage.CompoundStatsType.LogIdx()) + } else { + k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx) + key = path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k) + } - k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx) - - key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k) - kvs[key] = blob.Value + kvs[key] = pkStatsBlob.Value field2Stats[fieldID] = &datapb.Binlog{ EntriesNum: 0, TimestampFrom: 0, //TODO TimestampTo: 0, //TODO, LogPath: key, - LogSize: int64(len(blob.Value)), + LogSize: int64(len(pkStatsBlob.Value)), } } @@ -432,7 +511,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni }, field2Insert, field2Stats, flushed, dropped, pos) metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds())) - return statsBinlogs, nil + return stats, nil } // notify flush manager del buffer data @@ -866,5 +945,6 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet dsService.flushingSegCache.Remove(req.GetSegmentID()) dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos) dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos) + // dsService.channel.saveBinlogPath(fieldStats) } } diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index a1763c5c5d..9391571709 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -145,7 +145,15 @@ func TestOrderFlushQueue_Order(t *testing.T) { func newTestChannel() *ChannelMeta { return &ChannelMeta{ - segments: make(map[UniqueID]*Segment), + segments: make(map[UniqueID]*Segment), + collectionID: 1, + collSchema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{{ + FieldID: 100, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }}, + }, } } @@ -157,12 +165,26 @@ func TestRendezvousFlushManager(t *testing.T) { size := 1000 var counter atomic.Int64 - finish := sync.WaitGroup{} - finish.Add(size) alloc := allocator.NewMockAllocator(t) - m := NewRendezvousFlushManager(alloc, cm, newTestChannel(), func(pack *segmentFlushPack) { + channel := newTestChannel() + hisotricalStats := storage.NewPrimaryKeyStats(100, 5, 10) + testSeg := &Segment{ + collectionID: 1, + segmentID: 3, + // flush segment will merge all historial stats + historyStats: []*storage.PkStatistics{ + { + PkFilter: hisotricalStats.BF, + MinPK: hisotricalStats.MinPk, + MaxPK: hisotricalStats.MaxPk, + }, + }, + } + testSeg.setType(datapb.SegmentType_New) + channel.segments[testSeg.segmentID] = testSeg + + m := NewRendezvousFlushManager(alloc, cm, channel, func(pack *segmentFlushPack) { counter.Inc() - finish.Done() }, emptyFlushAndDropFunc) ids := make([][]byte, 0, size) @@ -172,21 +194,18 @@ func TestRendezvousFlushManager(t *testing.T) { ids = append(ids, id) } - wg := sync.WaitGroup{} - wg.Add(size) for i := 0; i < size; i++ { - m.flushDelData(nil, 1, &msgpb.MsgPosition{ + err := m.flushDelData(nil, testSeg.segmentID, &msgpb.MsgPosition{ MsgID: ids[i], }) - 
m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ - MsgID: ids[i], - }) - wg.Done() - } - wg.Wait() - finish.Wait() + assert.NoError(t, err) - assert.EqualValues(t, size, counter.Load()) + _, err = m.flushBufferData(nil, testSeg.segmentID, true, false, &msgpb.MsgPosition{ + MsgID: ids[i], + }) + assert.NoError(t, err) + } + assert.Eventually(t, func() bool { return counter.Load() == int64(size) }, 3*time.Second, 100*time.Millisecond) } func TestRendezvousFlushManager_Inject(t *testing.T) { @@ -197,17 +216,32 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { size := 1000 var counter atomic.Int64 - finish := sync.WaitGroup{} - finish.Add(size) var packMut sync.Mutex packs := make([]*segmentFlushPack, 0, size+3) alloc := allocator.NewMockAllocator(t) - m := NewRendezvousFlushManager(alloc, cm, newTestChannel(), func(pack *segmentFlushPack) { + + channel := newTestChannel() + segments := []*Segment{{ + collectionID: 1, + segmentID: 1, + }, { + collectionID: 1, + segmentID: 2, + }, { + collectionID: 1, + segmentID: 3, + }} + + for _, seg := range segments { + seg.setType(datapb.SegmentType_New) + channel.segments[seg.segmentID] = seg + } + + m := NewRendezvousFlushManager(alloc, cm, channel, func(pack *segmentFlushPack) { packMut.Lock() packs = append(packs, pack) packMut.Unlock() counter.Inc() - finish.Done() }, emptyFlushAndDropFunc) ti := newTaskInjection(1, func(*segmentFlushPack) {}) @@ -222,58 +256,61 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { ids = append(ids, id) } - wg := sync.WaitGroup{} - wg.Add(size) for i := 0; i < size; i++ { - m.flushDelData(nil, 1, &msgpb.MsgPosition{ + err := m.flushDelData(nil, 1, &msgpb.MsgPosition{ MsgID: ids[i], }) - m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ + assert.NoError(t, err) + + _, err = m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ MsgID: ids[i], }) - wg.Done() + assert.NoError(t, err) } - wg.Wait() - finish.Wait() + assert.Eventually(t, func() bool { return counter.Load() == int64(size) }, 3*time.Second, 100*time.Millisecond) - assert.EqualValues(t, size, counter.Load()) - - finish.Add(2) id := make([]byte, 10) rand.Read(id) id2 := make([]byte, 10) rand.Read(id2) - m.flushBufferData(nil, 2, true, false, &msgpb.MsgPosition{ + _, err := m.flushBufferData(nil, 2, true, false, &msgpb.MsgPosition{ MsgID: id, }) - m.flushBufferData(nil, 3, true, false, &msgpb.MsgPosition{ + + assert.NoError(t, err) + _, err = m.flushBufferData(nil, 3, true, false, &msgpb.MsgPosition{ MsgID: id2, }) + assert.NoError(t, err) ti = newTaskInjection(2, func(pack *segmentFlushPack) { pack.segmentID = 4 }) m.injectFlush(ti, 2, 3) - m.flushDelData(nil, 2, &msgpb.MsgPosition{ + err = m.flushDelData(nil, 2, &msgpb.MsgPosition{ MsgID: id, }) - m.flushDelData(nil, 3, &msgpb.MsgPosition{ + assert.NoError(t, err) + + err = m.flushDelData(nil, 3, &msgpb.MsgPosition{ MsgID: id2, }) + assert.NoError(t, err) + <-ti.Injected() ti.injectDone(true) - finish.Wait() - assert.EqualValues(t, size+2, counter.Load()) + assert.Eventually(t, func() bool { return counter.Load() == int64(size+2) }, 3*time.Second, 100*time.Millisecond) assert.EqualValues(t, 4, packs[size].segmentID) - finish.Add(1) rand.Read(id) - m.flushBufferData(nil, 2, false, false, &msgpb.MsgPosition{ + _, err = m.flushBufferData(nil, 2, false, false, &msgpb.MsgPosition{ MsgID: id, }) + assert.NoError(t, err) + ti = newTaskInjection(1, func(pack *segmentFlushPack) { pack.segmentID = 5 }) @@ -286,8 +323,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { 
m.flushDelData(nil, 2, &msgpb.MsgPosition{ MsgID: id, }) - finish.Wait() - assert.EqualValues(t, size+3, counter.Load()) + assert.Eventually(t, func() bool { return counter.Load() == int64(size+3) }, 3*time.Second, 100*time.Millisecond) assert.EqualValues(t, 4, packs[size+1].segmentID) } @@ -329,13 +365,18 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) + channel := newTestChannel() + testSeg := &Segment{ + collectionID: 1, + segmentID: 1, + } + testSeg.setType(datapb.SegmentType_New) + channel.segments[testSeg.segmentID] = testSeg + size := 1000 var counter atomic.Int64 - var finish sync.WaitGroup - finish.Add(size) - m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) { + m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(pack *segmentFlushPack) { counter.Inc() - finish.Done() }, emptyFlushAndDropFunc) ids := make([][]byte, 0, size) @@ -346,9 +387,10 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { } for i := 0; i < size; i++ { - m.flushDelData(nil, 1, &msgpb.MsgPosition{ + err := m.flushDelData(nil, 1, &msgpb.MsgPosition{ MsgID: ids[i], }) + assert.NoError(t, err) } var finished bool @@ -368,9 +410,10 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { mut.RUnlock() for i := 0; i < size/2; i++ { - m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ + _, err := m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ MsgID: ids[i], }) + assert.NoError(t, err) } mut.RLock() @@ -378,9 +421,10 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { mut.RUnlock() for i := size / 2; i < size; i++ { - m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ + _, err := m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ MsgID: ids[i], }) + assert.NoError(t, err) } select { @@ -405,7 +449,29 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { var result []*segmentFlushPack signal := make(chan struct{}) - m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) { + channel := newTestChannel() + targets := make(map[int64]struct{}) + //init failed segment + testSeg := &Segment{ + collectionID: 1, + segmentID: -1, + } + testSeg.setType(datapb.SegmentType_New) + channel.segments[testSeg.segmentID] = testSeg + + //init target segment + for i := 1; i < 11; i++ { + targets[int64(i)] = struct{}{} + testSeg := &Segment{ + collectionID: 1, + segmentID: int64(i), + } + testSeg.setType(datapb.SegmentType_New) + channel.segments[testSeg.segmentID] = testSeg + } + + //init flush manager + m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(pack *segmentFlushPack) { }, func(packs []*segmentFlushPack) { mut.Lock() result = packs @@ -414,26 +480,28 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { }) halfMsgID := []byte{1, 1, 1} - m.flushBufferData(nil, -1, true, false, &msgpb.MsgPosition{ + _, err := m.flushBufferData(nil, -1, true, false, &msgpb.MsgPosition{ MsgID: halfMsgID, }) + assert.NoError(t, err) m.startDropping() // half normal, half drop mode, should not appear in final packs - m.flushDelData(nil, -1, &msgpb.MsgPosition{ + err = m.flushDelData(nil, -1, &msgpb.MsgPosition{ MsgID: halfMsgID, }) + assert.NoError(t, err) - target := make(map[int64]struct{}) - for i := 1; i < 11; i++ { - target[int64(i)] = 
struct{}{} - m.flushBufferData(nil, int64(i), true, false, &msgpb.MsgPosition{ - MsgID: []byte{byte(i)}, + for target := range targets { + _, err := m.flushBufferData(nil, target, true, false, &msgpb.MsgPosition{ + MsgID: []byte{byte(target)}, }) - m.flushDelData(nil, int64(i), &msgpb.MsgPosition{ - MsgID: []byte{byte(i)}, + assert.NoError(t, err) + + err = m.flushDelData(nil, target, &msgpb.MsgPosition{ + MsgID: []byte{byte(target)}, }) - t.Log(i) + assert.NoError(t, err) } m.notifyAllFlushed() @@ -446,11 +514,12 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { for _, pack := range result { assert.NotEqual(t, -1, pack.segmentID) output[pack.segmentID] = struct{}{} - _, has := target[pack.segmentID] + _, has := targets[pack.segmentID] assert.True(t, has) } - assert.Equal(t, len(target), len(output)) + assert.Equal(t, len(targets), len(output)) }) + t.Run("test drop mode with injection", func(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) @@ -458,8 +527,26 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { var mut sync.Mutex var result []*segmentFlushPack signal := make(chan struct{}) + channel := newTestChannel() + //init failed segment + testSeg := &Segment{ + collectionID: 1, + segmentID: -1, + } + testSeg.setType(datapb.SegmentType_New) + channel.segments[testSeg.segmentID] = testSeg - m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) { + //init target segment + for i := 1; i < 11; i++ { + seg := &Segment{ + collectionID: 1, + segmentID: int64(i), + } + seg.setType(datapb.SegmentType_New) + channel.segments[seg.segmentID] = seg + } + + m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(pack *segmentFlushPack) { }, func(packs []*segmentFlushPack) { mut.Lock() result = packs @@ -467,11 +554,14 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { close(signal) }) + //flush failed segment before start drop mode halfMsgID := []byte{1, 1, 1} - m.flushBufferData(nil, -1, true, false, &msgpb.MsgPosition{ + _, err := m.flushBufferData(nil, -1, true, false, &msgpb.MsgPosition{ MsgID: halfMsgID, }) + assert.NoError(t, err) + //inject target segment injFunc := func(pack *segmentFlushPack) { pack.segmentID = 100 } @@ -484,22 +574,24 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { m.startDropping() // half normal, half drop mode, should not appear in final packs - m.flushDelData(nil, -1, &msgpb.MsgPosition{ + err = m.flushDelData(nil, -1, &msgpb.MsgPosition{ MsgID: halfMsgID, }) + assert.NoError(t, err) for i := 1; i < 11; i++ { - m.flushBufferData(nil, int64(i), true, false, &msgpb.MsgPosition{ + _, err = m.flushBufferData(nil, int64(i), true, false, &msgpb.MsgPosition{ MsgID: []byte{byte(i)}, }) - m.flushDelData(nil, int64(i), &msgpb.MsgPosition{ + assert.NoError(t, err) + + err = m.flushDelData(nil, int64(i), &msgpb.MsgPosition{ MsgID: []byte{byte(i)}, }) + assert.NoError(t, err) } m.notifyAllFlushed() - - <-signal mut.Lock() defer mut.Unlock() @@ -508,7 +600,6 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { assert.Equal(t, int64(100), pack.segmentID) } }) - } func TestRendezvousFlushManager_close(t *testing.T) { @@ -519,11 +610,19 @@ func TestRendezvousFlushManager_close(t *testing.T) { size := 1000 var counter atomic.Int64 - finish := sync.WaitGroup{} - finish.Add(size) - m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack 
*segmentFlushPack) { + + channel := newTestChannel() + + //init test segment + testSeg := &Segment{ + collectionID: 1, + segmentID: 1, + } + testSeg.setType(datapb.SegmentType_New) + channel.segments[testSeg.segmentID] = testSeg + + m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(pack *segmentFlushPack) { counter.Inc() - finish.Done() }, emptyFlushAndDropFunc) ids := make([][]byte, 0, size) @@ -533,22 +632,20 @@ func TestRendezvousFlushManager_close(t *testing.T) { ids = append(ids, id) } - wg := sync.WaitGroup{} - wg.Add(size) for i := 0; i < size; i++ { - m.flushDelData(nil, 1, &msgpb.MsgPosition{ + err := m.flushDelData(nil, 1, &msgpb.MsgPosition{ MsgID: ids[i], }) - m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ - MsgID: ids[i], - }) - wg.Done() - } - wg.Wait() - finish.Wait() - m.close() + assert.NoError(t, err) - assert.EqualValues(t, size, counter.Load()) + _, err = m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{ + MsgID: ids[i], + }) + assert.NoError(t, err) + } + + m.close() + assert.Eventually(t, func() bool { return counter.Load() == int64(size) }, 3*time.Second, 100*time.Millisecond) } func TestFlushNotifyFunc(t *testing.T) { @@ -597,7 +694,6 @@ func TestFlushNotifyFunc(t *testing.T) { notifyFunc(&segmentFlushPack{}) }) }) - t.Run("normal segment not found", func(t *testing.T) { dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_SegmentNotFound assert.Panics(t, func() { diff --git a/internal/datanode/meta_util.go b/internal/datanode/meta_util.go index 55dce72eb0..1c0fcc2702 100644 --- a/internal/datanode/meta_util.go +++ b/internal/datanode/meta_util.go @@ -17,9 +17,9 @@ package datanode import ( + "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" - "github.com/milvus-io/milvus/pkg/common" ) // reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2 @@ -66,13 +66,11 @@ func reviseVChannelInfo(vChannel *datapb.VchannelInfo) { vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds()) } -// getPKID returns the primary key field id from collection meta. 
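A note on the meta_util.go rework just below: getPKID used to return only the primary-key field ID, but the stats code introduced in this patch needs both the field ID and its data type, so the helper now returns the whole field schema. A minimal sketch of the intended call pattern, assuming any *etcdpb.CollectionMeta (newStatsForMeta is a hypothetical helper, not part of the change):

func newStatsForMeta(meta *etcdpb.CollectionMeta, rowNum int64) (*storage.PrimaryKeyStats, error) {
	pkField := getPKField(meta)
	if pkField == nil {
		return nil, fmt.Errorf("no primary key field in schema")
	}
	// Field ID and data type both come from the same lookup now.
	return storage.NewPrimaryKeyStats(pkField.FieldID, int64(pkField.DataType), rowNum), nil
}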
-func getPKID(meta *etcdpb.CollectionMeta) UniqueID { - for _, field := range meta.GetSchema().GetFields() { +func getPKField(meta *etcdpb.CollectionMeta) *schemapb.FieldSchema { + for _, field := range meta.Schema.Fields { if field.GetIsPrimaryKey() { - return field.GetFieldID() + return field } } - - return common.InvalidFieldID + return nil } diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index dcc43f2afd..c9a87400a6 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/storage" s "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" @@ -1120,6 +1121,17 @@ func genInsertDataWithPKs(PKs [2]primaryKey, dataType schemapb.DataType) *Insert return iD } +func genTestStat(meta *etcdpb.CollectionMeta) *storage.PrimaryKeyStats { + var pkFieldID, pkFieldType int64 + for _, field := range meta.Schema.Fields { + if field.IsPrimaryKey { + pkFieldID = field.FieldID + pkFieldType = int64(field.DataType) + } + } + return storage.NewPrimaryKeyStats(pkFieldID, pkFieldType, 0) +} + func genInsertData() *InsertData { return &InsertData{ Data: map[int64]s.FieldData{ diff --git a/internal/datanode/segment.go b/internal/datanode/segment.go index c48c15eede..99ce3b1b70 100644 --- a/internal/datanode/segment.go +++ b/internal/datanode/segment.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/msgpb" + "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" @@ -93,6 +94,30 @@ func (s *Segment) updatePKRange(ids storage.FieldData) { } } +func (s *Segment) getHistoricalStats(pkField *schemapb.FieldSchema) ([]*storage.PrimaryKeyStats, int64) { + statsList := []*storage.PrimaryKeyStats{} + for _, stats := range s.historyStats { + statsList = append(statsList, &storage.PrimaryKeyStats{ + FieldID: pkField.FieldID, + PkType: int64(pkField.DataType), + BF: stats.PkFilter, + MaxPk: stats.MaxPK, + MinPk: stats.MinPK, + }) + } + + if s.currentStat != nil { + statsList = append(statsList, &storage.PrimaryKeyStats{ + FieldID: pkField.FieldID, + PkType: int64(pkField.DataType), + BF: s.currentStat.PkFilter, + MaxPk: s.currentStat.MaxPK, + MinPk: s.currentStat.MinPK, + }) + } + return statsList, s.numRows +} + func (s *Segment) InitCurrentStat() { if s.currentStat == nil { s.currentStat = &storage.PkStatistics{ diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 36fa6720a4..4e81222875 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -840,7 +840,9 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp, ID: colID, Schema: schema, } - binLogs, statsBinLogs, err := storage.NewInsertCodecWithSchema(meta).Serialize(partID, segmentID, data.buffer) + iCodec := storage.NewInsertCodecWithSchema(meta) + + binLogs, err := iCodec.Serialize(partID, segmentID, data.buffer) if err != nil { return nil, nil, err } @@ -878,27 +880,30 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp, field2Stats := make(map[UniqueID]*datapb.Binlog) // write stats binlog - for _, blob := range 
statsBinLogs { - fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) - if err != nil { - log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) - return nil, nil, err - } + statsBinLog, err := iCodec.SerializePkStatsByData(data.buffer) + if err != nil { + return nil, nil, err + } - logidx := field2Logidx[fieldID] + fieldID, err := strconv.ParseInt(statsBinLog.GetKey(), 10, 64) + if err != nil { + log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) + return nil, nil, err + } - // no error raise if alloc=false - k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx) + logidx := field2Logidx[fieldID] - key := path.Join(node.chunkManager.RootPath(), common.SegmentStatslogPath, k) - kvs[key] = blob.Value - field2Stats[fieldID] = &datapb.Binlog{ - EntriesNum: data.size, - TimestampFrom: ts, - TimestampTo: ts, - LogPath: key, - LogSize: int64(len(blob.Value)), - } + // no error raise if alloc=false + k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx) + + key := path.Join(node.chunkManager.RootPath(), common.SegmentStatslogPath, k) + kvs[key] = statsBinLog.Value + field2Stats[fieldID] = &datapb.Binlog{ + EntriesNum: data.size, + TimestampFrom: ts, + TimestampTo: ts, + LogPath: key, + LogSize: int64(len(statsBinLog.Value)), } err = node.chunkManager.MultiWrite(ctx, kvs) diff --git a/internal/datanode/util.go b/internal/datanode/util.go index 072497591f..2d9f7c3f42 100644 --- a/internal/datanode/util.go +++ b/internal/datanode/util.go @@ -56,3 +56,10 @@ func startTracer(msg msgstream.TsMsg, name string) (context.Context, trace.Span) } return otel.Tracer(typeutil.DataNodeRole).Start(ctx, name) } + +func boolToInt(value bool) int { + if value { + return 1 + } + return 0 +} diff --git a/internal/indexnode/chunkmgr_mock.go b/internal/indexnode/chunkmgr_mock.go index e4e93c2189..8d2d130005 100644 --- a/internal/indexnode/chunkmgr_mock.go +++ b/internal/indexnode/chunkmgr_mock.go @@ -202,7 +202,7 @@ func (c *mockChunkmgr) mockFieldData(numrows, dim int, collectionID, partitionID insertCodec := &storage.InsertCodec{ Schema: collMeta, } - blobs, _, err := insertCodec.Serialize(partitionID, segmentID, insertData) + blobs, err := insertCodec.Serialize(partitionID, segmentID, insertData) if err != nil { panic(err) } diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 1b1458ea21..3b94d15984 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -826,6 +826,16 @@ func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, lo } } +func hasSepcialStatslog(logs *datapb.FieldBinlog) bool { + for _, statslog := range logs.Binlogs { + _, logidx := path.Split(statslog.LogPath) + if logidx == storage.CompoundStatsType.LogIdx() { + return true + } + } + return false +} + func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.UniqueID, binlogs, deltalogs, statslogs []*datapb.FieldBinlog) (map[string]string, error) { @@ -833,8 +843,9 @@ func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.Uniqu checkBinlogs(storage.DeleteBinlog, segmentID, deltalogs) checkBinlogs(storage.StatsBinlog, segmentID, statslogs) // check stats log and bin log size match - if len(binlogs) != 0 && len(statslogs) != 0 { - if len(statslogs[0].GetBinlogs()) != len(binlogs[0].GetBinlogs()) { + // num of stats log may one more than num of binlogs if segment flushed and merged stats log + if 
len(binlogs) != 0 && len(statslogs) != 0 && !hasSepcialStatslog(statslogs[0]) { + if len(binlogs[0].GetBinlogs()) != len(statslogs[0].GetBinlogs()) { log.Warn("find invalid segment while bin log size didn't match stat log size", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), @@ -843,7 +854,7 @@ func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.Uniqu zap.Any("stats", statslogs), zap.Any("delta", deltalogs), ) - return nil, fmt.Errorf("Segment can not be saved because of binlog "+ + return nil, fmt.Errorf("segment can not be saved because of binlog "+ "file not match stat log number: collection %v, segment %v", collectionID, segmentID) } } diff --git a/internal/querynodev2/segments/mock_data.go b/internal/querynodev2/segments/mock_data.go index 4046aeed28..c74e4e439c 100644 --- a/internal/querynodev2/segments/mock_data.go +++ b/internal/querynodev2/segments/mock_data.go @@ -681,9 +681,18 @@ func genStorageBlob(collectionID int64, if err != nil { return nil, nil, err } - binLogs, statsLogs, err := inCodec.Serialize(partitionID, segmentID, insertData) - return binLogs, statsLogs, err + binLogs, err := inCodec.Serialize(partitionID, segmentID, insertData) + if err != nil { + return nil, nil, err + } + + statsLog, err := inCodec.SerializePkStatsByData(insertData) + if err != nil { + return nil, nil, err + } + + return binLogs, []*storage.Blob{statsLog}, nil } func genCollectionMeta(collectionID int64, partitionID int64, schema *schemapb.CollectionSchema) *etcdpb.CollectionMeta { diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 817f75bc68..b8ed720fc7 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -359,8 +359,8 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI bfs := pkoracle.NewBloomFilterSet(segmentID, partitionID, commonpb.SegmentState_Sealed) log.Info("loading bloom filter for remote...") - pkStatsBinlogs := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID()) - err := loader.loadBloomFilter(ctx, segmentID, bfs, pkStatsBinlogs) + pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID()) + err := loader.loadBloomFilter(ctx, segmentID, bfs, pkStatsBinlogs, logType) if err != nil { log.Warn("load remote segment bloom filter failed", zap.Int64("partitionID", partitionID), @@ -459,8 +459,8 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, // load statslog if it's growing segment if segment.typ == SegmentTypeGrowing { log.Info("loading statslog...") - pkStatsBinlogs := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID()) - err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, pkStatsBinlogs) + pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID()) + err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, pkStatsBinlogs, logType) if err != nil { return err } @@ -470,16 +470,24 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs) } -func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) []string { +func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) ([]string, storage.StatsLogType) { result := make([]string, 0) for _, fieldBinlog := 
range fieldBinlogs { if fieldBinlog.FieldID == pkFieldID { for _, binlog := range fieldBinlog.GetBinlogs() { - result = append(result, binlog.GetLogPath()) + _, logidx := path.Split(binlog.GetLogPath()) + // if special status log exist + // only load one file + switch logidx { + case storage.CompoundStatsType.LogIdx(): + return []string{binlog.GetLogPath()}, storage.CompoundStatsType + default: + result = append(result, binlog.GetLogPath()) + } } } } - return result + return result, storage.DefaultStatsType } func (loader *segmentLoader) loadGrowingSegmentFields(ctx context.Context, segment *LocalSegment, fieldBinlogs []*datapb.FieldBinlog) error { @@ -784,7 +792,9 @@ func (loader *segmentLoader) loadSealedSegments(segment *LocalSegment, insertDat return nil } -func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet, binlogPaths []string) error { +func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet, + binlogPaths []string, logType storage.StatsLogType) error { + log := log.Ctx(ctx).With( zap.Int64("segmentID", segmentID), ) @@ -798,16 +808,26 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6 if err != nil { return err } - blobs := make([]*storage.Blob, 0) + blobs := []*storage.Blob{} for i := 0; i < len(values); i++ { blobs = append(blobs, &storage.Blob{Value: values[i]}) } - stats, err := storage.DeserializeStats(blobs) - if err != nil { - log.Warn("failed to deserialize stats", zap.Error(err)) - return err + var stats []*storage.PrimaryKeyStats + if logType == storage.CompoundStatsType { + stats, err = storage.DeserializeStatsList(blobs[0]) + if err != nil { + log.Warn("failed to deserialize stats list", zap.Error(err)) + return err + } + } else { + stats, err = storage.DeserializeStats(blobs) + if err != nil { + log.Warn("failed to deserialize stats", zap.Error(err)) + return err + } } + var size uint for _, stat := range stats { pkStat := &storage.PkStatistics{ diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go index 96b5024928..9252344f6e 100644 --- a/internal/storage/binlog_iterator_test.go +++ b/internal/storage/binlog_iterator_test.go @@ -72,7 +72,7 @@ func generateTestData(t *testing.T, num int) []*Blob { }, }} - blobs, _, err := insertCodec.Serialize(1, 1, data) + blobs, err := insertCodec.Serialize(1, 1, data) assert.Nil(t, err) return blobs } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index ae94a95ef4..743571fcba 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -287,6 +287,15 @@ type InsertData struct { Infos []BlobInfo } +func (iData *InsertData) IsEmpty() bool { + if iData == nil { + return true + } + + timeFieldData, ok := iData.Data[common.TimeStampField] + return (!ok) || (timeFieldData.RowNum() <= 0) +} + // InsertCodec serializes and deserializes the insert data // Blob key example: // ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx} @@ -304,20 +313,95 @@ func NewInsertCodecWithSchema(schema *etcdpb.CollectionMeta) *InsertCodec { return &InsertCodec{Schema: schema} } +// Serialize Pk stats log +func (insertCodec *InsertCodec) SerializePkStats(stats *PrimaryKeyStats, rowNum int64) (*Blob, error) { + if stats == nil || stats.BF == nil { + return nil, fmt.Errorf("sericalize empty pk stats") + } + + //Serialize by pk stats + blobKey := fmt.Sprintf("%d", stats.FieldID) + 
statsWriter := &StatsWriter{} + err := statsWriter.Generate(stats) + if err != nil { + return nil, err + } + + buffer := statsWriter.GetBuffer() + return &Blob{ + Key: blobKey, + Value: buffer, + RowNum: rowNum, + }, nil +} + +// Serialize Pk stats list to one blob +func (insertCodec *InsertCodec) SerializePkStatsList(stats []*PrimaryKeyStats, rowNum int64) (*Blob, error) { + if len(stats) == 0 { + return nil, nil + } + + blobKey := fmt.Sprintf("%d", stats[0].FieldID) + statsWriter := &StatsWriter{} + err := statsWriter.GenerateList(stats) + if err != nil { + return nil, err + } + + buffer := statsWriter.GetBuffer() + return &Blob{ + Key: blobKey, + Value: buffer, + RowNum: rowNum, + }, nil +} + +// Serialize Pk stats log by insert data +func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob, error) { + timeFieldData, ok := data.Data[common.TimeStampField] + if !ok { + return nil, fmt.Errorf("data doesn't contains timestamp field") + } + if timeFieldData.RowNum() <= 0 { + return nil, fmt.Errorf("there's no data in InsertData") + } + rowNum := int64(timeFieldData.RowNum()) + + for _, field := range insertCodec.Schema.Schema.Fields { + // stats fields + if !field.GetIsPrimaryKey() { + continue + } + singleData := data.Data[field.FieldID] + blobKey := fmt.Sprintf("%d", field.FieldID) + statsWriter := &StatsWriter{} + err := statsWriter.GenerateByData(field.FieldID, field.DataType, singleData) + if err != nil { + return nil, err + } + buffer := statsWriter.GetBuffer() + return &Blob{ + Key: blobKey, + Value: buffer, + RowNum: rowNum, + }, nil + } + return nil, fmt.Errorf("there is no pk field") +} + // Serialize transfer insert data to blob. It will sort insert data by timestamp. // From schema, it gets all fields. // For each field, it will create a binlog writer, and write an event to the binlog. // It returns binlog buffer in the end. 
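As a usage sketch of the new codec surface above: Serialize now returns only the field binlogs, and the primary-key stats blob is produced by a separate call, the same two-step pattern the datanode and the querynode mock data generator switch to elsewhere in this patch. encodeSegment and its arguments are illustrative; the storage calls are the ones defined above:

func encodeSegment(meta *etcdpb.CollectionMeta, partID, segID int64, data *storage.InsertData) ([]*storage.Blob, *storage.Blob, error) {
	codec := storage.NewInsertCodecWithSchema(meta)

	// Field binlogs only; stats blobs are no longer part of this return value.
	binlogs, err := codec.Serialize(partID, segID, data)
	if err != nil {
		return nil, nil, err
	}

	// Bloom filter plus min/max of the primary-key column for this batch.
	statsBlob, err := codec.SerializePkStatsByData(data)
	if err != nil {
		return nil, nil, err
	}
	return binlogs, statsBlob, nil
}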
-func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, []*Blob, error) { +func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) { blobs := make([]*Blob, 0) - statsBlobs := make([]*Blob, 0) var writer *InsertBinlogWriter timeFieldData, ok := data.Data[common.TimeStampField] if !ok { - return nil, nil, fmt.Errorf("data doesn't contains timestamp field") + return nil, fmt.Errorf("data doesn't contains timestamp field") } if timeFieldData.RowNum() <= 0 { - return nil, nil, fmt.Errorf("there's no data in InsertData") + return nil, fmt.Errorf("there's no data in InsertData") } rowNum := int64(timeFieldData.RowNum()) @@ -346,14 +430,14 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique case schemapb.DataType_BinaryVector: eventWriter, err = writer.NextInsertEventWriter(singleData.(*BinaryVectorFieldData).Dim) default: - return nil, nil, fmt.Errorf("undefined data type %d", field.DataType) + return nil, fmt.Errorf("undefined data type %d", field.DataType) } } else { eventWriter, err = writer.NextInsertEventWriter() } if err != nil { writer.Close() - return nil, nil, err + return nil, err } eventWriter.SetEventTimestamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs)) @@ -363,7 +447,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BoolFieldData).GetMemorySize())) case schemapb.DataType_Int8: @@ -371,7 +455,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int8FieldData).GetMemorySize())) case schemapb.DataType_Int16: @@ -379,7 +463,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int16FieldData).GetMemorySize())) case schemapb.DataType_Int32: @@ -387,7 +471,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int32FieldData).GetMemorySize())) case schemapb.DataType_Int64: @@ -395,7 +479,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int64FieldData).GetMemorySize())) case schemapb.DataType_Float: @@ -403,7 +487,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatFieldData).GetMemorySize())) case schemapb.DataType_Double: @@ -411,7 +495,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", 
singleData.(*DoubleFieldData).GetMemorySize())) case schemapb.DataType_String, schemapb.DataType_VarChar: @@ -420,7 +504,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*StringFieldData).GetMemorySize())) @@ -430,7 +514,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*ArrayFieldData).GetMemorySize())) @@ -440,7 +524,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*JSONFieldData).GetMemorySize())) @@ -449,7 +533,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BinaryVectorFieldData).GetMemorySize())) case schemapb.DataType_FloatVector: @@ -457,14 +541,14 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatVectorFieldData).GetMemorySize())) default: - return nil, nil, fmt.Errorf("undefined data type %d", field.DataType) + return nil, fmt.Errorf("undefined data type %d", field.DataType) } if err != nil { - return nil, nil, err + return nil, err } writer.SetEventTimeStamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs)) @@ -472,14 +556,14 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } buffer, err := writer.GetBuffer() if err != nil { eventWriter.Close() writer.Close() - return nil, nil, err + return nil, err } blobKey := fmt.Sprintf("%d", field.FieldID) blobs = append(blobs, &Blob{ @@ -489,24 +573,9 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique }) eventWriter.Close() writer.Close() - - // stats fields - if field.GetIsPrimaryKey() { - statsWriter := &StatsWriter{} - err = statsWriter.GeneratePrimaryKeyStats(field.FieldID, field.DataType, singleData) - if err != nil { - return nil, nil, err - } - statsBuffer := statsWriter.GetBuffer() - statsBlobs = append(statsBlobs, &Blob{ - Key: blobKey, - Value: statsBuffer, - RowNum: rowNum, - }) - } } - return blobs, statsBlobs, nil + return blobs, nil } func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) ( diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index d8b3996956..689427afce 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -105,7 +105,7 @@ func TestInsertCodec(t *testing.T) { { FieldID: Int64Field, Name: "field_int64", - IsPrimaryKey: false, + IsPrimaryKey: true, Description: "int64", DataType: schemapb.DataType_Int64, }, @@ -307,18 +307,21 @@ func TestInsertCodec(t *testing.T) { JSONField: &JSONFieldData{[][]byte{}}, }, } - b, s, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty) + b, err := 
insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty) assert.Error(t, err) assert.Empty(t, b) + + s, err := insertCodec.SerializePkStatsByData(insertDataEmpty) + assert.Error(t, err) assert.Empty(t, s) - Blobs1, statsBlob1, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1) + Blobs1, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1) assert.Nil(t, err) for _, blob := range Blobs1 { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 100) assert.Equal(t, blob.GetKey(), blob.Key) } - Blobs2, statsBlob2, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2) + Blobs2, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2) assert.Nil(t, err) for _, blob := range Blobs2 { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99) @@ -368,10 +371,14 @@ func TestInsertCodec(t *testing.T) { _, _, _, _, err = insertCodec.DeserializeAll(blobs) assert.NotNil(t, err) - _, err = DeserializeStats(statsBlob1) + statsBlob1, err := insertCodec.SerializePkStatsByData(insertData1) + assert.Nil(t, err) + _, err = DeserializeStats([]*Blob{statsBlob1}) assert.Nil(t, err) - _, err = DeserializeStats(statsBlob2) + statsBlob2, err := insertCodec.SerializePkStatsByData(insertData2) + assert.Nil(t, err) + _, err = DeserializeStats([]*Blob{statsBlob2}) assert.Nil(t, err) } @@ -495,7 +502,7 @@ func TestDDCodec(t *testing.T) { func TestTsError(t *testing.T) { insertData := &InsertData{} insertCodec := NewInsertCodecWithSchema(nil) - blobs, _, err := insertCodec.Serialize(1, 1, insertData) + blobs, err := insertCodec.Serialize(1, 1, insertData) assert.Nil(t, blobs) assert.NotNil(t, err) } diff --git a/internal/storage/print_binlog_test.go b/internal/storage/print_binlog_test.go index 3087da1317..a45662ca38 100644 --- a/internal/storage/print_binlog_test.go +++ b/internal/storage/print_binlog_test.go @@ -268,7 +268,7 @@ func TestPrintBinlogFiles(t *testing.T) { }, }, } - firstBlobs, _, err := insertCodec.Serialize(1, 1, insertDataFirst) + firstBlobs, err := insertCodec.Serialize(1, 1, insertDataFirst) assert.Nil(t, err) var binlogFiles []string for index, blob := range firstBlobs { @@ -283,7 +283,7 @@ func TestPrintBinlogFiles(t *testing.T) { err = fd.Close() assert.Nil(t, err) } - secondBlobs, _, err := insertCodec.Serialize(1, 1, insertDataSecond) + secondBlobs, err := insertCodec.Serialize(1, 1, insertDataSecond) assert.Nil(t, err) for index, blob := range secondBlobs { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99) diff --git a/internal/storage/stats.go b/internal/storage/stats.go index d9b86c895b..512594d48d 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -18,10 +18,12 @@ package storage import ( "encoding/json" + "fmt" "github.com/bits-and-blooms/bloom/v3" "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" ) const ( @@ -93,6 +95,8 @@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error { case schemapb.DataType_VarChar: stats.MaxPk = &VarCharPrimaryKey{} stats.MinPk = &VarCharPrimaryKey{} + default: + return fmt.Errorf("Invalid PK Data Type") } if maxPkMessage, ok := messageMap["maxPk"]; ok && maxPkMessage != nil { @@ -120,8 +124,58 @@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error { return nil } +func (stats *PrimaryKeyStats) UpdateByMsgs(msgs FieldData) { + switch schemapb.DataType(stats.PkType) { + case schemapb.DataType_Int64: + data := msgs.(*Int64FieldData).Data + if len(data) < 1 { + // return error: msgs must has one 
element at least + return + } + + b := make([]byte, 8) + for _, int64Value := range data { + pk := NewInt64PrimaryKey(int64Value) + stats.UpdateMinMax(pk) + common.Endian.PutUint64(b, uint64(int64Value)) + stats.BF.Add(b) + } + case schemapb.DataType_VarChar: + data := msgs.(*StringFieldData).Data + if len(data) < 1 { + // return error: msgs must has one element at least + return + } + + for _, str := range data { + pk := NewVarCharPrimaryKey(str) + stats.UpdateMinMax(pk) + stats.BF.AddString(str) + } + default: + //TODO:: + } +} + +func (stats *PrimaryKeyStats) Update(pk PrimaryKey) { + stats.UpdateMinMax(pk) + switch schemapb.DataType(stats.PkType) { + case schemapb.DataType_Int64: + data := pk.GetValue().(int64) + b := make([]byte, 8) + common.Endian.PutUint64(b, uint64(data)) + stats.BF.Add(b) + case schemapb.DataType_VarChar: + data := pk.GetValue().(string) + stats.BF.AddString(data) + default: + log.Warn("Update pk stats with invalid data type") + } + +} + // updatePk update minPk and maxPk value -func (stats *PrimaryKeyStats) updatePk(pk PrimaryKey) { +func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey) { if stats.MinPk == nil { stats.MinPk = pk } else if stats.MinPk.GT(pk) { @@ -135,6 +189,14 @@ func (stats *PrimaryKeyStats) updatePk(pk PrimaryKey) { } } +func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) *PrimaryKeyStats { + return &PrimaryKeyStats{ + FieldID: fieldID, + PkType: pkType, + BF: bloom.NewWithEstimates(uint(rowNum), MaxBloomFalsePositive), + } +} + // StatsWriter writes stats to buffer type StatsWriter struct { buffer []byte @@ -145,54 +207,38 @@ func (sw *StatsWriter) GetBuffer() []byte { return sw.buffer } -// GeneratePrimaryKeyStats writes Int64Stats from @msgs with @fieldID to @buffer -func (sw *StatsWriter) GeneratePrimaryKeyStats(fieldID int64, pkType schemapb.DataType, msgs FieldData) error { - stats := &PrimaryKeyStats{ - FieldID: fieldID, - PkType: int64(pkType), - } - - stats.BF = bloom.NewWithEstimates(uint(msgs.RowNum()), MaxBloomFalsePositive) - switch pkType { - case schemapb.DataType_Int64: - data := msgs.(*Int64FieldData).Data - if len(data) < 1 { - // return error: msgs must has one element at least - return nil - } - - b := make([]byte, 8) - for _, int64Value := range data { - pk := NewInt64PrimaryKey(int64Value) - stats.updatePk(pk) - common.Endian.PutUint64(b, uint64(int64Value)) - stats.BF.Add(b) - } - case schemapb.DataType_VarChar: - data := msgs.(*StringFieldData).Data - if len(data) < 1 { - // return error: msgs must has one element at least - return nil - } - - for _, str := range data { - pk := NewVarCharPrimaryKey(str) - stats.updatePk(pk) - stats.BF.AddString(str) - } - default: - //TODO:: - } - +// GenerateList writes Stats slice to buffer +func (sw *StatsWriter) GenerateList(stats []*PrimaryKeyStats) error { b, err := json.Marshal(stats) if err != nil { return err } sw.buffer = b - return nil } +// Generate writes Stats to buffer +func (sw *StatsWriter) Generate(stats *PrimaryKeyStats) error { + b, err := json.Marshal(stats) + if err != nil { + return err + } + sw.buffer = b + return nil +} + +// GenerateByData writes Int64Stats or StringStats from @msgs with @fieldID to @buffer +func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs FieldData) error { + stats := &PrimaryKeyStats{ + FieldID: fieldID, + PkType: int64(pkType), + BF: bloom.NewWithEstimates(uint(msgs.RowNum()), MaxBloomFalsePositive), + } + + stats.UpdateByMsgs(msgs) + return sw.Generate(stats) +} + // StatsReader reads stats type 
StatsReader struct { buffer []byte @@ -214,6 +260,17 @@ func (sr *StatsReader) GetPrimaryKeyStats() (*PrimaryKeyStats, error) { return stats, nil } +// GetInt64Stats returns buffer as PrimaryKeyStats +func (sr *StatsReader) GetPrimaryKeyStatsList() ([]*PrimaryKeyStats, error) { + stats := []*PrimaryKeyStats{} + err := json.Unmarshal(sr.buffer, &stats) + if err != nil { + return nil, err + } + + return stats, nil +} + // DeserializeStats deserialize @blobs as []*PrimaryKeyStats func DeserializeStats(blobs []*Blob) ([]*PrimaryKeyStats, error) { results := make([]*PrimaryKeyStats, 0, len(blobs)) @@ -231,3 +288,16 @@ func DeserializeStats(blobs []*Blob) ([]*PrimaryKeyStats, error) { } return results, nil } + +func DeserializeStatsList(blob *Blob) ([]*PrimaryKeyStats, error) { + if blob.Value == nil { + return []*PrimaryKeyStats{}, nil + } + sr := &StatsReader{} + sr.SetBuffer(blob.Value) + stats, err := sr.GetPrimaryKeyStatsList() + if err != nil { + return nil, err + } + return stats, nil +} diff --git a/internal/storage/stats_test.go b/internal/storage/stats_test.go index 78e608937c..bdde9af5c7 100644 --- a/internal/storage/stats_test.go +++ b/internal/storage/stats_test.go @@ -32,7 +32,7 @@ func TestStatsWriter_Int64PrimaryKey(t *testing.T) { Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}, } sw := &StatsWriter{} - err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, data) + err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data) assert.NoError(t, err) b := sw.GetBuffer() @@ -57,7 +57,7 @@ func TestStatsWriter_Int64PrimaryKey(t *testing.T) { msgs := &Int64FieldData{ Data: []int64{}, } - err = sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, msgs) + err = sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, msgs) assert.Nil(t, err) } @@ -71,7 +71,7 @@ func TestStatsWriter_BF(t *testing.T) { } t.Log(data.RowNum()) sw := &StatsWriter{} - err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, data) + err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data) assert.NoError(t, err) stats := &PrimaryKeyStats{} @@ -95,7 +95,7 @@ func TestStatsWriter_VarCharPrimaryKey(t *testing.T) { Data: []string{"bc", "ac", "abd", "cd", "milvus"}, } sw := &StatsWriter{} - err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_VarChar, data) + err := sw.GenerateByData(common.RowIDField, schemapb.DataType_VarChar, data) assert.NoError(t, err) b := sw.GetBuffer() @@ -114,7 +114,7 @@ func TestStatsWriter_VarCharPrimaryKey(t *testing.T) { msgs := &Int64FieldData{ Data: []int64{}, } - err = sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, msgs) + err = sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, msgs) assert.Nil(t, err) } diff --git a/internal/storage/types.go b/internal/storage/types.go index af334cc1fa..aca9a119dc 100644 --- a/internal/storage/types.go +++ b/internal/storage/types.go @@ -13,12 +13,27 @@ package storage import ( "context" + "fmt" "io" "time" "golang.org/x/exp/mmap" ) +type StatsLogType int64 + +const ( + DefaultStatsType StatsLogType = iota + 0 + + // CompundStatsType log save multiple stats + // and bloom filters to one file + CompoundStatsType +) + +func (s StatsLogType) LogIdx() string { + return fmt.Sprintf("0%d", s) +} + type FileReader interface { io.Reader io.Closer diff --git a/internal/storage/vector_chunk_manager_test.go b/internal/storage/vector_chunk_manager_test.go index 1ccccd24ca..34d6aacb4e 100644 --- 
a/internal/storage/vector_chunk_manager_test.go +++ b/internal/storage/vector_chunk_manager_test.go @@ -108,7 +108,7 @@ func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob { }, } - blobs, _, err := insertCodec.Serialize(1, 1, insertData) + blobs, err := insertCodec.Serialize(1, 1, insertData) if err != nil { return nil }
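Finally, a self-contained sketch of the path convention this patch relies on: a merged (compound) stats log is written under the reserved log index derived from StatsLogType, and both the datacoord catalog and the querynode segment loader recognize it by inspecting the last element of the log path. The constants mirror internal/storage/types.go above; the example paths and the main function are made up for illustration:

package main

import (
	"fmt"
	"path"
)

// Mirrors the StatsLogType added in internal/storage/types.go.
type StatsLogType int64

const (
	DefaultStatsType  StatsLogType = iota
	CompoundStatsType // merged stats and bloom filters in one file
)

func (s StatsLogType) LogIdx() string { return fmt.Sprintf("0%d", s) }

// isCompoundStatsLog mimics the check done by the kv catalog and the segment
// loader: a merged stats log uses LogIdx() ("01") as its final path element
// instead of an allocated log ID.
func isCompoundStatsLog(logPath string) bool {
	_, logidx := path.Split(logPath)
	return logidx == CompoundStatsType.LogIdx()
}

func main() {
	flushedStats := "files/stats_log/1/2/3/100/" + CompoundStatsType.LogIdx()
	growingStats := "files/stats_log/1/2/3/100/437952047118384"

	fmt.Println(isCompoundStatsLog(flushedStats)) // true  -> load the single merged file
	fmt.Println(isCompoundStatsLog(growingStats)) // false -> load per-batch stats blobs
}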