From 0769e00e707e6d15a823c69859a33d3e653d3bf0 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 5 Dec 2022 15:31:19 +0800 Subject: [PATCH] Fix BufferData limit calculation for BinaryVector (#20963) (#20971) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- internal/datanode/buffer.go | 15 ++++++++++----- internal/datanode/buffer_test.go | 18 ++++++++++-------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/internal/datanode/buffer.go b/internal/datanode/buffer.go index 7550e5bbd4..7679c83f17 100644 --- a/internal/datanode/buffer.go +++ b/internal/datanode/buffer.go @@ -332,13 +332,18 @@ func (ddb *DelDataBuf) updateStartAndEndPosition(startPos *internalpb.MsgPositio func newBufferData(collSchema *schemapb.CollectionSchema) (*BufferData, error) { // Get Dimension // TODO GOOSE: under assumption that there's only 1 Vector field in one collection schema - var dimension int - var err error + var vectorSize int for _, field := range collSchema.Fields { if field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_BinaryVector { - dimension, err = storage.GetDimFromParams(field.TypeParams) + dimension, err := storage.GetDimFromParams(field.TypeParams) + switch field.DataType { + case schemapb.DataType_FloatVector: + vectorSize = dimension * 4 + case schemapb.DataType_BinaryVector: + vectorSize = dimension / 8 + } if err != nil { log.Error("failed to get dim from field", zap.Error(err)) return nil, err @@ -347,11 +352,11 @@ func newBufferData(collSchema *schemapb.CollectionSchema) (*BufferData, error) { } } - if dimension == 0 { + if vectorSize == 0 { return nil, errors.New("Invalid dimension") } - limit := Params.DataNodeCfg.FlushInsertBufferSize / (int64(dimension) * 4) + limit := Params.DataNodeCfg.FlushInsertBufferSize / int64(vectorSize) //TODO::xige-16 eval vec and string field return &BufferData{ diff --git a/internal/datanode/buffer_test.go b/internal/datanode/buffer_test.go index 102277f114..bed42f1f95 100644 --- a/internal/datanode/buffer_test.go +++ b/internal/datanode/buffer_test.go @@ -30,11 +30,11 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" ) -func genTestCollectionSchema(dim int64) *schemapb.CollectionSchema { +func genTestCollectionSchema(dim int64, vectorType schemapb.DataType) *schemapb.CollectionSchema { floatVecFieldSchema := &schemapb.FieldSchema{ FieldID: 100, Name: "vec", - DataType: schemapb.DataType_FloatVector, + DataType: vectorType, TypeParams: []*commonpb.KeyValuePair{ { Key: "dim", @@ -59,18 +59,20 @@ func TestBufferData(t *testing.T) { indim int64 expectedLimit int64 + vectorType schemapb.DataType description string }{ - {true, 1, 4194304, "Smallest of the DIM"}, - {true, 128, 32768, "Normal DIM"}, - {true, 32768, 128, "Largest DIM"}, - {false, 0, 0, "Illegal DIM"}, + {true, 1, 4194304, schemapb.DataType_FloatVector, "Smallest of the DIM"}, + {true, 128, 32768, schemapb.DataType_FloatVector, "Normal DIM"}, + {true, 32768, 128, schemapb.DataType_FloatVector, "Largest DIM"}, + {true, 4096, 32768, schemapb.DataType_BinaryVector, "Normal binary"}, + {false, 0, 0, schemapb.DataType_FloatVector, "Illegal DIM"}, } for _, test := range tests { t.Run(test.description, func(t *testing.T) { - idata, err := newBufferData(genTestCollectionSchema(test.indim)) + idata, err := newBufferData(genTestCollectionSchema(test.indim, test.vectorType)) if test.isValid { assert.NoError(t, err) @@ -128,7 +130,7 @@ func TestBufferData_updateTimeRange(t *testing.T) { for _, tc := range cases { t.Run(tc.tag, func(t *testing.T) { - bd, err := newBufferData(genTestCollectionSchema(16)) + bd, err := newBufferData(genTestCollectionSchema(16, schemapb.DataType_FloatVector)) require.NoError(t, err) for _, tr := range tc.trs { bd.updateTimeRange(tr)