Merge stats logs when flushing or compacting segments (#23570)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
aoiasd 2023-05-29 10:21:28 +08:00 committed by GitHub
parent 1deac47069
commit c84bdcea49
27 changed files with 1251 additions and 730 deletions
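In short: before this change, binlogIO.upload wrote an insert binlog and a stats binlog for every buffer it received, so a segment accumulated one stats file per flush. The uploader interface is now split: uploadInsertLog writes only insert binlogs, uploadStatsLog writes a single PrimaryKeyStats for the segment, and flushing a segment serializes all of its historical stats into one compound stats log (storage.CompoundStatsType) that readers can load with a single request. Below is a minimal sketch of how a caller can drive the split interface during a compaction, using only the signatures declared in this diff; the helper name is invented, and batching, allocation and retry details are simplified.

func writeCompactedSegment(
	ctx context.Context,
	up uploader,
	segID, partID UniqueID,
	batches []*InsertData, // merged rows, one InsertData per target insert binlog
	stats *storage.PrimaryKeyStats, // accumulated over every surviving primary key
	totRows int64,
	meta *etcdpb.CollectionMeta,
) error {
	if len(batches) == 0 {
		return nil
	}
	// insert binlogs are uploaded batch by batch ...
	for _, b := range batches[:len(batches)-1] {
		if _, err := up.uploadInsertLog(ctx, segID, partID, b, meta); err != nil {
			return err
		}
	}
	// ... while the merged stats log is written exactly once, together with the last batch
	_, _, err := up.uploadStatsLog(ctx, segID, partID, batches[len(batches)-1], stats, totRows, meta)
	return err
}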

View File

@ -54,8 +54,8 @@ type uploader interface {
//
// errUploadToBlobStorage is returned if ctx is canceled from outside while an upload is in progress.
// Beware of the ctx here: if no timeout or cancel is applied to this ctx, the upload may retry forever.
upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error)
uploadInsertLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error)
uploadInsertLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, error)
uploadStatsLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, stats *storage.PrimaryKeyStats, totRows int64, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error)
uploadDeltaLog(ctx context.Context, segID, partID UniqueID, dData *DeleteData, meta *etcdpb.CollectionMeta) ([]*datapb.FieldBinlog, error)
}
@ -131,108 +131,6 @@ func (b *binlogIO) uploadSegmentFiles(
return nil
}
type segPaths struct {
inPaths []*datapb.FieldBinlog
statsPaths []*datapb.FieldBinlog
deltaInfo []*datapb.FieldBinlog
}
func (b *binlogIO) upload(
ctx context.Context,
segID, partID UniqueID,
iDatas []*InsertData,
dData *DeleteData,
meta *etcdpb.CollectionMeta) (*segPaths, error) {
var (
p = &segPaths{} // The returns
kvs = make(map[string][]byte) // Key values to store in minIO
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its FieldBinlog
statsField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its statsBinlog
)
for _, iData := range iDatas {
tf, ok := iData.Data[common.TimeStampField]
if !ok || tf.RowNum() == 0 {
log.Warn("binlog io uploading empty insert data",
zap.Int64("segmentID", segID),
zap.Int64("collectionID", meta.GetID()),
)
continue
}
blobs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
if err != nil {
log.Warn("generate insert blobs wrong",
zap.Int64("collectionID", meta.GetID()),
zap.Int64("segmentID", segID),
zap.Error(err))
return nil, err
}
for k, v := range blobs {
kvs[k] = v
}
for fID, path := range inpaths {
tmpBinlog, ok := insertField2Path[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
insertField2Path[fID] = tmpBinlog
}
for fID, path := range statspaths {
tmpBinlog, ok := statsField2Path[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
statsField2Path[fID] = tmpBinlog
}
}
for _, path := range insertField2Path {
p.inPaths = append(p.inPaths, path)
}
for _, path := range statsField2Path {
p.statsPaths = append(p.statsPaths, path)
}
// If there are delta binlogs
if dData.RowCount > 0 {
k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID)
if err != nil {
log.Warn("generate delta blobs wrong",
zap.Int64("collectionID", meta.GetID()),
zap.Int64("segmentID", segID),
zap.Error(err))
return nil, err
}
kvs[k] = v
p.deltaInfo = append(p.deltaInfo, &datapb.FieldBinlog{
FieldID: 0, // TODO: Not useful on deltalogs, FieldID shall be ID of primary key field
Binlogs: []*datapb.Binlog{{
EntriesNum: dData.RowCount,
LogPath: k,
LogSize: int64(len(v)),
}},
})
}
err := b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
if err != nil {
return nil, err
}
return p, nil
}
// genDeltaBlobs returns key, value
func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueID) (string, []byte, error) {
dCodec := storage.NewDeleteCodec()
@ -253,32 +151,26 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
return key, blob.GetValue(), nil
}
// genInsertBlobs returns kvs, insert-paths, stats-paths
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
inCodec := storage.NewInsertCodecWithSchema(meta)
inlogs, statslogs, err := inCodec.Serialize(partID, segID, data)
// genInsertBlobs returns insert-paths and saves the blobs to kvs
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) {
inlogs, err := iCodec.Serialize(partID, segID, data)
if err != nil {
return nil, nil, nil, err
return nil, err
}
var (
kvs = make(map[string][]byte, len(inlogs)+len(statslogs))
inpaths = make(map[UniqueID]*datapb.FieldBinlog)
statspaths = make(map[UniqueID]*datapb.FieldBinlog)
)
inpaths := make(map[UniqueID]*datapb.FieldBinlog)
notifyGenIdx := make(chan struct{})
defer close(notifyGenIdx)
generator, err := b.GetGenerator(len(inlogs)+len(statslogs), notifyGenIdx)
generator, err := b.GetGenerator(len(inlogs), notifyGenIdx)
if err != nil {
return nil, nil, nil, err
return nil, err
}
for _, blob := range inlogs {
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
k := metautil.JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
k := metautil.JoinIDPath(iCodec.Schema.GetID(), partID, segID, fID, <-generator)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentInsertLogPath, k)
value := blob.GetValue()
@ -291,24 +183,76 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
}
}
for _, blob := range statslogs {
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
return inpaths, nil
}
k := metautil.JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
// genStatBlobs returns stats log paths and saves the stats blob to kvs
func (b *binlogIO) genStatBlobs(stats *storage.PrimaryKeyStats, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte, totRows int64) (map[UniqueID]*datapb.FieldBinlog, error) {
statBlob, err := iCodec.SerializePkStats(stats, totRows)
if err != nil {
return nil, err
}
statPaths := make(map[UniqueID]*datapb.FieldBinlog)
value := blob.GetValue()
fileLen := len(value)
idx, err := b.AllocOne()
if err != nil {
return nil, err
}
kvs[key] = value
statspaths[fID] = &datapb.FieldBinlog{
FieldID: fID,
Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: blob.RowNum}},
fID, _ := strconv.ParseInt(statBlob.GetKey(), 10, 64)
k := metautil.JoinIDPath(iCodec.Schema.GetID(), partID, segID, fID, idx)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
value := statBlob.GetValue()
fileLen := len(value)
kvs[key] = value
statPaths[fID] = &datapb.FieldBinlog{
FieldID: fID,
Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: totRows}},
}
return statPaths, nil
}
// uploadStatsLog uploads the stats log of a segment,
// and also uploads the insert data if it is not nil
func (b *binlogIO) uploadStatsLog(
ctx context.Context,
segID UniqueID,
partID UniqueID,
iData *InsertData,
stats *storage.PrimaryKeyStats,
totRows int64,
meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
var inPaths map[int64]*datapb.FieldBinlog
var err error
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
if !iData.IsEmpty() {
inPaths, err = b.genInsertBlobs(iData, partID, segID, iCodec, kvs)
if err != nil {
log.Warn("generate insert blobs wrong",
zap.Int64("collectionID", iCodec.Schema.GetID()),
zap.Int64("segmentID", segID),
zap.Error(err))
return nil, nil, err
}
}
return kvs, inpaths, statspaths, nil
statPaths, err := b.genStatBlobs(stats, partID, segID, iCodec, kvs, totRows)
if err != nil {
return nil, nil, err
}
err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
if err != nil {
return nil, nil, err
}
return inPaths, statPaths, nil
}
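// uploadInsertLog serializes one segment's insert data into insert binlogs, uploads them,
// and returns the resulting field binlog paths; an empty buffer is skipped with a warning.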
func (b *binlogIO) uploadInsertLog(
@ -316,55 +260,30 @@ func (b *binlogIO) uploadInsertLog(
segID UniqueID,
partID UniqueID,
iData *InsertData,
meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
var (
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
statsField2Path = make(map[UniqueID]*datapb.FieldBinlog)
)
meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, error) {
tf, ok := iData.Data[common.TimeStampField]
if !ok || tf.RowNum() == 0 {
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
if iData.IsEmpty() {
log.Warn("binlog io uploading empty insert data",
zap.Int64("segmentID", segID),
zap.Int64("collectionID", meta.GetID()),
zap.Int64("collectionID", iCodec.Schema.GetID()),
)
return nil, nil, nil
return nil, nil
}
kvs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
inpaths, err := b.genInsertBlobs(iData, partID, segID, iCodec, kvs)
if err != nil {
log.Warn("generate insert blobs wrong",
zap.Int64("collectionID", meta.GetID()),
zap.Int64("segmentID", segID),
zap.Error(err))
return nil, nil, err
}
for fID, path := range inpaths {
tmpBinlog, ok := insertField2Path[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
insertField2Path[fID] = tmpBinlog
}
for fID, path := range statspaths {
tmpBinlog, ok := statsField2Path[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
statsField2Path[fID] = tmpBinlog
return nil, err
}
err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
if err != nil {
return nil, nil, err
return nil, err
}
return insertField2Path, statsField2Path, nil
return inpaths, nil
}
func (b *binlogIO) uploadDeltaLog(

View File

@ -30,7 +30,6 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -53,169 +52,6 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("Test upload", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "uploads", schemapb.DataType_Int64)
//pkFieldID := int64(106)
iData := genInsertData()
pk := newInt64PrimaryKey(888)
dData := &DeleteData{
RowCount: 1,
Pks: []primaryKey{pk},
Tss: []uint64{666666},
}
t.Run("Test upload one iData", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
b := &binlogIO{cm, alloc}
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(p.inPaths))
assert.Equal(t, 1, len(p.statsPaths))
assert.Equal(t, 1, len(p.inPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs()))
assert.NotNil(t, p.deltaInfo)
})
t.Run("Test upload two iData", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
b := &binlogIO{cm, alloc}
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, dData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(p.inPaths))
assert.Equal(t, 1, len(p.statsPaths))
assert.Equal(t, 2, len(p.inPaths[0].GetBinlogs()))
assert.Equal(t, 2, len(p.statsPaths[0].GetBinlogs()))
assert.NotNil(t, p.deltaInfo)
})
t.Run("Test uploadInsertLog", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
b := &binlogIO{cm, alloc}
in, stats, err := b.uploadInsertLog(ctx, 1, 10, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(in))
assert.Equal(t, 1, len(in[0].GetBinlogs()))
assert.Equal(t, 1, len(stats))
})
t.Run("Test uploadDeltaLog", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
b := &binlogIO{cm, alloc}
deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta)
assert.NoError(t, err)
assert.NotNil(t, deltas)
assert.Equal(t, 1, len(deltas[0].GetBinlogs()))
})
t.Run("Test context Done", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
b := &binlogIO{cm, alloc}
p, err := b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, p)
in, _, err := b.uploadInsertLog(ctx, 1, 10, iData, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, in)
deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, deltas)
})
})
t.Run("Test upload error", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "uploads", schemapb.DataType_Int64)
dData := &DeleteData{
Pks: []primaryKey{},
Tss: []uint64{},
}
t.Run("Test upload empty insertData", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := &binlogIO{cm, alloc}
iData := genEmptyInsertData()
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.NoError(t, err)
assert.Empty(t, p.inPaths)
assert.Empty(t, p.statsPaths)
assert.Empty(t, p.deltaInfo)
iData = &InsertData{Data: make(map[int64]storage.FieldData)}
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.NoError(t, err)
assert.Empty(t, p.inPaths)
assert.Empty(t, p.statsPaths)
assert.Empty(t, p.deltaInfo)
})
t.Run("Test deleta data not match", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
b := &binlogIO{cm, alloc}
iData := genInsertData()
dData := &DeleteData{
Pks: []primaryKey{},
Tss: []uint64{1},
RowCount: 1,
}
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
})
t.Run("Test multisave error", func(t *testing.T) {
mkc := &mockCm{errMultiSave: true}
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
var (
b = &binlogIO{mkc, alloc}
iData = genInsertData()
pk = newInt64PrimaryKey(1)
dData = &DeleteData{
Pks: []primaryKey{pk},
Tss: []uint64{1},
RowCount: 1,
}
)
ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond)
p, err := b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
in, _, err := b.uploadInsertLog(ctx, 1, 10, iData, meta)
assert.Error(t, err)
assert.Empty(t, in)
deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta)
assert.Error(t, err)
assert.Empty(t, deltas)
cancel()
})
})
t.Run("Test download", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := &binlogIO{cm, alloc}
@ -271,6 +107,57 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
assert.Empty(t, blobs)
cancel()
})
t.Run("Test upload stats log err", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
t.Run("gen insert blob failed", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
b := binlogIO{cm, alloc}
_, _, err := b.uploadStatsLog(context.Background(), 1, 10, genInsertData(), genTestStat(meta), 10, meta)
assert.Error(t, err)
})
})
t.Run("Test upload insert log err", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
t.Run("empty insert", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := binlogIO{cm, alloc}
paths, err := b.uploadInsertLog(context.Background(), 1, 10, genEmptyInsertData(), meta)
assert.NoError(t, err)
assert.Nil(t, paths)
})
t.Run("gen insert blob failed", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := binlogIO{cm, alloc}
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
_, err := b.uploadInsertLog(context.Background(), 1, 10, genInsertData(), meta)
assert.Error(t, err)
})
t.Run("upload failed", func(t *testing.T) {
mkc := &mockCm{errMultiLoad: true, errMultiSave: true}
alloc := allocator.NewMockAllocator(t)
b := binlogIO{mkc, alloc}
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := b.uploadInsertLog(ctx, 1, 10, genInsertData(), meta)
assert.Error(t, err)
})
})
}
func prepareBlob(cm storage.ChunkManager, key string) ([]byte, string, error) {
@ -372,23 +259,18 @@ func TestBinlogIOInnerMethods(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType)
helper, err := typeutil.CreateSchemaHelper(meta.Schema)
assert.NoError(t, err)
primaryKeyFieldSchema, err := helper.GetPrimaryKeyField()
assert.NoError(t, err)
primaryKeyFieldID := primaryKeyFieldSchema.GetFieldID()
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs, pin, pstats, err := b.genInsertBlobs(genInsertData(), 10, 1, meta)
kvs := make(map[string][]byte)
pin, err := b.genInsertBlobs(genInsertData(), 10, 1, iCodec, kvs)
assert.NoError(t, err)
assert.Equal(t, 1, len(pstats))
assert.Equal(t, 12, len(pin))
assert.Equal(t, 13, len(kvs))
assert.Equal(t, 12, len(kvs))
log.Debug("test paths",
zap.Any("kvs no.", len(kvs)),
zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()),
zap.String("stats paths field0", pstats[primaryKeyFieldID].GetBinlogs()[0].GetLogPath()))
zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()))
})
}
})
@ -400,36 +282,88 @@ func TestBinlogIOInnerMethods(t *testing.T) {
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("serialize error", func(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(nil)
bin := &binlogIO{cm, allocator.NewMockAllocator(t)}
kvs, pin, pstats, err := bin.genInsertBlobs(genEmptyInsertData(), 10, 1, nil)
kvs := make(map[string][]byte)
pin, err := bin.genInsertBlobs(genEmptyInsertData(), 10, 1, iCodec, kvs)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
assert.Empty(t, pstats)
})
t.Run("GetGenerator error", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
iCodec := storage.NewInsertCodecWithSchema(meta)
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error"))
bin := &binlogIO{cm, alloc}
kvs, pin, pstats, err := bin.genInsertBlobs(genInsertData(), 10, 1, meta)
kvs := make(map[string][]byte)
pin, err := bin.genInsertBlobs(genInsertData(), 10, 1, iCodec, kvs)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
assert.Empty(t, pstats)
})
})
t.Run("Test genStatsBlob", func(t *testing.T) {
f := &MetaFactory{}
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Return(0, nil)
b := binlogIO{cm, alloc}
tests := []struct {
pkType schemapb.DataType
description string
expectError bool
}{
{schemapb.DataType_Int64, "int64PrimaryField", false},
{schemapb.DataType_VarChar, "varCharPrimaryField", false},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_stat_blobs", test.pkType)
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
stat, err := b.genStatBlobs(genTestStat(meta), 10, 1, iCodec, kvs, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(stat))
assert.Equal(t, 1, len(kvs))
})
}
})
t.Run("Test genStatsBlob error", func(t *testing.T) {
f := &MetaFactory{}
alloc := allocator.NewMockAllocator(t)
b := binlogIO{cm, alloc}
t.Run("serialize error", func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_stat_blobs_error", schemapb.DataType_Int64)
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
_, err := b.genStatBlobs(nil, 10, 1, iCodec, kvs, 0)
assert.Error(t, err)
})
})
}
type mockCm struct {
storage.ChunkManager
errMultiLoad bool
errMultiSave bool
errMultiLoad bool
errMultiSave bool
MultiReadReturn [][]byte
ReadReturn []byte
}
var _ storage.ChunkManager = (*mockCm)(nil)
@ -450,13 +384,17 @@ func (mk *mockCm) MultiWrite(ctx context.Context, contents map[string][]byte) er
}
func (mk *mockCm) Read(ctx context.Context, filePath string) ([]byte, error) {
return nil, nil
return mk.ReadReturn, nil
}
func (mk *mockCm) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error) {
if mk.errMultiLoad {
return nil, errors.New("mockKv multiload error")
}
if mk.MultiReadReturn != nil {
return mk.MultiReadReturn, nil
}
return [][]byte{[]byte("a")}, nil
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"path"
"runtime"
"sync"
"time"
@ -63,25 +64,28 @@ type Channel interface {
getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error)
getChannelName(segID UniqueID) string
addSegment(req addSegmentReq) error
getSegment(segID UniqueID) *Segment
removeSegments(segID ...UniqueID)
hasSegment(segID UniqueID, countFlushed bool) bool
InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error
RollPKstats(segID UniqueID, stats *storage.PrimaryKeyStats)
listAllSegmentIDs() []UniqueID
listNotFlushedSegmentIDs() []UniqueID
addSegment(req addSegmentReq) error
listPartitionSegments(partID UniqueID) []UniqueID
filterSegments(partitionID UniqueID) []*Segment
listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
transferNewSegments(segmentIDs []UniqueID)
updateSegmentPKRange(segID UniqueID, ids storage.FieldData)
mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
hasSegment(segID UniqueID, countFlushed bool) bool
removeSegments(segID ...UniqueID)
listCompactedSegmentIDs() map[UniqueID][]UniqueID
listSegmentIDsToSync(ts Timestamp) []UniqueID
setSegmentLastSyncTs(segID UniqueID, ts Timestamp)
updateSegmentRowNumber(segID UniqueID, numRows int64)
updateSegmentMemorySize(segID UniqueID, memorySize int64)
InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error
RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats)
getSegmentStatisticsUpdates(segID UniqueID) (*commonpb.SegmentStats, error)
segmentFlushed(segID UniqueID)
@ -130,6 +134,7 @@ type addSegmentReq struct {
numOfRows int64
startPos, endPos *msgpb.MsgPosition
statsBinLogs []*datapb.FieldBinlog
binLogs []*datapb.FieldBinlog
recoverTs Timestamp
importing bool
}
@ -195,23 +200,6 @@ func (c *ChannelMeta) getChannelName(segID UniqueID) string {
return c.channelName
}
// maxRowCountPerSegment returns max row count for a segment based on estimation of row size.
func (c *ChannelMeta) maxRowCountPerSegment(ts Timestamp) (int64, error) {
log := log.With(zap.Int64("collectionID", c.collectionID), zap.Uint64("timpstamp", ts))
schema, err := c.getCollectionSchema(c.collectionID, ts)
if err != nil {
log.Warn("failed to get collection schema", zap.Error(err))
return 0, err
}
sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
if err != nil {
log.Warn("failed to estimate size per record", zap.Error(err))
return 0, err
}
threshold := Params.DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024
return int64(threshold / float64(sizePerRecord)), nil
}
// addSegment adds the segment to current channel. Segments can be added as *new*, *normal* or *flushed*.
// Make sure to verify `channel.hasSegment(segID)` == false before calling `channel.addSegment()`.
func (c *ChannelMeta) addSegment(req addSegmentReq) error {
@ -258,6 +246,14 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error {
return nil
}
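// getSegment returns the segment with the given segID, or nil if this channel does not hold it.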
func (c *ChannelMeta) getSegment(segID UniqueID) *Segment {
seg, ok := c.segments[segID]
if !ok {
return nil
}
return seg
}
func (c *ChannelMeta) listCompactedSegmentIDs() map[UniqueID][]UniqueID {
c.segMu.RLock()
defer c.segMu.RUnlock()
@ -397,13 +393,26 @@ func (c *ChannelMeta) loadStats(ctx context.Context, segmentID int64, collection
}
// filter the stats binlog files that are pk field stats logs
var bloomFilterFiles []string
bloomFilterFiles := []string{}
logType := storage.DefaultStatsType
for _, binlog := range statsBinlogs {
if binlog.FieldID != pkField {
continue
}
Loop:
for _, log := range binlog.GetBinlogs() {
bloomFilterFiles = append(bloomFilterFiles, log.GetLogPath())
_, logidx := path.Split(log.GetLogPath())
// if the special compound stats log exists,
// load only that one file
switch logidx {
case storage.CompoundStatsType.LogIdx():
bloomFilterFiles = []string{log.GetLogPath()}
logType = storage.CompoundStatsType
break Loop
default:
bloomFilterFiles = append(bloomFilterFiles, log.GetLogPath())
}
}
}
@ -424,11 +433,21 @@ func (c *ChannelMeta) loadStats(ctx context.Context, segmentID int64, collection
blobs = append(blobs, &Blob{Value: values[i]})
}
stats, err := storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize bloom filter files", zap.Error(err))
return nil, merr.WrapErrParameterInvalid("valid statslog", "corrupted statslog", err.Error())
var stats []*storage.PrimaryKeyStats
if logType == storage.CompoundStatsType {
stats, err = storage.DeserializeStatsList(blobs[0])
if err != nil {
log.Warn("failed to deserialize stats list", zap.Error(err))
return nil, err
}
} else {
stats, err = storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize stats", zap.Error(err))
return nil, err
}
}
var size uint
result := make([]*storage.PkStatistics, 0, len(stats))
for _, stat := range stats {
@ -455,23 +474,27 @@ func (c *ChannelMeta) initPKstats(ctx context.Context, s *Segment, statsBinlogs
return nil
}
func (c *ChannelMeta) RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats) {
func (c *ChannelMeta) RollPKstats(segID UniqueID, stat *storage.PrimaryKeyStats) {
if stat == nil {
log.Warn("sync but no any pk stats", zap.Int64("segmentID", segID))
return
}
c.segMu.Lock()
defer c.segMu.Unlock()
seg, ok := c.segments[segID]
log.Info("roll pk stats", zap.Int64("segment id", segID))
if ok && seg.notFlushed() {
for _, stat := range stats {
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
seg.historyStats = append(seg.historyStats, pkStat)
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
seg.historyStats = append(seg.historyStats, pkStat)
seg.currentStat = nil
return
}
// should not happen at all
if ok {
log.Warn("only growing segment should roll PK stats", zap.Int64("segment", segID), zap.Any("type", seg.sType))

View File

@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"path"
"testing"
"time"
@ -38,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/metautil"
)
var channelMetaNodeTestDir = "/tmp/milvus_test/channel_meta"
@ -700,6 +702,68 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
}
func TestChannelMeta_loadStats(t *testing.T) {
f := &MetaFactory{}
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
t.Run("list with merged stats log", func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_load_stats", schemapb.DataType_Int64)
// load normal stats log
seg1 := &Segment{
segmentID: 1,
partitionID: 2,
}
// load flushed stats log
seg2 := &Segment{
segmentID: 2,
partitionID: 2,
}
//gen pk stats bytes
stats := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10)
iCodec := storage.NewInsertCodecWithSchema(meta)
cm := &mockCm{}
channel := newChannel("channel", 1, meta.Schema, rc, cm)
channel.segments[seg1.segmentID] = seg1
channel.segments[seg2.segmentID] = seg2
// load normal stats log
blob, err := iCodec.SerializePkStats(stats, 10)
assert.NoError(t, err)
cm.MultiReadReturn = [][]byte{blob.Value}
_, err = channel.loadStats(
context.Background(), seg1.segmentID, 1,
[]*datapb.FieldBinlog{{
FieldID: 106,
Binlogs: []*datapb.Binlog{{
//<StatsLogPath>/<collectionID>/<partitionID>/<segmentID>/<FieldID>/<logIdx>
LogPath: path.Join(common.SegmentStatslogPath, metautil.JoinIDPath(1, 2, 1, 106, 10)),
}}}}, 0)
assert.NoError(t, err)
// load flushed stats log
blob, err = iCodec.SerializePkStatsList([]*storage.PrimaryKeyStats{stats}, 10)
assert.NoError(t, err)
cm.MultiReadReturn = [][]byte{blob.Value}
_, err = channel.loadStats(
context.Background(), seg2.segmentID, 1,
[]*datapb.FieldBinlog{{
FieldID: 106,
Binlogs: []*datapb.Binlog{{
//<StatsLogPath>/<collectionID>/<partitionID>/<segmentID>/<FieldID>/<logIdx>
LogPath: path.Join(common.SegmentStatslogPath, metautil.JoinIDPath(1, 2, 2, 106), storage.CompoundStatsType.LogIdx()),
}}}}, 0)
assert.NoError(t, err)
})
}
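The two sub-cases exercised above come down to the trailing path segment of the stats binlog. A minimal sketch of that check (not part of the commit), mirroring the switch in ChannelMeta.loadStats shown earlier:

// isCompoundStatsLog reports whether a stats binlog path points at the merged
// ("compound") stats log that flush writes at storage.CompoundStatsType.LogIdx(),
// rather than at a per-flush stats log of a growing segment.
func isCompoundStatsLog(logPath string) bool {
	_, logidx := path.Split(logPath)
	return logidx == storage.CompoundStatsType.LogIdx()
}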
func TestChannelMeta_UpdatePKRange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -978,18 +1042,6 @@ func (s *ChannelMetaSuite) TestHasSegment() {
}
}
func (s *ChannelMetaSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
s.channel.segMu.RLock()
defer s.channel.segMu.RUnlock()
seg, ok := s.channel.segments[id]
if ok && seg.isValid() {
return seg, true
}
return nil, false
}
func TestChannelMetaSuite(t *testing.T) {
suite.Run(t, new(ChannelMetaSuite))
}

View File

@ -27,6 +27,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -136,6 +137,20 @@ func (t *compactionTask) getChannelName() string {
return t.plan.GetChannel()
}
// getNumRows returns the total row count of all the segments this compaction reads from
func (t *compactionTask) getNumRows() (int64, error) {
numRows := int64(0)
for _, binlog := range t.plan.SegmentBinlogs {
seg := t.Channel.getSegment(binlog.GetSegmentID())
if seg == nil {
return 0, merr.WrapErrSegmentNotFound(binlog.GetSegmentID(), "get compaction segments num rows failed")
}
numRows += seg.numRows
}
return numRows, nil
}
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (
map[interface{}]Timestamp, *DelDataBuf, error) {
log := log.With(zap.Int64("planID", t.getPlanID()))
@ -196,13 +211,51 @@ func nano2Milli(nano time.Duration) float64 {
return float64(nano) / float64(time.Millisecond)
}
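// uploadRemainLog packs any remaining field contents into an InsertData and uploads it
// together with the merged stats log of the target segment.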
func (t *compactionTask) uploadRemainLog(
ctxTimeout context.Context,
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
stats *storage.PrimaryKeyStats,
totRows int64,
fID2Content map[UniqueID][]interface{},
fID2Type map[UniqueID]schemapb.DataType) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
var iData *InsertData
// remain insert data
if len(fID2Content) != 0 {
iData = &InsertData{Data: make(map[storage.FieldID]storage.FieldData)}
for fID, content := range fID2Content {
tp, ok := fID2Type[fID]
if !ok {
log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
return nil, nil, errors.New("Unexpected error")
}
fData, err := interface2FieldData(tp, content, int64(len(content)))
if err != nil {
log.Warn("transfer interface to FieldData wrong", zap.Error(err))
return nil, nil, err
}
iData.Data[fID] = fData
}
}
inPaths, statPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, iData, stats, totRows, meta)
if err != nil {
return nil, nil, err
}
return inPaths, statPaths, nil
}
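// uploadSingleInsertLog converts one full batch of field contents into an InsertData
// and uploads its insert binlogs.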
func (t *compactionTask) uploadSingleInsertLog(
ctxTimeout context.Context,
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
fID2Content map[UniqueID][]interface{},
fID2Type map[UniqueID]schemapb.DataType) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
fID2Type map[UniqueID]schemapb.DataType) (map[UniqueID]*datapb.FieldBinlog, error) {
iData := &InsertData{
Data: make(map[storage.FieldID]storage.FieldData)}
@ -210,23 +263,23 @@ func (t *compactionTask) uploadSingleInsertLog(
tp, ok := fID2Type[fID]
if !ok {
log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
return nil, nil, errors.New("Unexpected error")
return nil, errors.New("Unexpected error")
}
fData, err := interface2FieldData(tp, content, int64(len(content)))
if err != nil {
log.Warn("transfer interface to FieldData wrong", zap.Error(err))
return nil, nil, err
return nil, err
}
iData.Data[fID] = fData
}
inPaths, statPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, iData, meta)
inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, iData, meta)
if err != nil {
return nil, nil, err
return nil, err
}
return inPaths, statPaths, nil
return inPaths, nil
}
func (t *compactionTask) merge(
@ -245,10 +298,6 @@ func (t *compactionTask) merge(
numRows int64 // the number of rows uploaded
expired int64 // the number of expired entity
// statslog generation
pkID UniqueID
pkType schemapb.DataType
fID2Type = make(map[UniqueID]schemapb.DataType)
fID2Content = make(map[UniqueID][]interface{})
@ -295,14 +344,22 @@ func (t *compactionTask) merge(
}
// get pkID, pkType, dim
var pkField *schemapb.FieldSchema
for _, fs := range meta.GetSchema().GetFields() {
fID2Type[fs.GetFieldID()] = fs.GetDataType()
if fs.GetIsPrimaryKey() && fs.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(fs.GetDataType()) {
pkID = fs.GetFieldID()
pkType = fs.GetDataType()
pkField = fs
}
}
if pkField == nil {
log.Warn("failed to get pk filed from schema")
return nil, nil, 0, fmt.Errorf("no pk field in schema")
}
pkID := pkField.GetFieldID()
pkType := pkField.GetDataType()
// estimate Rows per binlog
// TODO should not convert size to row because we already know the size, this is especially important on varchar types.
size, err := typeutil.EstimateSizePerRecord(meta.GetSchema())
@ -324,6 +381,13 @@ func (t *compactionTask) merge(
downloadTimeCost := time.Duration(0)
uploadInsertTimeCost := time.Duration(0)
oldRowNums, err := t.getNumRows()
if err != nil {
return nil, nil, 0, err
}
stats := storage.NewPrimaryKeyStats(pkID, int64(pkType), oldRowNums)
for _, path := range unMergedInsertlogs {
downloadStart := time.Now()
data, err := t.download(ctxTimeout, path)
@ -369,18 +433,19 @@ func (t *compactionTask) merge(
}
fID2Content[fID] = append(fID2Content[fID], vInter)
}
// update the new stats log with this pk
stats.Update(v.PK)
currentRows++
if currentRows >= maxRowsPerBinlog {
uploadInsertStart := time.Now()
inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
}
uploadInsertTimeCost += time.Since(uploadInsertStart)
addInsertFieldPath(inPaths)
addStatFieldPath(statsPaths)
fID2Content = make(map[int64][]interface{})
currentRows = 0
@ -389,20 +454,21 @@ func (t *compactionTask) merge(
}
}
}
if currentRows != 0 {
uploadInsertStart := time.Now()
inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
// upload the stats log and the remaining insert rows
if numRows != 0 || currentRows != 0 {
uploadStart := time.Now()
inPaths, statsPaths, err := t.uploadRemainLog(ctxTimeout, targetSegID, partID, meta,
stats, numRows+int64(currentRows), fID2Content, fID2Type)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
}
uploadInsertTimeCost += time.Since(uploadInsertStart)
uploadInsertTimeCost += time.Since(uploadStart)
addInsertFieldPath(inPaths)
addStatFieldPath(statsPaths)
numRows += int64(currentRows)
numBinlogs++
numBinlogs += len(inPaths)
}
for _, path := range insertField2Path {

View File

@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -280,8 +281,11 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
Schema: meta.GetSchema(),
}, nil)
channel := newChannel("a", collectionID, meta.GetSchema(), rc, nil)
channel.segments[1] = &Segment{numRows: 10}
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
alloc.EXPECT().AllocOne().Return(0, nil)
t.Run("Merge without expiration", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
@ -289,7 +293,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
iData := genInsertDataWithExpiredTS()
var allPaths [][]string
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -307,7 +311,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)}
ct := &compactionTask{
Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1}},
}}
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
@ -326,7 +335,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -342,13 +351,18 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
dm := map[interface{}]Timestamp{}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)}
ct := &compactionTask{
Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1}},
}}
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(statsPaths))
assert.Equal(t, 2, len(statsPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(statsPaths[0].GetBinlogs()))
})
t.Run("Merge with expiration", func(t *testing.T) {
@ -358,7 +372,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -383,6 +397,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
uploader: mockbIO,
plan: &datapb.CompactionPlan{
CollectionTtl: 864000,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1}},
},
done: make(chan struct{}, 1),
}
@ -400,7 +416,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -418,7 +434,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)}
ct := &compactionTask{
Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1}},
}}
_, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
Schema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
@ -436,7 +457,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -537,6 +558,81 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
assert.Equal(t, false, res)
})
})
t.Run("Test getNumRows error", func(t *testing.T) {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
cm := &mockCm{}
ct := &compactionTask{
Channel: newChannel("channel", 1, nil, rc, cm),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 1,
}},
},
done: make(chan struct{}, 1),
}
//segment not in channel
_, err := ct.getNumRows()
assert.Error(t, err)
})
t.Run("Test uploadRemainLog error", func(t *testing.T) {
f := &MetaFactory{}
t.Run("field not in field to type", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
ct := &compactionTask{
done: make(chan struct{}, 1),
}
meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
fid2C := make(map[int64][]interface{})
fid2T := make(map[int64]schemapb.DataType)
fid2C[1] = nil
_, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, nil, 0, fid2C, fid2T)
assert.Error(t, err)
})
t.Run("transfer interface wrong", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
ct := &compactionTask{
done: make(chan struct{}, 1),
}
meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
fid2C := make(map[int64][]interface{})
fid2T := make(map[int64]schemapb.DataType)
fid2C[1] = nil
_, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, nil, 0, fid2C, fid2T)
assert.Error(t, err)
})
t.Run("upload failed", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
stats := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10)
ct := &compactionTask{
uploader: &binlogIO{&mockCm{errMultiSave: true}, alloc},
done: make(chan struct{}, 1),
}
_, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil, nil)
assert.Error(t, err)
})
})
}
func getInt64DeltaBlobs(segID UniqueID, pks []UniqueID, tss []Timestamp) ([]*Blob, error) {
@ -555,13 +651,6 @@ func getInt64DeltaBlobs(segID UniqueID, pks []UniqueID, tss []Timestamp) ([]*Blo
return []*Blob{blob}, err
}
func getInsertBlobs(segID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) ([]*Blob, error) {
iCodec := storage.NewInsertCodecWithSchema(meta)
iblobs, _, err := iCodec.Serialize(10, segID, iData)
return iblobs, err
}
func TestCompactorInterfaceMethods(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -677,28 +766,34 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 1,
}
cpaths1, err := mockbIO.upload(context.TODO(), c.segID1, c.parID, []*InsertData{iData1}, dData1, meta)
stats1 := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), c.segID1, c.parID, iData1, stats1, 2, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths1.inPaths))
dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID1, c.parID, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths1))
cpaths2, err := mockbIO.upload(context.TODO(), c.segID2, c.parID, []*InsertData{iData2}, dData2, meta)
stats2 := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), c.segID2, c.parID, iData2, stats2, 2, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths2.inPaths))
dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID2, c.parID, dData2, meta)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths2))
plan := &datapb.CompactionPlan{
PlanID: 10080,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: c.segID1,
FieldBinlogs: cpaths1.inPaths,
Field2StatslogPaths: cpaths1.statsPaths,
Deltalogs: cpaths1.deltaInfo,
FieldBinlogs: lo.Values(iPaths1),
Field2StatslogPaths: lo.Values(sPaths1),
Deltalogs: dPaths1,
},
{
SegmentID: c.segID2,
FieldBinlogs: cpaths2.inPaths,
Field2StatslogPaths: cpaths2.statsPaths,
Deltalogs: cpaths2.deltaInfo,
FieldBinlogs: lo.Values(iPaths2),
Field2StatslogPaths: lo.Values(sPaths2),
Deltalogs: dPaths2,
},
},
StartTime: 0,
@ -814,28 +909,34 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 0,
}
cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
stats1 := storage.NewPrimaryKeyStats(1, int64(rc.pkType), 1)
iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), segID1, partID, iData1, stats1, 1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths1.inPaths))
dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), segID1, partID, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths1))
cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta)
stats2 := storage.NewPrimaryKeyStats(1, int64(rc.pkType), 1)
iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), segID2, partID, iData2, stats2, 1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths2.inPaths))
dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), segID2, partID, dData2, meta)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths2))
plan := &datapb.CompactionPlan{
PlanID: 20080,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segID1,
FieldBinlogs: cpaths1.inPaths,
Field2StatslogPaths: cpaths1.statsPaths,
Deltalogs: cpaths1.deltaInfo,
FieldBinlogs: lo.Values(iPaths1),
Field2StatslogPaths: lo.Values(sPaths1),
Deltalogs: dPaths1,
},
{
SegmentID: segID2,
FieldBinlogs: cpaths2.inPaths,
Field2StatslogPaths: cpaths2.statsPaths,
Deltalogs: cpaths2.deltaInfo,
FieldBinlogs: lo.Values(iPaths2),
Field2StatslogPaths: lo.Values(sPaths2),
Deltalogs: dPaths2,
},
},
StartTime: 0,
@ -876,7 +977,7 @@ type mockFlushManager struct {
var _ flushManager = (*mockFlushManager)(nil)
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) ([]*Blob, error) {
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error) {
if mfm.returnError {
return nil, fmt.Errorf("mock error")
}

View File

@ -217,7 +217,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick
continue
}
log.Info("recover growing segments form checkpoints",
log.Info("recover growing segments from checkpoints",
zap.String("vChannelName", us.GetInsertChannel()),
zap.Int64("segmentID", us.GetID()),
zap.Int64("numRows", us.GetNumOfRows()),
@ -233,6 +233,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick
partitionID: segment.PartitionID,
numOfRows: segment.GetNumOfRows(),
statsBinLogs: segment.Statslogs,
binLogs: segment.GetBinlogs(),
endPos: segment.GetDmlPosition(),
recoverTs: vchanInfo.GetSeekPosition().GetTimestamp()}); err != nil {
return nil, err
@ -265,10 +266,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick
if err := dsService.channel.addSegment(addSegmentReq{
segType: datapb.SegmentType_Flushed,
segID: segment.GetID(),
collID: segment.CollectionID,
partitionID: segment.PartitionID,
collID: segment.GetCollectionID(),
partitionID: segment.GetPartitionID(),
numOfRows: segment.GetNumOfRows(),
statsBinLogs: segment.Statslogs,
statsBinLogs: segment.GetStatslogs(),
binLogs: segment.GetBinlogs(),
recoverTs: vchanInfo.GetSeekPosition().GetTimestamp(),
}); err != nil {
return nil, err

View File

@ -470,10 +470,11 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
zap.String("channel", ibNode.channelName),
)
// use the pk stats returned by the flush to roll over the segment's current stat
var pkStats []*storage.PrimaryKeyStats
var pkStats *storage.PrimaryKeyStats
// TODO, this has to be async flush, no need to block here.
err := retry.Do(ibNode.ctx, func() error {
statBlobs, err := ibNode.flushManager.flushBufferData(task.buffer,
var err error
pkStats, err = ibNode.flushManager.flushBufferData(task.buffer,
task.segmentID,
task.flushed,
task.dropped,
@ -481,11 +482,6 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
if err != nil {
return err
}
pkStats, err = storage.DeserializeStats(statBlobs)
if err != nil {
log.Warn("failed to deserialize bloom filter files", zap.Error(err))
return err
}
return nil
}, getFlowGraphRetryOpt())
if err != nil {

View File

@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
@ -47,7 +48,7 @@ import (
// flushManager defines a flush manager signature
type flushManager interface {
// notify flush manager insert buffer data
flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) ([]*Blob, error)
flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error)
// notify flush manager del buffer data
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *msgpb.MsgPosition) error
// injectFlush injects compaction or other blocking task before flush sync
@ -340,56 +341,129 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
m.getFlushQueue(segmentID).enqueueDelFlush(task, deltaLogs, pos)
}
// flushBufferData notifies the flush manager of insert buffer data.
// This method will be retried on errors. Final errors will be propagated upstream and logged.
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) ([]*Blob, error) {
tr := timerecord.NewTimeRecorder("flushDuration")
// empty flush
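// serializeBinLog encodes the buffered insert data into binlog blobs and reports the
// memory size of every field; an empty buffer yields no blobs.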
func (m *rendezvousFlushManager) serializeBinLog(segmentID, partID int64, data *BufferData, inCodec *storage.InsertCodec) ([]*Blob, map[int64]int, error) {
fieldMemorySize := make(map[int64]int)
if data == nil || data.buffer == nil {
//m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{},
// map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos)
m.handleInsertTask(segmentID, &flushBufferInsertTask{}, map[UniqueID]*datapb.Binlog{}, map[UniqueID]*datapb.Binlog{},
flushed, dropped, pos)
return nil, nil
return []*Blob{}, fieldMemorySize, nil
}
collID, partID, meta, err := m.getSegmentMeta(segmentID, pos)
if err != nil {
return nil, err
}
// get memory size of buffer data
fieldMemorySize := make(map[int64]int)
for fieldID, fieldData := range data.buffer.Data {
fieldMemorySize[fieldID] = fieldData.GetMemorySize()
}
// encode data and convert output data
inCodec := storage.NewInsertCodecWithSchema(meta)
blobs, err := inCodec.Serialize(partID, segmentID, data.buffer)
if err != nil {
return nil, nil, err
}
return blobs, fieldMemorySize, nil
}
binLogs, statsBinlogs, err := inCodec.Serialize(partID, segmentID, data.buffer)
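// serializePkStatsLog serializes the pk stats of the current buffer; for a flushed segment
// it merges the segment's historical stats into a single compound stats blob.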
func (m *rendezvousFlushManager) serializePkStatsLog(segmentID int64, flushed bool, data *BufferData, inCodec *storage.InsertCodec) (*Blob, *storage.PrimaryKeyStats, error) {
var err error
var stats *storage.PrimaryKeyStats
pkField := getPKField(inCodec.Schema)
if pkField == nil {
log.Error("No pk filed in schema", zap.Int64("segmentID", segmentID), zap.Int64("collectionID", inCodec.Schema.GetID()))
return nil, nil, fmt.Errorf("no primary key in meta")
}
var insertData storage.FieldData
rowNum := int64(0)
if data != nil && data.buffer != nil {
insertData = data.buffer.Data[pkField.FieldID]
rowNum = int64(insertData.RowNum())
if insertData.RowNum() > 0 {
// gen stats of buffer insert data
stats = storage.NewPrimaryKeyStats(pkField.FieldID, int64(pkField.DataType), rowNum)
stats.UpdateByMsgs(insertData)
}
}
// if the segment is flushed, gather all of its stats logs as a list
// and serialize them into a single compound blob
if flushed {
seg := m.getSegment(segmentID)
if seg == nil {
return nil, nil, merr.WrapErrSegmentNotFound(segmentID)
}
statsList, oldRowNum := seg.getHistoricalStats(pkField)
if stats != nil {
statsList = append(statsList, stats)
}
blob, err := inCodec.SerializePkStatsList(statsList, oldRowNum+rowNum)
if err != nil {
return nil, nil, err
}
return blob, stats, nil
}
if rowNum == 0 {
return nil, nil, nil
}
// if not flushed, only serialize the stats generated
// from the new insert data
blob, err := inCodec.SerializePkStats(stats, rowNum)
if err != nil {
return nil, nil, err
}
return blob, stats, nil
}
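For a flushed segment the blob returned above is a compound stats log: every historical PrimaryKeyStats plus the stats of the final buffer, packed into one value so readers only ever fetch a single file. A rough round-trip sketch (not part of the commit) using only helpers that already appear in this diff; the collection meta comes from the test MetaFactory, whose Int64 primary key field has ID 106:

func exampleCompoundStatsRoundTrip() ([]*storage.PrimaryKeyStats, error) {
	meta := (&MetaFactory{}).GetCollectionMeta(UniqueID(10001), "example", schemapb.DataType_Int64)
	iCodec := storage.NewInsertCodecWithSchema(meta)
	old := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10) // stats from earlier flushes
	cur := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 5)  // stats of the final buffer
	blob, err := iCodec.SerializePkStatsList([]*storage.PrimaryKeyStats{old, cur}, 15)
	if err != nil {
		return nil, err
	}
	// the single compound blob restores the whole history on the read path
	return storage.DeserializeStatsList(blob)
}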
// flushBufferData notifies the flush manager of insert buffer data.
// This method will be retried on errors. Final errors will be propagated upstream and logged.
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error) {
field2Insert := make(map[UniqueID]*datapb.Binlog)
field2Stats := make(map[UniqueID]*datapb.Binlog)
kvs := make(map[string][]byte)
tr := timerecord.NewTimeRecorder("flushDuration")
// get segment info
collID, partID, meta, err := m.getSegmentMeta(segmentID, pos)
if err != nil {
return nil, err
}
inCodec := storage.NewInsertCodecWithSchema(meta)
// build bin log blob
binLogBlobs, fieldMemorySize, err := m.serializeBinLog(segmentID, partID, data, inCodec)
if err != nil {
return nil, err
}
// build stats log blob
pkStatsBlob, stats, err := m.serializePkStatsLog(segmentID, flushed, data, inCodec)
if err != nil {
return nil, err
}
// allocate log IDs for the binlogs,
// plus one for the stats log if there is a new stats log and we are not flushing
var logidx int64
allocNum := uint32(len(binLogBlobs) + boolToInt(!flushed && pkStatsBlob != nil))
if allocNum != 0 {
logidx, _, err = m.Alloc(allocNum)
if err != nil {
return nil, err
}
}
// binlogs
start, _, err := m.Alloc(uint32(len(binLogs) + len(statsBinlogs)))
if err != nil {
return nil, err
}
field2Insert := make(map[UniqueID]*datapb.Binlog, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
for idx, blob := range binLogs {
for _, blob := range binLogBlobs {
defer func() { logidx++ }()
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return nil, err
}
logidx := start + int64(idx)
k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx)
// [rootPath]/[insert_log]/key
key := path.Join(m.ChunkManager.RootPath(), common.SegmentInsertLogPath, k)
kvs[key] = blob.Value[:]
@ -402,27 +476,32 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
}
}
field2Stats := make(map[UniqueID]*datapb.Binlog)
// write stats binlog
for idx, blob := range statsBinlogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
// pk stats binlog
if pkStatsBlob != nil {
fieldID, err := strconv.ParseInt(pkStatsBlob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return nil, err
}
logidx := start + UniqueID(len(binLogs)+idx)
// if flushed, use storage.CompoundStatsType.LogIdx() as the log index
// otherwise use the last index we allocated
var key string
if flushed {
k := metautil.JoinIDPath(collID, partID, segmentID, fieldID)
key = path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k, storage.CompoundStatsType.LogIdx())
} else {
k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx)
key = path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
}
k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx)
key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
kvs[key] = blob.Value
kvs[key] = pkStatsBlob.Value
field2Stats[fieldID] = &datapb.Binlog{
EntriesNum: 0,
TimestampFrom: 0, //TODO
TimestampTo: 0, //TODO,
LogPath: key,
LogSize: int64(len(blob.Value)),
LogSize: int64(len(pkStatsBlob.Value)),
}
}
@ -432,7 +511,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
}, field2Insert, field2Stats, flushed, dropped, pos)
metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
return statsBinlogs, nil
return stats, nil
}
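A minimal sketch (outside this commit; the root path and IDs are made up, import paths follow the ones used elsewhere in this commit) of the two stats-log key shapes produced above: a flushed segment writes its merged stats log under the fixed compound-stats index, while a non-flushing flush keeps an allocated log ID as the last path segment.
package main
import (
	"fmt"
	"path"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
)
func main() {
	rootPath := "files"                                // illustrative chunk-manager root
	idPath := fmt.Sprintf("%d/%d/%d/%d", 1, 2, 3, 100) // collection/partition/segment/pk field
	// flushed: one merged stats log per segment, fixed index from CompoundStatsType
	flushedKey := path.Join(rootPath, common.SegmentStatslogPath, idPath, storage.CompoundStatsType.LogIdx())
	// not flushed: a regular stats log addressed by the allocated log ID
	allocatedLogID := int64(42)
	growingKey := path.Join(rootPath, common.SegmentStatslogPath, idPath, fmt.Sprint(allocatedLogID))
	fmt.Println(flushedKey)
	fmt.Println(growingKey)
}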
// notify flush manager del buffer data
@ -866,5 +945,6 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
dsService.flushingSegCache.Remove(req.GetSegmentID())
dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos)
dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos)
// dsService.channel.saveBinlogPath(fieldStats)
}
}

View File

@ -145,7 +145,15 @@ func TestOrderFlushQueue_Order(t *testing.T) {
func newTestChannel() *ChannelMeta {
return &ChannelMeta{
segments: make(map[UniqueID]*Segment),
segments: make(map[UniqueID]*Segment),
collectionID: 1,
collSchema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{{
FieldID: 100,
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
}},
},
}
}
@ -157,12 +165,26 @@ func TestRendezvousFlushManager(t *testing.T) {
size := 1000
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
alloc := allocator.NewMockAllocator(t)
m := NewRendezvousFlushManager(alloc, cm, newTestChannel(), func(pack *segmentFlushPack) {
channel := newTestChannel()
historicalStats := storage.NewPrimaryKeyStats(100, 5, 10)
testSeg := &Segment{
collectionID: 1,
segmentID: 3,
// flushing a segment will merge all historical stats
historyStats: []*storage.PkStatistics{
{
PkFilter: historicalStats.BF,
MinPK: historicalStats.MinPk,
MaxPK: historicalStats.MaxPk,
},
},
}
testSeg.setType(datapb.SegmentType_New)
channel.segments[testSeg.segmentID] = testSeg
m := NewRendezvousFlushManager(alloc, cm, channel, func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
}, emptyFlushAndDropFunc)
ids := make([][]byte, 0, size)
@ -172,21 +194,18 @@ func TestRendezvousFlushManager(t *testing.T) {
ids = append(ids, id)
}
wg := sync.WaitGroup{}
wg.Add(size)
for i := 0; i < size; i++ {
m.flushDelData(nil, 1, &msgpb.MsgPosition{
err := m.flushDelData(nil, testSeg.segmentID, &msgpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
}
wg.Wait()
finish.Wait()
assert.NoError(t, err)
assert.EqualValues(t, size, counter.Load())
_, err = m.flushBufferData(nil, testSeg.segmentID, true, false, &msgpb.MsgPosition{
MsgID: ids[i],
})
assert.NoError(t, err)
}
assert.Eventually(t, func() bool { return counter.Load() == int64(size) }, 3*time.Second, 100*time.Millisecond)
}
func TestRendezvousFlushManager_Inject(t *testing.T) {
@ -197,17 +216,32 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
size := 1000
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
var packMut sync.Mutex
packs := make([]*segmentFlushPack, 0, size+3)
alloc := allocator.NewMockAllocator(t)
m := NewRendezvousFlushManager(alloc, cm, newTestChannel(), func(pack *segmentFlushPack) {
channel := newTestChannel()
segments := []*Segment{{
collectionID: 1,
segmentID: 1,
}, {
collectionID: 1,
segmentID: 2,
}, {
collectionID: 1,
segmentID: 3,
}}
for _, seg := range segments {
seg.setType(datapb.SegmentType_New)
channel.segments[seg.segmentID] = seg
}
m := NewRendezvousFlushManager(alloc, cm, channel, func(pack *segmentFlushPack) {
packMut.Lock()
packs = append(packs, pack)
packMut.Unlock()
counter.Inc()
finish.Done()
}, emptyFlushAndDropFunc)
ti := newTaskInjection(1, func(*segmentFlushPack) {})
@ -222,58 +256,61 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
ids = append(ids, id)
}
wg := sync.WaitGroup{}
wg.Add(size)
for i := 0; i < size; i++ {
m.flushDelData(nil, 1, &msgpb.MsgPosition{
err := m.flushDelData(nil, 1, &msgpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
assert.NoError(t, err)
_, err = m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
assert.NoError(t, err)
}
wg.Wait()
finish.Wait()
assert.Eventually(t, func() bool { return counter.Load() == int64(size) }, 3*time.Second, 100*time.Millisecond)
assert.EqualValues(t, size, counter.Load())
finish.Add(2)
id := make([]byte, 10)
rand.Read(id)
id2 := make([]byte, 10)
rand.Read(id2)
m.flushBufferData(nil, 2, true, false, &msgpb.MsgPosition{
_, err := m.flushBufferData(nil, 2, true, false, &msgpb.MsgPosition{
MsgID: id,
})
m.flushBufferData(nil, 3, true, false, &msgpb.MsgPosition{
assert.NoError(t, err)
_, err = m.flushBufferData(nil, 3, true, false, &msgpb.MsgPosition{
MsgID: id2,
})
assert.NoError(t, err)
ti = newTaskInjection(2, func(pack *segmentFlushPack) {
pack.segmentID = 4
})
m.injectFlush(ti, 2, 3)
m.flushDelData(nil, 2, &msgpb.MsgPosition{
err = m.flushDelData(nil, 2, &msgpb.MsgPosition{
MsgID: id,
})
m.flushDelData(nil, 3, &msgpb.MsgPosition{
assert.NoError(t, err)
err = m.flushDelData(nil, 3, &msgpb.MsgPosition{
MsgID: id2,
})
assert.NoError(t, err)
<-ti.Injected()
ti.injectDone(true)
finish.Wait()
assert.EqualValues(t, size+2, counter.Load())
assert.Eventually(t, func() bool { return counter.Load() == int64(size+2) }, 3*time.Second, 100*time.Millisecond)
assert.EqualValues(t, 4, packs[size].segmentID)
finish.Add(1)
rand.Read(id)
m.flushBufferData(nil, 2, false, false, &msgpb.MsgPosition{
_, err = m.flushBufferData(nil, 2, false, false, &msgpb.MsgPosition{
MsgID: id,
})
assert.NoError(t, err)
ti = newTaskInjection(1, func(pack *segmentFlushPack) {
pack.segmentID = 5
})
@ -286,8 +323,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
m.flushDelData(nil, 2, &msgpb.MsgPosition{
MsgID: id,
})
finish.Wait()
assert.EqualValues(t, size+3, counter.Load())
assert.Eventually(t, func() bool { return counter.Load() == int64(size+3) }, 3*time.Second, 100*time.Millisecond)
assert.EqualValues(t, 4, packs[size+1].segmentID)
}
@ -329,13 +365,18 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newTestChannel()
testSeg := &Segment{
collectionID: 1,
segmentID: 1,
}
testSeg.setType(datapb.SegmentType_New)
channel.segments[testSeg.segmentID] = testSeg
size := 1000
var counter atomic.Int64
var finish sync.WaitGroup
finish.Add(size)
m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) {
m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
}, emptyFlushAndDropFunc)
ids := make([][]byte, 0, size)
@ -346,9 +387,10 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
}
for i := 0; i < size; i++ {
m.flushDelData(nil, 1, &msgpb.MsgPosition{
err := m.flushDelData(nil, 1, &msgpb.MsgPosition{
MsgID: ids[i],
})
assert.NoError(t, err)
}
var finished bool
@ -368,9 +410,10 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
mut.RUnlock()
for i := 0; i < size/2; i++ {
m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
_, err := m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
MsgID: ids[i],
})
assert.NoError(t, err)
}
mut.RLock()
@ -378,9 +421,10 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
mut.RUnlock()
for i := size / 2; i < size; i++ {
m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
_, err := m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
MsgID: ids[i],
})
assert.NoError(t, err)
}
select {
@ -405,7 +449,29 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
var result []*segmentFlushPack
signal := make(chan struct{})
m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) {
channel := newTestChannel()
targets := make(map[int64]struct{})
// init failed segment
testSeg := &Segment{
collectionID: 1,
segmentID: -1,
}
testSeg.setType(datapb.SegmentType_New)
channel.segments[testSeg.segmentID] = testSeg
// init target segment
for i := 1; i < 11; i++ {
targets[int64(i)] = struct{}{}
testSeg := &Segment{
collectionID: 1,
segmentID: int64(i),
}
testSeg.setType(datapb.SegmentType_New)
channel.segments[testSeg.segmentID] = testSeg
}
// init flush manager
m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(pack *segmentFlushPack) {
}, func(packs []*segmentFlushPack) {
mut.Lock()
result = packs
@ -414,26 +480,28 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
})
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, -1, true, false, &msgpb.MsgPosition{
_, err := m.flushBufferData(nil, -1, true, false, &msgpb.MsgPosition{
MsgID: halfMsgID,
})
assert.NoError(t, err)
m.startDropping()
// half normal, half drop mode, should not appear in final packs
m.flushDelData(nil, -1, &msgpb.MsgPosition{
err = m.flushDelData(nil, -1, &msgpb.MsgPosition{
MsgID: halfMsgID,
})
assert.NoError(t, err)
target := make(map[int64]struct{})
for i := 1; i < 11; i++ {
target[int64(i)] = struct{}{}
m.flushBufferData(nil, int64(i), true, false, &msgpb.MsgPosition{
MsgID: []byte{byte(i)},
for target := range targets {
_, err := m.flushBufferData(nil, target, true, false, &msgpb.MsgPosition{
MsgID: []byte{byte(target)},
})
m.flushDelData(nil, int64(i), &msgpb.MsgPosition{
MsgID: []byte{byte(i)},
assert.NoError(t, err)
err = m.flushDelData(nil, target, &msgpb.MsgPosition{
MsgID: []byte{byte(target)},
})
t.Log(i)
assert.NoError(t, err)
}
m.notifyAllFlushed()
@ -446,11 +514,12 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
for _, pack := range result {
assert.NotEqual(t, -1, pack.segmentID)
output[pack.segmentID] = struct{}{}
_, has := target[pack.segmentID]
_, has := targets[pack.segmentID]
assert.True(t, has)
}
assert.Equal(t, len(target), len(output))
assert.Equal(t, len(targets), len(output))
})
t.Run("test drop mode with injection", func(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
@ -458,8 +527,26 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
var mut sync.Mutex
var result []*segmentFlushPack
signal := make(chan struct{})
channel := newTestChannel()
// init failed segment
testSeg := &Segment{
collectionID: 1,
segmentID: -1,
}
testSeg.setType(datapb.SegmentType_New)
channel.segments[testSeg.segmentID] = testSeg
m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) {
// init target segment
for i := 1; i < 11; i++ {
seg := &Segment{
collectionID: 1,
segmentID: int64(i),
}
seg.setType(datapb.SegmentType_New)
channel.segments[seg.segmentID] = seg
}
m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(pack *segmentFlushPack) {
}, func(packs []*segmentFlushPack) {
mut.Lock()
result = packs
@ -467,11 +554,14 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
close(signal)
})
// flush the failed segment before starting drop mode
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, -1, true, false, &msgpb.MsgPosition{
_, err := m.flushBufferData(nil, -1, true, false, &msgpb.MsgPosition{
MsgID: halfMsgID,
})
assert.NoError(t, err)
// inject target segment
injFunc := func(pack *segmentFlushPack) {
pack.segmentID = 100
}
@ -484,22 +574,24 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
m.startDropping()
// half normal, half drop mode, should not appear in final packs
m.flushDelData(nil, -1, &msgpb.MsgPosition{
err = m.flushDelData(nil, -1, &msgpb.MsgPosition{
MsgID: halfMsgID,
})
assert.NoError(t, err)
for i := 1; i < 11; i++ {
m.flushBufferData(nil, int64(i), true, false, &msgpb.MsgPosition{
_, err = m.flushBufferData(nil, int64(i), true, false, &msgpb.MsgPosition{
MsgID: []byte{byte(i)},
})
m.flushDelData(nil, int64(i), &msgpb.MsgPosition{
assert.NoError(t, err)
err = m.flushDelData(nil, int64(i), &msgpb.MsgPosition{
MsgID: []byte{byte(i)},
})
assert.NoError(t, err)
}
m.notifyAllFlushed()
<-signal
mut.Lock()
defer mut.Unlock()
@ -508,7 +600,6 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
assert.Equal(t, int64(100), pack.segmentID)
}
})
}
func TestRendezvousFlushManager_close(t *testing.T) {
@ -519,11 +610,19 @@ func TestRendezvousFlushManager_close(t *testing.T) {
size := 1000
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) {
channel := newTestChannel()
// init test segment
testSeg := &Segment{
collectionID: 1,
segmentID: 1,
}
testSeg.setType(datapb.SegmentType_New)
channel.segments[testSeg.segmentID] = testSeg
m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
}, emptyFlushAndDropFunc)
ids := make([][]byte, 0, size)
@ -533,22 +632,20 @@ func TestRendezvousFlushManager_close(t *testing.T) {
ids = append(ids, id)
}
wg := sync.WaitGroup{}
wg.Add(size)
for i := 0; i < size; i++ {
m.flushDelData(nil, 1, &msgpb.MsgPosition{
err := m.flushDelData(nil, 1, &msgpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
}
wg.Wait()
finish.Wait()
m.close()
assert.NoError(t, err)
assert.EqualValues(t, size, counter.Load())
_, err = m.flushBufferData(nil, 1, true, false, &msgpb.MsgPosition{
MsgID: ids[i],
})
assert.NoError(t, err)
}
m.close()
assert.Eventually(t, func() bool { return counter.Load() == int64(size) }, 3*time.Second, 100*time.Millisecond)
}
func TestFlushNotifyFunc(t *testing.T) {
@ -597,7 +694,6 @@ func TestFlushNotifyFunc(t *testing.T) {
notifyFunc(&segmentFlushPack{})
})
})
t.Run("normal segment not found", func(t *testing.T) {
dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_SegmentNotFound
assert.Panics(t, func() {

View File

@ -17,9 +17,9 @@
package datanode
import (
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/common"
)
// reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2
@ -66,13 +66,11 @@ func reviseVChannelInfo(vChannel *datapb.VchannelInfo) {
vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds())
}
// getPKField returns the primary key field schema from collection meta.
func getPKID(meta *etcdpb.CollectionMeta) UniqueID {
for _, field := range meta.GetSchema().GetFields() {
func getPKField(meta *etcdpb.CollectionMeta) *schemapb.FieldSchema {
for _, field := range meta.Schema.Fields {
if field.GetIsPrimaryKey() {
return field.GetFieldID()
return field
}
}
return common.InvalidFieldID
return nil
}

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/storage"
s "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
@ -1120,6 +1121,17 @@ func genInsertDataWithPKs(PKs [2]primaryKey, dataType schemapb.DataType) *Insert
return iD
}
func genTestStat(meta *etcdpb.CollectionMeta) *storage.PrimaryKeyStats {
var pkFieldID, pkFieldType int64
for _, field := range meta.Schema.Fields {
if field.IsPrimaryKey {
pkFieldID = field.FieldID
pkFieldType = int64(field.DataType)
}
}
return storage.NewPrimaryKeyStats(pkFieldID, pkFieldType, 0)
}
func genInsertData() *InsertData {
return &InsertData{
Data: map[int64]s.FieldData{

View File

@ -25,6 +25,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
@ -93,6 +94,30 @@ func (s *Segment) updatePKRange(ids storage.FieldData) {
}
}
func (s *Segment) getHistoricalStats(pkField *schemapb.FieldSchema) ([]*storage.PrimaryKeyStats, int64) {
statsList := []*storage.PrimaryKeyStats{}
for _, stats := range s.historyStats {
statsList = append(statsList, &storage.PrimaryKeyStats{
FieldID: pkField.FieldID,
PkType: int64(pkField.DataType),
BF: stats.PkFilter,
MaxPk: stats.MaxPK,
MinPk: stats.MinPK,
})
}
if s.currentStat != nil {
statsList = append(statsList, &storage.PrimaryKeyStats{
FieldID: pkField.FieldID,
PkType: int64(pkField.DataType),
BF: s.currentStat.PkFilter,
MaxPk: s.currentStat.MaxPK,
MinPk: s.currentStat.MinPK,
})
}
return statsList, s.numRows
}
func (s *Segment) InitCurrentStat() {
if s.currentStat == nil {
s.currentStat = &storage.PkStatistics{

View File

@ -840,7 +840,9 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp,
ID: colID,
Schema: schema,
}
binLogs, statsBinLogs, err := storage.NewInsertCodecWithSchema(meta).Serialize(partID, segmentID, data.buffer)
iCodec := storage.NewInsertCodecWithSchema(meta)
binLogs, err := iCodec.Serialize(partID, segmentID, data.buffer)
if err != nil {
return nil, nil, err
}
@ -878,27 +880,30 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp,
field2Stats := make(map[UniqueID]*datapb.Binlog)
// write stats binlog
for _, blob := range statsBinLogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return nil, nil, err
}
statsBinLog, err := iCodec.SerializePkStatsByData(data.buffer)
if err != nil {
return nil, nil, err
}
logidx := field2Logidx[fieldID]
fieldID, err := strconv.ParseInt(statsBinLog.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return nil, nil, err
}
// no error raise if alloc=false
k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx)
logidx := field2Logidx[fieldID]
key := path.Join(node.chunkManager.RootPath(), common.SegmentStatslogPath, k)
kvs[key] = blob.Value
field2Stats[fieldID] = &datapb.Binlog{
EntriesNum: data.size,
TimestampFrom: ts,
TimestampTo: ts,
LogPath: key,
LogSize: int64(len(blob.Value)),
}
// no error raise if alloc=false
k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx)
key := path.Join(node.chunkManager.RootPath(), common.SegmentStatslogPath, k)
kvs[key] = statsBinLog.Value
field2Stats[fieldID] = &datapb.Binlog{
EntriesNum: data.size,
TimestampFrom: ts,
TimestampTo: ts,
LogPath: key,
LogSize: int64(len(statsBinLog.Value)),
}
err = node.chunkManager.MultiWrite(ctx, kvs)

View File

@ -56,3 +56,10 @@ func startTracer(msg msgstream.TsMsg, name string) (context.Context, trace.Span)
}
return otel.Tracer(typeutil.DataNodeRole).Start(ctx, name)
}
func boolToInt(value bool) int {
if value {
return 1
}
return 0
}

View File

@ -202,7 +202,7 @@ func (c *mockChunkmgr) mockFieldData(numrows, dim int, collectionID, partitionID
insertCodec := &storage.InsertCodec{
Schema: collMeta,
}
blobs, _, err := insertCodec.Serialize(partitionID, segmentID, insertData)
blobs, err := insertCodec.Serialize(partitionID, segmentID, insertData)
if err != nil {
panic(err)
}

View File

@ -826,6 +826,16 @@ func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, lo
}
}
func hasSepcialStatslog(logs *datapb.FieldBinlog) bool {
for _, statslog := range logs.Binlogs {
_, logidx := path.Split(statslog.LogPath)
if logidx == storage.CompoundStatsType.LogIdx() {
return true
}
}
return false
}
func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.UniqueID,
binlogs, deltalogs, statslogs []*datapb.FieldBinlog) (map[string]string, error) {
@ -833,8 +843,9 @@ func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.Uniqu
checkBinlogs(storage.DeleteBinlog, segmentID, deltalogs)
checkBinlogs(storage.StatsBinlog, segmentID, statslogs)
// check stats log and bin log size match
if len(binlogs) != 0 && len(statslogs) != 0 {
if len(statslogs[0].GetBinlogs()) != len(binlogs[0].GetBinlogs()) {
// the number of stats logs may be one more than the number of binlogs if the segment was flushed and its stats logs were merged
if len(binlogs) != 0 && len(statslogs) != 0 && !hasSepcialStatslog(statslogs[0]) {
if len(binlogs[0].GetBinlogs()) != len(statslogs[0].GetBinlogs()) {
log.Warn("find invalid segment while bin log size didn't match stat log size",
zap.Int64("collection", collectionID),
zap.Int64("partition", partitionID),
@ -843,7 +854,7 @@ func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.Uniqu
zap.Any("stats", statslogs),
zap.Any("delta", deltalogs),
)
return nil, fmt.Errorf("Segment can not be saved because of binlog "+
return nil, fmt.Errorf("segment can not be saved because of binlog "+
"file not match stat log number: collection %v, segment %v", collectionID, segmentID)
}
}
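A minimal sketch (not the datacoord code itself; paths and IDs are illustrative) of the relaxed check: a field whose stats binlogs contain the compound-stats index is treated as merged, so its stats-log count no longer has to match the insert-binlog count.
package main
import (
	"fmt"
	"path"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/storage"
)
// hasCompoundStatslog mirrors the detection used above: the merged stats log
// ends with the fixed CompoundStatsType index instead of an allocated log ID.
func hasCompoundStatslog(logs *datapb.FieldBinlog) bool {
	for _, statslog := range logs.GetBinlogs() {
		if _, logidx := path.Split(statslog.GetLogPath()); logidx == storage.CompoundStatsType.LogIdx() {
			return true
		}
	}
	return false
}
func main() {
	merged := &datapb.FieldBinlog{
		FieldID: 100,
		Binlogs: []*datapb.Binlog{{LogPath: "files/stats_log/1/2/3/100/" + storage.CompoundStatsType.LogIdx()}},
	}
	perFlush := &datapb.FieldBinlog{
		FieldID: 100,
		Binlogs: []*datapb.Binlog{{LogPath: "files/stats_log/1/2/3/100/42"}},
	}
	fmt.Println(hasCompoundStatslog(merged), hasCompoundStatslog(perFlush)) // true false
}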

View File

@ -681,9 +681,18 @@ func genStorageBlob(collectionID int64,
if err != nil {
return nil, nil, err
}
binLogs, statsLogs, err := inCodec.Serialize(partitionID, segmentID, insertData)
return binLogs, statsLogs, err
binLogs, err := inCodec.Serialize(partitionID, segmentID, insertData)
if err != nil {
return nil, nil, err
}
statsLog, err := inCodec.SerializePkStatsByData(insertData)
if err != nil {
return nil, nil, err
}
return binLogs, []*storage.Blob{statsLog}, nil
}
func genCollectionMeta(collectionID int64, partitionID int64, schema *schemapb.CollectionSchema) *etcdpb.CollectionMeta {

View File

@ -359,8 +359,8 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI
bfs := pkoracle.NewBloomFilterSet(segmentID, partitionID, commonpb.SegmentState_Sealed)
log.Info("loading bloom filter for remote...")
pkStatsBinlogs := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
err := loader.loadBloomFilter(ctx, segmentID, bfs, pkStatsBinlogs)
pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
err := loader.loadBloomFilter(ctx, segmentID, bfs, pkStatsBinlogs, logType)
if err != nil {
log.Warn("load remote segment bloom filter failed",
zap.Int64("partitionID", partitionID),
@ -459,8 +459,8 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
// load statslog if it's growing segment
if segment.typ == SegmentTypeGrowing {
log.Info("loading statslog...")
pkStatsBinlogs := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, pkStatsBinlogs)
pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, pkStatsBinlogs, logType)
if err != nil {
return err
}
@ -470,16 +470,24 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs)
}
func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) []string {
func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) ([]string, storage.StatsLogType) {
result := make([]string, 0)
for _, fieldBinlog := range fieldBinlogs {
if fieldBinlog.FieldID == pkFieldID {
for _, binlog := range fieldBinlog.GetBinlogs() {
result = append(result, binlog.GetLogPath())
_, logidx := path.Split(binlog.GetLogPath())
// if a compound (merged) stats log exists,
// only load that one file
switch logidx {
case storage.CompoundStatsType.LogIdx():
return []string{binlog.GetLogPath()}, storage.CompoundStatsType
default:
result = append(result, binlog.GetLogPath())
}
}
}
}
return result
return result, storage.DefaultStatsType
}
func (loader *segmentLoader) loadGrowingSegmentFields(ctx context.Context, segment *LocalSegment, fieldBinlogs []*datapb.FieldBinlog) error {
@ -784,7 +792,9 @@ func (loader *segmentLoader) loadSealedSegments(segment *LocalSegment, insertDat
return nil
}
func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet, binlogPaths []string) error {
func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet,
binlogPaths []string, logType storage.StatsLogType) error {
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segmentID),
)
@ -798,16 +808,26 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
if err != nil {
return err
}
blobs := make([]*storage.Blob, 0)
blobs := []*storage.Blob{}
for i := 0; i < len(values); i++ {
blobs = append(blobs, &storage.Blob{Value: values[i]})
}
stats, err := storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize stats", zap.Error(err))
return err
var stats []*storage.PrimaryKeyStats
if logType == storage.CompoundStatsType {
stats, err = storage.DeserializeStatsList(blobs[0])
if err != nil {
log.Warn("failed to deserialize stats list", zap.Error(err))
return err
}
} else {
stats, err = storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize stats", zap.Error(err))
return err
}
}
var size uint
for _, stat := range stats {
pkStat := &storage.PkStatistics{

View File

@ -72,7 +72,7 @@ func generateTestData(t *testing.T, num int) []*Blob {
},
}}
blobs, _, err := insertCodec.Serialize(1, 1, data)
blobs, err := insertCodec.Serialize(1, 1, data)
assert.Nil(t, err)
return blobs
}

View File

@ -287,6 +287,15 @@ type InsertData struct {
Infos []BlobInfo
}
func (iData *InsertData) IsEmpty() bool {
if iData == nil {
return true
}
timeFieldData, ok := iData.Data[common.TimeStampField]
return (!ok) || (timeFieldData.RowNum() <= 0)
}
// InsertCodec serializes and deserializes the insert data
// Blob key example:
// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
@ -304,20 +313,95 @@ func NewInsertCodecWithSchema(schema *etcdpb.CollectionMeta) *InsertCodec {
return &InsertCodec{Schema: schema}
}
// Serialize Pk stats log
func (insertCodec *InsertCodec) SerializePkStats(stats *PrimaryKeyStats, rowNum int64) (*Blob, error) {
if stats == nil || stats.BF == nil {
return nil, fmt.Errorf("sericalize empty pk stats")
}
// serialize the pk stats into a buffer
blobKey := fmt.Sprintf("%d", stats.FieldID)
statsWriter := &StatsWriter{}
err := statsWriter.Generate(stats)
if err != nil {
return nil, err
}
buffer := statsWriter.GetBuffer()
return &Blob{
Key: blobKey,
Value: buffer,
RowNum: rowNum,
}, nil
}
// Serialize Pk stats list to one blob
func (insertCodec *InsertCodec) SerializePkStatsList(stats []*PrimaryKeyStats, rowNum int64) (*Blob, error) {
if len(stats) == 0 {
return nil, nil
}
blobKey := fmt.Sprintf("%d", stats[0].FieldID)
statsWriter := &StatsWriter{}
err := statsWriter.GenerateList(stats)
if err != nil {
return nil, err
}
buffer := statsWriter.GetBuffer()
return &Blob{
Key: blobKey,
Value: buffer,
RowNum: rowNum,
}, nil
}
// Serialize Pk stats log by insert data
func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob, error) {
timeFieldData, ok := data.Data[common.TimeStampField]
if !ok {
return nil, fmt.Errorf("data doesn't contains timestamp field")
}
if timeFieldData.RowNum() <= 0 {
return nil, fmt.Errorf("there's no data in InsertData")
}
rowNum := int64(timeFieldData.RowNum())
for _, field := range insertCodec.Schema.Schema.Fields {
// stats fields
if !field.GetIsPrimaryKey() {
continue
}
singleData := data.Data[field.FieldID]
blobKey := fmt.Sprintf("%d", field.FieldID)
statsWriter := &StatsWriter{}
err := statsWriter.GenerateByData(field.FieldID, field.DataType, singleData)
if err != nil {
return nil, err
}
buffer := statsWriter.GetBuffer()
return &Blob{
Key: blobKey,
Value: buffer,
RowNum: rowNum,
}, nil
}
return nil, fmt.Errorf("there is no pk field")
}
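A minimal sketch (illustrative schema and values, not part of the commit) of producing a single-flush stats blob straight from buffered insert data with SerializePkStatsByData, the path growing segments still take.
package main
import (
	"fmt"
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
	"github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
)
func main() {
	codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{
		Schema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{{
			FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
		}}},
	})
	// buffered insert data: timestamps plus the primary key column
	data := &storage.InsertData{Data: map[int64]storage.FieldData{
		common.TimeStampField: &storage.Int64FieldData{Data: []int64{1, 2}},
		100:                   &storage.Int64FieldData{Data: []int64{10, 20}},
	}}
	blob, err := codec.SerializePkStatsByData(data)
	if err != nil {
		panic(err)
	}
	fmt.Println("stats blob key:", blob.Key, "rows:", blob.RowNum) // key 100, 2 rows
}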
// Serialize transfers insert data into blobs. It will sort insert data by timestamp.
// It gets all fields from the schema.
// For each field, it creates a binlog writer and writes an event to the binlog.
// It returns the binlog buffers in the end.
func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, []*Blob, error) {
func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
blobs := make([]*Blob, 0)
statsBlobs := make([]*Blob, 0)
var writer *InsertBinlogWriter
timeFieldData, ok := data.Data[common.TimeStampField]
if !ok {
return nil, nil, fmt.Errorf("data doesn't contain timestamp field")
return nil, fmt.Errorf("data doesn't contain timestamp field")
}
if timeFieldData.RowNum() <= 0 {
return nil, nil, fmt.Errorf("there's no data in InsertData")
return nil, fmt.Errorf("there's no data in InsertData")
}
rowNum := int64(timeFieldData.RowNum())
@ -346,14 +430,14 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
case schemapb.DataType_BinaryVector:
eventWriter, err = writer.NextInsertEventWriter(singleData.(*BinaryVectorFieldData).Dim)
default:
return nil, nil, fmt.Errorf("undefined data type %d", field.DataType)
return nil, fmt.Errorf("undefined data type %d", field.DataType)
}
} else {
eventWriter, err = writer.NextInsertEventWriter()
}
if err != nil {
writer.Close()
return nil, nil, err
return nil, err
}
eventWriter.SetEventTimestamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs))
@ -363,7 +447,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BoolFieldData).GetMemorySize()))
case schemapb.DataType_Int8:
@ -371,7 +455,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int8FieldData).GetMemorySize()))
case schemapb.DataType_Int16:
@ -379,7 +463,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int16FieldData).GetMemorySize()))
case schemapb.DataType_Int32:
@ -387,7 +471,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int32FieldData).GetMemorySize()))
case schemapb.DataType_Int64:
@ -395,7 +479,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int64FieldData).GetMemorySize()))
case schemapb.DataType_Float:
@ -403,7 +487,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatFieldData).GetMemorySize()))
case schemapb.DataType_Double:
@ -411,7 +495,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*DoubleFieldData).GetMemorySize()))
case schemapb.DataType_String, schemapb.DataType_VarChar:
@ -420,7 +504,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*StringFieldData).GetMemorySize()))
@ -430,7 +514,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*ArrayFieldData).GetMemorySize()))
@ -440,7 +524,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*JSONFieldData).GetMemorySize()))
@ -449,7 +533,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BinaryVectorFieldData).GetMemorySize()))
case schemapb.DataType_FloatVector:
@ -457,14 +541,14 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatVectorFieldData).GetMemorySize()))
default:
return nil, nil, fmt.Errorf("undefined data type %d", field.DataType)
return nil, fmt.Errorf("undefined data type %d", field.DataType)
}
if err != nil {
return nil, nil, err
return nil, err
}
writer.SetEventTimeStamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs))
@ -472,14 +556,14 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
return nil, err
}
blobKey := fmt.Sprintf("%d", field.FieldID)
blobs = append(blobs, &Blob{
@ -489,24 +573,9 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
})
eventWriter.Close()
writer.Close()
// stats fields
if field.GetIsPrimaryKey() {
statsWriter := &StatsWriter{}
err = statsWriter.GeneratePrimaryKeyStats(field.FieldID, field.DataType, singleData)
if err != nil {
return nil, nil, err
}
statsBuffer := statsWriter.GetBuffer()
statsBlobs = append(statsBlobs, &Blob{
Key: blobKey,
Value: statsBuffer,
RowNum: rowNum,
})
}
}
return blobs, statsBlobs, nil
return blobs, nil
}
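A minimal sketch (illustrative schema, IDs and row values) of the changed Serialize contract: it now returns only the per-field insert binlogs, and stats blobs are produced separately through the SerializePkStats* helpers.
package main
import (
	"fmt"
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
	"github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
)
func main() {
	meta := &etcdpb.CollectionMeta{
		ID: 1,
		Schema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
			{FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64},
			{FieldID: common.RowIDField, DataType: schemapb.DataType_Int64},
			{FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
		}},
	}
	data := &storage.InsertData{Data: map[int64]storage.FieldData{
		common.TimeStampField: &storage.Int64FieldData{Data: []int64{1, 2}},
		common.RowIDField:     &storage.Int64FieldData{Data: []int64{1, 2}},
		100:                   &storage.Int64FieldData{Data: []int64{10, 20}},
	}}
	codec := storage.NewInsertCodecWithSchema(meta)
	binlogs, err := codec.Serialize(2, 3, data) // partitionID, segmentID
	if err != nil {
		panic(err)
	}
	for _, b := range binlogs {
		fmt.Println("field", b.Key, "->", len(b.Value), "bytes")
	}
	statsBlob, err := codec.SerializePkStatsByData(data) // stats handled separately now
	if err != nil {
		panic(err)
	}
	fmt.Println("pk stats blob:", statsBlob.Key)
}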
func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (

View File

@ -105,7 +105,7 @@ func TestInsertCodec(t *testing.T) {
{
FieldID: Int64Field,
Name: "field_int64",
IsPrimaryKey: false,
IsPrimaryKey: true,
Description: "int64",
DataType: schemapb.DataType_Int64,
},
@ -307,18 +307,21 @@ func TestInsertCodec(t *testing.T) {
JSONField: &JSONFieldData{[][]byte{}},
},
}
b, s, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty)
b, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty)
assert.Error(t, err)
assert.Empty(t, b)
s, err := insertCodec.SerializePkStatsByData(insertDataEmpty)
assert.Error(t, err)
assert.Empty(t, s)
Blobs1, statsBlob1, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1)
Blobs1, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1)
assert.Nil(t, err)
for _, blob := range Blobs1 {
blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 100)
assert.Equal(t, blob.GetKey(), blob.Key)
}
Blobs2, statsBlob2, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2)
Blobs2, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2)
assert.Nil(t, err)
for _, blob := range Blobs2 {
blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99)
@ -368,10 +371,14 @@ func TestInsertCodec(t *testing.T) {
_, _, _, _, err = insertCodec.DeserializeAll(blobs)
assert.NotNil(t, err)
_, err = DeserializeStats(statsBlob1)
statsBlob1, err := insertCodec.SerializePkStatsByData(insertData1)
assert.Nil(t, err)
_, err = DeserializeStats([]*Blob{statsBlob1})
assert.Nil(t, err)
_, err = DeserializeStats(statsBlob2)
statsBlob2, err := insertCodec.SerializePkStatsByData(insertData2)
assert.Nil(t, err)
_, err = DeserializeStats([]*Blob{statsBlob2})
assert.Nil(t, err)
}
@ -495,7 +502,7 @@ func TestDDCodec(t *testing.T) {
func TestTsError(t *testing.T) {
insertData := &InsertData{}
insertCodec := NewInsertCodecWithSchema(nil)
blobs, _, err := insertCodec.Serialize(1, 1, insertData)
blobs, err := insertCodec.Serialize(1, 1, insertData)
assert.Nil(t, blobs)
assert.NotNil(t, err)
}

View File

@ -268,7 +268,7 @@ func TestPrintBinlogFiles(t *testing.T) {
},
},
}
firstBlobs, _, err := insertCodec.Serialize(1, 1, insertDataFirst)
firstBlobs, err := insertCodec.Serialize(1, 1, insertDataFirst)
assert.Nil(t, err)
var binlogFiles []string
for index, blob := range firstBlobs {
@ -283,7 +283,7 @@ func TestPrintBinlogFiles(t *testing.T) {
err = fd.Close()
assert.Nil(t, err)
}
secondBlobs, _, err := insertCodec.Serialize(1, 1, insertDataSecond)
secondBlobs, err := insertCodec.Serialize(1, 1, insertDataSecond)
assert.Nil(t, err)
for index, blob := range secondBlobs {
blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99)

View File

@ -18,10 +18,12 @@ package storage
import (
"encoding/json"
"fmt"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
)
const (
@ -93,6 +95,8 @@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error {
case schemapb.DataType_VarChar:
stats.MaxPk = &VarCharPrimaryKey{}
stats.MinPk = &VarCharPrimaryKey{}
default:
return fmt.Errorf("Invalid PK Data Type")
}
if maxPkMessage, ok := messageMap["maxPk"]; ok && maxPkMessage != nil {
@ -120,8 +124,58 @@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error {
return nil
}
func (stats *PrimaryKeyStats) UpdateByMsgs(msgs FieldData) {
switch schemapb.DataType(stats.PkType) {
case schemapb.DataType_Int64:
data := msgs.(*Int64FieldData).Data
if len(data) < 1 {
// return error: msgs must have at least one element
return
}
b := make([]byte, 8)
for _, int64Value := range data {
pk := NewInt64PrimaryKey(int64Value)
stats.UpdateMinMax(pk)
common.Endian.PutUint64(b, uint64(int64Value))
stats.BF.Add(b)
}
case schemapb.DataType_VarChar:
data := msgs.(*StringFieldData).Data
if len(data) < 1 {
// return error: msgs must have at least one element
return
}
for _, str := range data {
pk := NewVarCharPrimaryKey(str)
stats.UpdateMinMax(pk)
stats.BF.AddString(str)
}
default:
//TODO::
}
}
func (stats *PrimaryKeyStats) Update(pk PrimaryKey) {
stats.UpdateMinMax(pk)
switch schemapb.DataType(stats.PkType) {
case schemapb.DataType_Int64:
data := pk.GetValue().(int64)
b := make([]byte, 8)
common.Endian.PutUint64(b, uint64(data))
stats.BF.Add(b)
case schemapb.DataType_VarChar:
data := pk.GetValue().(string)
stats.BF.AddString(data)
default:
log.Warn("Update pk stats with invalid data type")
}
}
// UpdateMinMax updates the minPk and maxPk values
func (stats *PrimaryKeyStats) updatePk(pk PrimaryKey) {
func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey) {
if stats.MinPk == nil {
stats.MinPk = pk
} else if stats.MinPk.GT(pk) {
@ -135,6 +189,14 @@ func (stats *PrimaryKeyStats) updatePk(pk PrimaryKey) {
}
}
func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) *PrimaryKeyStats {
return &PrimaryKeyStats{
FieldID: fieldID,
PkType: pkType,
BF: bloom.NewWithEstimates(uint(rowNum), MaxBloomFalsePositive),
}
}
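A minimal sketch (values are illustrative) of how a PrimaryKeyStats built by NewPrimaryKeyStats accumulates min/max keys and bloom-filter entries through UpdateByMsgs and Update, the two update paths used while buffering inserts.
package main
import (
	"fmt"
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
)
func main() {
	stats := storage.NewPrimaryKeyStats(100, int64(schemapb.DataType_Int64), 4)
	// batch update from a buffered field column
	stats.UpdateByMsgs(&storage.Int64FieldData{Data: []int64{7, 3, 11}})
	// single-key update
	stats.Update(storage.NewInt64PrimaryKey(5))
	fmt.Println("min:", stats.MinPk.GetValue(), "max:", stats.MaxPk.GetValue()) // min: 3 max: 11
	// the bloom filter has no false negatives for keys that were added
	b := make([]byte, 8)
	common.Endian.PutUint64(b, uint64(7))
	fmt.Println("7 possibly present:", stats.BF.Test(b)) // true
}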
// StatsWriter writes stats to buffer
type StatsWriter struct {
buffer []byte
@ -145,54 +207,38 @@ func (sw *StatsWriter) GetBuffer() []byte {
return sw.buffer
}
// GeneratePrimaryKeyStats writes Int64Stats from @msgs with @fieldID to @buffer
func (sw *StatsWriter) GeneratePrimaryKeyStats(fieldID int64, pkType schemapb.DataType, msgs FieldData) error {
stats := &PrimaryKeyStats{
FieldID: fieldID,
PkType: int64(pkType),
}
stats.BF = bloom.NewWithEstimates(uint(msgs.RowNum()), MaxBloomFalsePositive)
switch pkType {
case schemapb.DataType_Int64:
data := msgs.(*Int64FieldData).Data
if len(data) < 1 {
// return error: msgs must has one element at least
return nil
}
b := make([]byte, 8)
for _, int64Value := range data {
pk := NewInt64PrimaryKey(int64Value)
stats.updatePk(pk)
common.Endian.PutUint64(b, uint64(int64Value))
stats.BF.Add(b)
}
case schemapb.DataType_VarChar:
data := msgs.(*StringFieldData).Data
if len(data) < 1 {
// return error: msgs must has one element at least
return nil
}
for _, str := range data {
pk := NewVarCharPrimaryKey(str)
stats.updatePk(pk)
stats.BF.AddString(str)
}
default:
//TODO::
}
// GenerateList writes Stats slice to buffer
func (sw *StatsWriter) GenerateList(stats []*PrimaryKeyStats) error {
b, err := json.Marshal(stats)
if err != nil {
return err
}
sw.buffer = b
return nil
}
// Generate writes Stats to buffer
func (sw *StatsWriter) Generate(stats *PrimaryKeyStats) error {
b, err := json.Marshal(stats)
if err != nil {
return err
}
sw.buffer = b
return nil
}
// GenerateByData writes Int64Stats or StringStats from @msgs with @fieldID to @buffer
func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs FieldData) error {
stats := &PrimaryKeyStats{
FieldID: fieldID,
PkType: int64(pkType),
BF: bloom.NewWithEstimates(uint(msgs.RowNum()), MaxBloomFalsePositive),
}
stats.UpdateByMsgs(msgs)
return sw.Generate(stats)
}
// StatsReader reads stats
type StatsReader struct {
buffer []byte
@ -214,6 +260,17 @@ func (sr *StatsReader) GetPrimaryKeyStats() (*PrimaryKeyStats, error) {
return stats, nil
}
// GetPrimaryKeyStatsList returns the buffer as a list of PrimaryKeyStats
func (sr *StatsReader) GetPrimaryKeyStatsList() ([]*PrimaryKeyStats, error) {
stats := []*PrimaryKeyStats{}
err := json.Unmarshal(sr.buffer, &stats)
if err != nil {
return nil, err
}
return stats, nil
}
// DeserializeStats deserialize @blobs as []*PrimaryKeyStats
func DeserializeStats(blobs []*Blob) ([]*PrimaryKeyStats, error) {
results := make([]*PrimaryKeyStats, 0, len(blobs))
@ -231,3 +288,16 @@ func DeserializeStats(blobs []*Blob) ([]*PrimaryKeyStats, error) {
}
return results, nil
}
func DeserializeStatsList(blob *Blob) ([]*PrimaryKeyStats, error) {
if blob.Value == nil {
return []*PrimaryKeyStats{}, nil
}
sr := &StatsReader{}
sr.SetBuffer(blob.Value)
stats, err := sr.GetPrimaryKeyStatsList()
if err != nil {
return nil, err
}
return stats, nil
}
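A minimal sketch (IDs are illustrative) of the writer/reader pair behind the compound stats format: GenerateList packs several PrimaryKeyStats into one JSON buffer and DeserializeStatsList restores them from a single blob.
package main
import (
	"fmt"
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)
func main() {
	s1 := storage.NewPrimaryKeyStats(100, int64(schemapb.DataType_Int64), 2)
	s1.Update(storage.NewInt64PrimaryKey(1))
	s2 := storage.NewPrimaryKeyStats(100, int64(schemapb.DataType_Int64), 2)
	s2.Update(storage.NewInt64PrimaryKey(2))
	sw := &storage.StatsWriter{}
	if err := sw.GenerateList([]*storage.PrimaryKeyStats{s1, s2}); err != nil {
		panic(err)
	}
	restored, err := storage.DeserializeStatsList(&storage.Blob{Value: sw.GetBuffer()})
	if err != nil {
		panic(err)
	}
	fmt.Println("restored stats entries:", len(restored)) // 2
}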

View File

@ -32,7 +32,7 @@ func TestStatsWriter_Int64PrimaryKey(t *testing.T) {
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9},
}
sw := &StatsWriter{}
err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, data)
err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
assert.NoError(t, err)
b := sw.GetBuffer()
@ -57,7 +57,7 @@ func TestStatsWriter_Int64PrimaryKey(t *testing.T) {
msgs := &Int64FieldData{
Data: []int64{},
}
err = sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, msgs)
err = sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, msgs)
assert.Nil(t, err)
}
@ -71,7 +71,7 @@ func TestStatsWriter_BF(t *testing.T) {
}
t.Log(data.RowNum())
sw := &StatsWriter{}
err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, data)
err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
assert.NoError(t, err)
stats := &PrimaryKeyStats{}
@ -95,7 +95,7 @@ func TestStatsWriter_VarCharPrimaryKey(t *testing.T) {
Data: []string{"bc", "ac", "abd", "cd", "milvus"},
}
sw := &StatsWriter{}
err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_VarChar, data)
err := sw.GenerateByData(common.RowIDField, schemapb.DataType_VarChar, data)
assert.NoError(t, err)
b := sw.GetBuffer()
@ -114,7 +114,7 @@ func TestStatsWriter_VarCharPrimaryKey(t *testing.T) {
msgs := &Int64FieldData{
Data: []int64{},
}
err = sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, msgs)
err = sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, msgs)
assert.Nil(t, err)
}

View File

@ -13,12 +13,27 @@ package storage
import (
"context"
"fmt"
"io"
"time"
"golang.org/x/exp/mmap"
)
type StatsLogType int64
const (
DefaultStatsType StatsLogType = iota + 0
// CompoundStatsType stats logs save multiple stats
// and bloom filters in one file
CompoundStatsType
)
func (s StatsLogType) LogIdx() string {
return fmt.Sprintf("0%d", s)
}
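From the definition above, LogIdx zero-pads the enum value, so the reserved suffixes are fixed strings; "01" is what a merged stats log path ends with. A tiny sketch:
package main
import (
	"fmt"
	"github.com/milvus-io/milvus/internal/storage"
)
func main() {
	fmt.Println(storage.DefaultStatsType.LogIdx())  // "00"
	fmt.Println(storage.CompoundStatsType.LogIdx()) // "01" - last path segment of a merged stats log
}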
type FileReader interface {
io.Reader
io.Closer

View File

@ -108,7 +108,7 @@ func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob {
},
}
blobs, _, err := insertCodec.Serialize(1, 1, insertData)
blobs, err := insertCodec.Serialize(1, 1, insertData)
if err != nil {
return nil
}