diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index 6284b389e5..af4a2b6928 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -22,6 +22,7 @@ import ( "errors" "path" "strconv" + "time" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" @@ -31,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/etcdpb" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var ( @@ -61,38 +63,38 @@ var _ downloader = (*binlogIO)(nil) var _ uploader = (*binlogIO)(nil) func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) { - var err error = errStart + var ( + err = errStart + vs = []string{} + ) - r := make(chan []string) - go func(r chan<- []string) { - var vs []string + g, gCtx := errgroup.WithContext(ctx) + g.Go(func() error { for err != nil { select { - case <-ctx.Done(): - close(r) + case <-gCtx.Done(): log.Warn("ctx done when downloading kvs from blob storage") - return + return errDownloadFromBlobStorage default: if err != errStart { + <-time.After(50 * time.Millisecond) log.Warn("Try multiloading again", zap.Strings("paths", paths)) } vs, err = b.MultiLoad(paths) } } - r <- vs - }(r) + return nil + }) - vs, ok := <-r - if !ok { - return nil, errDownloadFromBlobStorage + if err := g.Wait(); err != nil { + return nil, err } - rst := make([]*Blob, 0, len(vs)) - for _, vstr := range vs { - b := bytes.NewBufferString(vstr) - rst = append(rst, &Blob{Value: b.Bytes()}) + rst := make([]*Blob, len(vs)) + for i := range rst { + rst[i] = &Blob{Value: bytes.NewBufferString(vs[i]).Bytes()} } return rst, nil @@ -148,27 +150,27 @@ func (b *binlogIO) upload( p.deltaInfo.DeltaLogPath = k } - success := make(chan struct{}) - go func(success chan<- struct{}) { - err := errStart + var err = errStart + g, gCtx := errgroup.WithContext(ctx) + g.Go(func() error { for err != nil { select { - case <-ctx.Done(): - close(success) + case <-gCtx.Done(): log.Warn("ctx done when saving kvs to blob storage") - return + return errUploadToBlobStorage default: if err != errStart { + <-time.After(50 * time.Millisecond) log.Info("retry save binlogs") } err = b.MultiSave(kvs) } } - success <- struct{}{} - }(success) + return nil + }) - if _, ok := <-success; !ok { - return nil, errUploadToBlobStorage + if err := g.Wait(); err != nil { + return nil, err } return p, nil @@ -214,11 +216,8 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta } for _, blob := range inlogs { - fID, err := strconv.ParseInt(blob.GetKey(), 10, 64) - if err != nil { - log.Error("can not parse string to fieldID", zap.Error(err)) - return nil, nil, nil, err - } + // Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt + fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64) k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator) key := path.Join(Params.InsertBinlogRootPath, k) @@ -230,11 +229,8 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta } for _, blob := range statslogs { - fID, err := strconv.ParseInt(blob.GetKey(), 10, 64) - if err != nil { - log.Error("can not parse string to fieldID", zap.Error(err)) - return nil, nil, nil, err - } + // Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt + fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64) k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator) key := path.Join(Params.StatsBinlogRootPath, k) diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go index 24850120ce..6c2d3f16f7 100644 --- a/internal/datanode/binlog_io_test.go +++ b/internal/datanode/binlog_io_test.go @@ -18,8 +18,10 @@ package datanode import ( "context" + "errors" "path" "testing" + "time" "github.com/milvus-io/milvus/internal/kv" memkv "github.com/milvus-io/milvus/internal/kv/mem" @@ -59,6 +61,44 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { assert.Nil(t, p) }) + t.Run("Test upload error", func(t *testing.T) { + f := &MetaFactory{} + meta := f.GetCollectionMeta(UniqueID(10001), "uploads") + dData := &DeleteData{ + Pks: []int64{}, + Tss: []uint64{}, + } + + iData := genEmptyInsertData() + p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) + assert.Error(t, err) + assert.Empty(t, p) + + iData = genInsertData() + dData = &DeleteData{ + Pks: []int64{}, + Tss: []uint64{1}, + RowCount: 1, + } + p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) + assert.Error(t, err) + assert.Empty(t, p) + + mkv := &mockKv{errMultiSave: true} + bin := &binlogIO{mkv, alloc} + iData = genInsertData() + dData = &DeleteData{ + Pks: []int64{1}, + Tss: []uint64{1}, + RowCount: 1, + } + ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond) + p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) + assert.Error(t, err) + assert.Empty(t, p) + cancel() + }) + t.Run("Test download", func(t *testing.T) { tests := []struct { isvalid bool @@ -99,7 +139,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { } }) } + }) + t.Run("Test download twice", func(t *testing.T) { + mkv := &mockKv{errMultiLoad: true} + b := &binlogIO{mkv, alloc} + + ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*20) + blobs, err := b.download(ctx, []string{"a"}) + assert.Error(t, err) + assert.Empty(t, blobs) + cancel() }) } @@ -152,7 +202,22 @@ func TestBinlogIOInnerMethods(t *testing.T) { } }) } + }) + t.Run("Test genDeltaBlobs error", func(t *testing.T) { + k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []int64{1}, Tss: []uint64{}}, 1, 1, 1) + assert.Error(t, err) + assert.Empty(t, k) + assert.Empty(t, v) + + errAlloc := NewAllocatorFactory() + errAlloc.isvalid = false + + bin := binlogIO{memkv.NewMemoryKV(), errAlloc} + k, v, err = bin.genDeltaBlobs(&DeleteData{Pks: []int64{1}, Tss: []uint64{1}}, 1, 1, 1) + assert.Error(t, err) + assert.Empty(t, k) + assert.Empty(t, v) }) t.Run("Test genInsertBlobs", func(t *testing.T) { @@ -172,6 +237,33 @@ func TestBinlogIOInnerMethods(t *testing.T) { zap.String("stats paths field0", pstats[0].GetBinlogs()[0])) }) + t.Run("Test genInsertBlobs error", func(t *testing.T) { + kvs, pin, pstats, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil) + assert.Error(t, err) + assert.Empty(t, kvs) + assert.Empty(t, pin) + assert.Empty(t, pstats) + + f := &MetaFactory{} + meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs") + + kvs, pin, pstats, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta) + assert.Error(t, err) + assert.Empty(t, kvs) + assert.Empty(t, pin) + assert.Empty(t, pstats) + + errAlloc := NewAllocatorFactory() + errAlloc.errAllocBatch = true + bin := &binlogIO{memkv.NewMemoryKV(), errAlloc} + kvs, pin, pstats, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta) + + assert.Error(t, err) + assert.Empty(t, kvs) + assert.Empty(t, pin) + assert.Empty(t, pstats) + }) + t.Run("Test idxGenerator", func(t *testing.T) { tests := []struct { isvalid bool @@ -224,3 +316,31 @@ func TestBinlogIOInnerMethods(t *testing.T) { }) } + +type mockKv struct { + errMultiLoad bool + errMultiSave bool +} + +var _ kv.BaseKV = (*mockKv)(nil) + +func (mk *mockKv) Load(key string) (string, error) { return "", nil } +func (mk *mockKv) MultiLoad(keys []string) ([]string, error) { + if mk.errMultiLoad { + return []string{}, errors.New("mockKv multiload error") + } + return []string{"a"}, nil +} + +func (mk *mockKv) LoadWithPrefix(key string) ([]string, []string, error) { return nil, nil, nil } +func (mk *mockKv) Save(key, value string) error { return nil } +func (mk *mockKv) MultiSave(kvs map[string]string) error { + if mk.errMultiSave { + return errors.New("mockKv multisave error") + } + return nil +} +func (mk *mockKv) Remove(key string) error { return nil } +func (mk *mockKv) MultiRemove(keys []string) error { return nil } +func (mk *mockKv) RemoveWithPrefix(key string) error { return nil } +func (mk *mockKv) Close() {} diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index c9c7223de6..9eaf44c2c2 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -533,9 +533,10 @@ func genFlowGraphDeleteMsg(pks []int64, chanName string) flowGraphMsg { type AllocatorFactory struct { sync.Mutex - r *rand.Rand - isvalid bool - random bool + r *rand.Rand + isvalid bool + random bool + errAllocBatch bool } var _ allocatorInterface = &AllocatorFactory{} @@ -564,7 +565,7 @@ func (alloc *AllocatorFactory) allocID() (UniqueID, error) { } func (alloc *AllocatorFactory) allocIDBatch(count uint32) (UniqueID, uint32, error) { - if count == 0 { + if count == 0 || alloc.errAllocBatch { return 0, 0, errors.New("count should be greater than zero") } @@ -749,3 +750,55 @@ func genInsertData() *InsertData { }, }} } + +func genEmptyInsertData() *InsertData { + return &InsertData{ + Data: map[int64]s.FieldData{ + 0: &s.Int64FieldData{ + NumRows: []int64{0}, + Data: []int64{}, + }, + 1: &s.Int64FieldData{ + NumRows: []int64{0}, + Data: []int64{}, + }, + 100: &s.FloatVectorFieldData{ + NumRows: []int64{0}, + Data: []float32{}, + Dim: 2, + }, + 101: &s.BinaryVectorFieldData{ + NumRows: []int64{0}, + Data: []byte{}, + Dim: 32, + }, + 102: &s.BoolFieldData{ + NumRows: []int64{0}, + Data: []bool{}, + }, + 103: &s.Int8FieldData{ + NumRows: []int64{0}, + Data: []int8{}, + }, + 104: &s.Int16FieldData{ + NumRows: []int64{0}, + Data: []int16{}, + }, + 105: &s.Int32FieldData{ + NumRows: []int64{0}, + Data: []int32{}, + }, + 106: &s.Int64FieldData{ + NumRows: []int64{0}, + Data: []int64{}, + }, + 107: &s.FloatFieldData{ + NumRows: []int64{0}, + Data: []float32{}, + }, + 108: &s.DoubleFieldData{ + NumRows: []int64{0}, + Data: []float64{}, + }, + }} +} diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 5d2b119b35..771261cb82 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -263,6 +263,10 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if !ok { return nil, nil, fmt.Errorf("data doesn't contains timestamp field") } + if timeFieldData.RowNum() <= 0 { + return nil, nil, fmt.Errorf("There's no data in InsertData") + } + ts := timeFieldData.(*Int64FieldData).Data startTs := ts[0] endTs := ts[len(ts)-1] diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 52523e0d2e..0c12fee7b5 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -251,6 +251,28 @@ func TestInsertCodec(t *testing.T) { }, }, } + + insertDataEmpty := &InsertData{ + Data: map[int64]FieldData{ + RowIDField: &Int64FieldData{[]int64{}, []int64{}}, + TimestampField: &Int64FieldData{[]int64{}, []int64{}}, + BoolField: &BoolFieldData{[]int64{}, []bool{}}, + Int8Field: &Int8FieldData{[]int64{}, []int8{}}, + Int16Field: &Int16FieldData{[]int64{}, []int16{}}, + Int32Field: &Int32FieldData{[]int64{}, []int32{}}, + Int64Field: &Int64FieldData{[]int64{}, []int64{}}, + FloatField: &FloatFieldData{[]int64{}, []float32{}}, + DoubleField: &DoubleFieldData{[]int64{}, []float64{}}, + StringField: &StringFieldData{[]int64{}, []string{}}, + BinaryVectorField: &BinaryVectorFieldData{[]int64{}, []byte{}, 8}, + FloatVectorField: &FloatVectorFieldData{[]int64{}, []float32{}, 4}, + }, + } + b, s, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty) + assert.Error(t, err) + assert.Empty(t, b) + assert.Empty(t, s) + Blobs1, statsBlob1, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1) assert.Nil(t, err) for _, blob := range Blobs1 {