diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index e117ae14a2..8beaa9be14 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -115,12 +115,21 @@ func (i *InsertData) Append(row map[FieldID]interface{}) error { return nil } +func (i *InsertData) GetRow(idx int) map[FieldID]interface{} { + res := make(map[FieldID]interface{}) + for field, data := range i.Data { + res[field] = data.GetRow(idx) + } + return res +} + // FieldData defines field data interface type FieldData interface { GetMemorySize() int RowNum() int GetRow(i int) any AppendRow(row interface{}) error + AppendRows(rows interface{}) error GetDataType() schemapb.DataType } @@ -408,6 +417,135 @@ func (data *Float16VectorFieldData) AppendRow(row interface{}) error { return nil } +func (data *BoolFieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *Int8FieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]int8) + if !ok { + return merr.WrapErrParameterInvalid("[]int8", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *Int16FieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]int16) + if !ok { + return merr.WrapErrParameterInvalid("[]int16", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *Int32FieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]int32) + if !ok { + return merr.WrapErrParameterInvalid("[]int32", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *Int64FieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]int64) + if !ok { + return merr.WrapErrParameterInvalid("[]int64", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *FloatFieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]float32) + if !ok { + return merr.WrapErrParameterInvalid("[]float32", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *DoubleFieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]float64) + if !ok { + return merr.WrapErrParameterInvalid("[]float64", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *StringFieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]string) + if !ok { + return merr.WrapErrParameterInvalid("[]string", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *ArrayFieldData) AppendRows(rows interface{}) error { + v, ok := rows.([]*schemapb.ScalarField) + if !ok { + return merr.WrapErrParameterInvalid("[]*schemapb.ScalarField", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *JSONFieldData) AppendRows(rows interface{}) error { + v, ok := rows.([][]byte) + if !ok { + return merr.WrapErrParameterInvalid("[][]byte", rows, "Wrong rows type") + } + data.Data = append(data.Data, v...) + return nil +} + +// AppendRows appends FLATTEN vectors to field data. 
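+// The input must be a flat []byte holding a whole number of vectors,
+// i.e. len(rows) must be a multiple of Dim/8 bytes; partial vectors are rejected.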
+func (data *BinaryVectorFieldData) AppendRows(rows interface{}) error {
+	v, ok := rows.([]byte)
+	if !ok {
+		return merr.WrapErrParameterInvalid("[]byte", rows, "Wrong rows type")
+	}
+	if len(v)%(data.Dim/8) != 0 {
+		return merr.WrapErrParameterInvalid(data.Dim/8, len(v), "Wrong vector size")
+	}
+	data.Data = append(data.Data, v...)
+	return nil
+}
+
+// AppendRows appends FLATTEN vectors to field data.
+func (data *FloatVectorFieldData) AppendRows(rows interface{}) error {
+	v, ok := rows.([]float32)
+	if !ok {
+		return merr.WrapErrParameterInvalid("[]float32", rows, "Wrong rows type")
+	}
+	if len(v)%(data.Dim) != 0 {
+		return merr.WrapErrParameterInvalid(data.Dim, len(v), "Wrong vector size")
+	}
+	data.Data = append(data.Data, v...)
+	return nil
+}
+
+// AppendRows appends FLATTEN vectors to field data.
+func (data *Float16VectorFieldData) AppendRows(rows interface{}) error {
+	v, ok := rows.([]byte)
+	if !ok {
+		return merr.WrapErrParameterInvalid("[]byte", rows, "Wrong rows type")
+	}
+	if len(v)%(data.Dim*2) != 0 {
+		return merr.WrapErrParameterInvalid(data.Dim*2, len(v), "Wrong vector size")
+	}
+	data.Data = append(data.Data, v...)
+	return nil
+}
+
 // GetMemorySize implements FieldData.GetMemorySize
 func (data *BoolFieldData) GetMemorySize() int { return binary.Size(data.Data) }
 func (data *Int8FieldData) GetMemorySize() int { return binary.Size(data.Data) }
diff --git a/internal/util/importutilv2/binlog/field_reader.go b/internal/util/importutilv2/binlog/field_reader.go
new file mode 100644
index 0000000000..e7f010785a
--- /dev/null
+++ b/internal/util/importutilv2/binlog/field_reader.go
@@ -0,0 +1,60 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package binlog + +import ( + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" +) + +type fieldReader struct { + reader *storage.BinlogReader + fieldSchema *schemapb.FieldSchema +} + +func newFieldReader(cm storage.ChunkManager, fieldSchema *schemapb.FieldSchema, path string) (*fieldReader, error) { + reader, err := newBinlogReader(cm, path) + if err != nil { + return nil, err + } + return &fieldReader{ + reader: reader, + fieldSchema: fieldSchema, + }, nil +} + +func (r *fieldReader) Next(_ int64) (storage.FieldData, error) { + fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema) + if err != nil { + return nil, err + } + rowsSet, err := readData(r.reader, storage.InsertEventType) + if err != nil { + return nil, err + } + for _, rows := range rowsSet { + err = fieldData.AppendRows(rows) + if err != nil { + return nil, err + } + } + return fieldData, nil +} + +func (r *fieldReader) Close() { + r.reader.Close() +} diff --git a/internal/util/importutilv2/binlog/filter.go b/internal/util/importutilv2/binlog/filter.go new file mode 100644 index 0000000000..c63b6e25b5 --- /dev/null +++ b/internal/util/importutilv2/binlog/filter.go @@ -0,0 +1,48 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlog + +import ( + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type Filter func(row map[int64]interface{}) bool + +func FilterWithDelete(r *reader) (Filter, error) { + pkField, err := typeutil.GetPrimaryFieldSchema(r.schema) + if err != nil { + return nil, err + } + return func(row map[int64]interface{}) bool { + rowPk := row[pkField.GetFieldID()] + rowTs := row[common.TimeStampField] + for i, pk := range r.deleteData.Pks { + if pk.GetValue() == rowPk && int64(r.deleteData.Tss[i]) > rowTs.(int64) { + return false + } + } + return true + }, nil +} + +func FilterWithTimeRange(tsStart, tsEnd uint64) Filter { + return func(row map[int64]interface{}) bool { + ts := row[common.TimeStampField].(int64) + return uint64(ts) >= tsStart && uint64(ts) <= tsEnd + } +} diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go new file mode 100644 index 0000000000..438e86b391 --- /dev/null +++ b/internal/util/importutilv2/binlog/reader.go @@ -0,0 +1,194 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlog + +import ( + "context" + "encoding/json" + "math" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type reader struct { + cm storage.ChunkManager + schema *schemapb.CollectionSchema + + deleteData *storage.DeleteData + insertLogs map[int64][]string // fieldID -> binlogs + + readIdx int + filters []Filter +} + +func NewReader(cm storage.ChunkManager, + schema *schemapb.CollectionSchema, + paths []string, + tsStart, + tsEnd uint64, +) (*reader, error) { + r := &reader{ + cm: cm, + schema: schema, + } + err := r.init(paths, tsStart, tsEnd) + if err != nil { + return nil, err + } + return r, nil +} + +func (r *reader) init(paths []string, tsStart, tsEnd uint64) error { + if tsStart != 0 || tsEnd != math.MaxUint64 { + r.filters = append(r.filters, FilterWithTimeRange(tsStart, tsEnd)) + } + if len(paths) < 1 { + return merr.WrapErrImportFailed("no insert binlogs to import") + } + insertLogs, err := listInsertLogs(r.cm, paths[0]) + if err != nil { + return err + } + err = verify(r.schema, insertLogs) + if err != nil { + return err + } + r.insertLogs = insertLogs + + if len(paths) < 2 { + return nil + } + deltaLogs, _, err := r.cm.ListWithPrefix(context.Background(), paths[1], true) + if err != nil { + return err + } + if len(deltaLogs) == 0 { + return nil + } + r.deleteData, err = r.readDelete(deltaLogs, tsStart, tsEnd) + if err != nil { + return err + } + + deleteFilter, err := FilterWithDelete(r) + if err != nil { + return err + } + r.filters = append(r.filters, deleteFilter) + return nil +} + +func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage.DeleteData, error) { + deleteData := storage.NewDeleteData(nil, nil) + for _, path := range deltaLogs { + reader, err := newBinlogReader(r.cm, path) + if err != nil { + return nil, err + } + rowsSet, err := readData(reader, storage.DeleteEventType) + if err != nil { + return nil, err + } + for _, rows := range rowsSet { + for _, row := range rows.([]string) { + dl := &storage.DeleteLog{} + err = json.Unmarshal([]byte(row), dl) + if err != nil { + return nil, err + } + if dl.Ts >= tsStart && dl.Ts <= tsEnd { + deleteData.Append(dl.Pk, dl.Ts) + } + } + } + } + return deleteData, nil +} + +func (r *reader) Read() (*storage.InsertData, error) { + insertData, err := storage.NewInsertData(r.schema) + if err != nil { + return nil, err + } + if r.readIdx == len(r.insertLogs[0]) { + return nil, nil + } + for fieldID, binlogs := range r.insertLogs { + field := typeutil.GetField(r.schema, fieldID) + if field == nil { + return nil, merr.WrapErrFieldNotFound(fieldID) + } + path := binlogs[r.readIdx] + fr, err := newFieldReader(r.cm, field, path) + if err != nil { + return nil, err + } + fieldData, err := fr.Next(-1) + if err != nil { + fr.Close() + return nil, err + } + fr.Close() + insertData.Data[field.GetFieldID()] = fieldData + } + insertData, err = r.filter(insertData) + if err != nil { + return nil, err + } + r.readIdx++ + return insertData, 
nil
+}
+
+func (r *reader) filter(insertData *storage.InsertData) (*storage.InsertData, error) {
+	if len(r.filters) == 0 {
+		return insertData, nil
+	}
+	masks := make(map[int]struct{})
+OUTER:
+	for i := 0; i < insertData.GetRowNum(); i++ {
+		row := insertData.GetRow(i)
+		for _, f := range r.filters {
+			if !f(row) {
+				masks[i] = struct{}{}
+				continue OUTER
+			}
+		}
+	}
+	if len(masks) == 0 { // no rows were filtered out, return the input unchanged
+		return insertData, nil
+	}
+	result, err := storage.NewInsertData(r.schema)
+	if err != nil {
+		return nil, err
+	}
+	for i := 0; i < insertData.GetRowNum(); i++ {
+		if _, ok := masks[i]; ok {
+			continue
+		}
+		row := insertData.GetRow(i)
+		err = result.Append(row)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return result, nil
+}
+
+func (r *reader) Close() {}
diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go
new file mode 100644
index 0000000000..2c90bdd278
--- /dev/null
+++ b/internal/util/importutilv2/binlog/reader_test.go
@@ -0,0 +1,451 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package binlog
+
+import (
+	rand2 "crypto/rand"
+	"fmt"
+	"math"
+	"math/rand"
+	"strconv"
+	"testing"
+
+	"github.com/samber/lo"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+	"golang.org/x/exp/slices"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/mocks"
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+type ReaderSuite struct {
+	suite.Suite
+
+	schema  *schemapb.CollectionSchema
+	numRows int
+
+	pkDataType  schemapb.DataType
+	vecDataType schemapb.DataType
+
+	deletePKs []storage.PrimaryKey
+	deleteTss []int64
+
+	tsStart uint64
+	tsEnd   uint64
+}
+
+func (suite *ReaderSuite) SetupSuite() {
+	paramtable.Get().Init(paramtable.NewBaseTable())
+}
+
+func (suite *ReaderSuite) SetupTest() {
+	// default suite params
+	suite.numRows = 100
+	suite.tsStart = 0
+	suite.tsEnd = math.MaxUint64
+	suite.pkDataType = schemapb.DataType_Int64
+	suite.vecDataType = schemapb.DataType_FloatVector
+}
+
+func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.FieldData) []byte {
+	dataType := field.GetDataType()
+	w := storage.NewInsertBinlogWriter(dataType, 1, 1, 1, field.GetFieldID())
+	assert.NotNil(t, w)
+	defer w.Close()
+
+	var dim int64
+	var err error
+	dim, err = typeutil.GetDim(field)
+	if err != nil || dim == 0 {
+		dim = 1 // scalar fields carry no dim type param
+	}
+
+	evt, err := w.NextInsertEventWriter(int(dim))
+	assert.NoError(t, err)
+
+	evt.SetEventTimestamp(1, math.MaxInt64)
+	w.SetEventTimeStamp(1, math.MaxInt64)
+
+	// Without the following two lines, the test would crash here:
+	// the "original_size" key comes from storage.originalSizeKey.
+	sizeTotal := data.GetMemorySize()
+	w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
+
+	switch dataType {
+	case schemapb.DataType_Bool:
+		err = evt.AddBoolToPayload(data.(*storage.BoolFieldData).Data)
+		assert.NoError(t, err)
+	case schemapb.DataType_Int8:
+		err = evt.AddInt8ToPayload(data.(*storage.Int8FieldData).Data)
+		assert.NoError(t, err)
+	case schemapb.DataType_Int16:
+		err = evt.AddInt16ToPayload(data.(*storage.Int16FieldData).Data)
+		assert.NoError(t, err)
+	case schemapb.DataType_Int32:
+		err = evt.AddInt32ToPayload(data.(*storage.Int32FieldData).Data)
+		assert.NoError(t, err)
+	case schemapb.DataType_Int64:
+		err = evt.AddInt64ToPayload(data.(*storage.Int64FieldData).Data)
+		assert.NoError(t, err)
+	case schemapb.DataType_Float:
+		err = evt.AddFloatToPayload(data.(*storage.FloatFieldData).Data)
+		assert.NoError(t, err)
+	case schemapb.DataType_Double:
+		err = evt.AddDoubleToPayload(data.(*storage.DoubleFieldData).Data)
+		assert.NoError(t, err)
+	case schemapb.DataType_VarChar:
+		values := data.(*storage.StringFieldData).Data
+		for _, val := range values {
+			err = evt.AddOneStringToPayload(val)
+			assert.NoError(t, err)
+		}
+	case schemapb.DataType_JSON:
+		rows := data.(*storage.JSONFieldData).Data
+		for i := 0; i < len(rows); i++ {
+			err = evt.AddOneJSONToPayload(rows[i])
+			assert.NoError(t, err)
+		}
+	case schemapb.DataType_Array:
+		rows := data.(*storage.ArrayFieldData).Data
+		for i := 0; i < len(rows); i++ {
+			err = evt.AddOneArrayToPayload(rows[i])
+			assert.NoError(t, err)
+		}
+	case schemapb.DataType_BinaryVector:
+		vectors := data.(*storage.BinaryVectorFieldData).Data
+		err = evt.AddBinaryVectorToPayload(vectors, int(dim))
+		assert.NoError(t, err)
+	case schemapb.DataType_FloatVector:
+		vectors := data.(*storage.FloatVectorFieldData).Data
+		err = evt.AddFloatVectorToPayload(vectors, int(dim))
+		assert.NoError(t, err)
+	case schemapb.DataType_Float16Vector:
+		vectors := data.(*storage.Float16VectorFieldData).Data
+		err = evt.AddFloat16VectorToPayload(vectors, int(dim))
+		assert.NoError(t, err)
+	default:
+		assert.True(t, false)
+		return nil
+	}
+
+	err = w.Finish()
+	assert.NoError(t, err)
+	buf, err := w.GetBuffer()
+	assert.NoError(t, err)
+	return buf
+}
+
+func createDeltaBuf(t *testing.T, deletePKs []storage.PrimaryKey, deleteTss []int64) []byte {
+	assert.Equal(t, len(deleteTss), len(deletePKs))
+	deleteData := storage.NewDeleteData(nil, nil)
+	for i := range deletePKs {
+		deleteData.Append(deletePKs[i], uint64(deleteTss[i]))
+	}
+	deleteCodec := storage.NewDeleteCodec()
+	blob, err := deleteCodec.Serialize(1, 1, 1, deleteData)
+	assert.NoError(t, err)
+	return blob.Value
+}
+
+func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData {
+	insertData, err := storage.NewInsertData(schema)
+	assert.NoError(t, err)
+	for _, field := range schema.GetFields() {
+		switch field.GetDataType() {
+		case schemapb.DataType_Bool:
+			boolData := make([]bool, 0)
+			for i := 0; i < rowCount; i++ {
+				boolData = append(boolData, i%3 != 0)
+			}
+			insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData}
+		case schemapb.DataType_Float:
+			floatData := make([]float32, 0)
+			for i := 0; i < rowCount; i++ {
+				floatData = append(floatData, float32(i/2))
+			}
+			insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData}
+		case schemapb.DataType_Double:
+			doubleData := make([]float64, 0)
+			for i := 0; i < rowCount; i++ {
+				doubleData = 
append(doubleData, float64(i/5)) + } + insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData} + case schemapb.DataType_Int8: + int8Data := make([]int8, 0) + for i := 0; i < rowCount; i++ { + int8Data = append(int8Data, int8(i%256)) + } + insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data} + case schemapb.DataType_Int16: + int16Data := make([]int16, 0) + for i := 0; i < rowCount; i++ { + int16Data = append(int16Data, int16(i%65536)) + } + insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data} + case schemapb.DataType_Int32: + int32Data := make([]int32, 0) + for i := 0; i < rowCount; i++ { + int32Data = append(int32Data, int32(i%1000)) + } + insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data} + case schemapb.DataType_Int64: + int64Data := make([]int64, 0) + for i := 0; i < rowCount; i++ { + int64Data = append(int64Data, int64(i)) + } + insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data} + case schemapb.DataType_BinaryVector: + dim, err := typeutil.GetDim(field) + assert.NoError(t, err) + binVecData := make([]byte, 0) + total := rowCount * int(dim) / 8 + for i := 0; i < total; i++ { + binVecData = append(binVecData, byte(i%256)) + } + insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: int(dim)} + case schemapb.DataType_FloatVector: + dim, err := typeutil.GetDim(field) + assert.NoError(t, err) + floatVecData := make([]float32, 0) + total := rowCount * int(dim) + for i := 0; i < total; i++ { + floatVecData = append(floatVecData, rand.Float32()) + } + insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: int(dim)} + case schemapb.DataType_Float16Vector: + dim, err := typeutil.GetDim(field) + assert.NoError(t, err) + total := int64(rowCount) * dim * 2 + float16VecData := make([]byte, total) + _, err = rand2.Read(float16VecData) + assert.NoError(t, err) + insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: int(dim)} + case schemapb.DataType_String, schemapb.DataType_VarChar: + varcharData := make([]string, 0) + for i := 0; i < rowCount; i++ { + varcharData = append(varcharData, strconv.Itoa(i)) + } + insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData} + case schemapb.DataType_JSON: + jsonData := make([][]byte, 0) + for i := 0; i < rowCount; i++ { + jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i))) + } + insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData} + case schemapb.DataType_Array: + arrayData := make([]*schemapb.ScalarField, 0) + for i := 0; i < rowCount; i++ { + arrayData = append(arrayData, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: []int32{int32(i), int32(i + 1), int32(i + 2)}, + }, + }, + }) + } + insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} + default: + panic(fmt.Sprintf("unexpected data type: %s", field.GetDataType().String())) + } + } + return insertData +} + +func (suite *ReaderSuite) run(dt schemapb.DataType) { + const ( + insertPrefix = "mock-insert-binlog-prefix" + deltaPrefix = "mock-delta-binlog-prefix" + ) + insertBinlogs := map[int64][]string{ + 0: { + "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/0/435978159903735801", + }, + 1: { + 
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/1/435978159903735811", + }, + 100: { + "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/100/435978159903735821", + }, + 101: { + "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/101/435978159903735831", + }, + 102: { + "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/102/435978159903735841", + }, + } + var deltaLogs []string + if len(suite.deletePKs) != 0 { + deltaLogs = []string{ + "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009/434574382554415105", + } + } + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + DataType: suite.pkDataType, + }, + { + FieldID: 101, + Name: "vec", + DataType: suite.vecDataType, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "8", + }, + }, + }, + { + FieldID: 102, + Name: dt.String(), + DataType: dt, + }, + }, + } + cm := mocks.NewChunkManager(suite.T()) + typeutil.AppendSystemFields(schema) + + originalInsertData := createInsertData(suite.T(), schema, suite.numRows) + insertLogs := lo.Flatten(lo.Values(insertBinlogs)) + + cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(insertLogs, nil, nil) + cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(deltaLogs, nil, nil) + for fieldID, paths := range insertBinlogs { + field := typeutil.GetField(schema, fieldID) + buf0 := createBinlogBuf(suite.T(), field, originalInsertData.Data[fieldID]) + cm.EXPECT().Read(mock.Anything, paths[0]).Return(buf0, nil) + } + + if len(suite.deletePKs) != 0 { + for _, path := range deltaLogs { + buf := createDeltaBuf(suite.T(), suite.deletePKs, suite.deleteTss) + cm.EXPECT().Read(mock.Anything, path).Return(buf, nil) + } + } + + reader, err := NewReader(cm, schema, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd) + suite.NoError(err) + insertData, err := reader.Read() + suite.NoError(err) + + pks, err := storage.GetPkFromInsertData(schema, originalInsertData) + suite.NoError(err) + tss, err := storage.GetTimestampFromInsertData(originalInsertData) + suite.NoError(err) + expectInsertData, err := storage.NewInsertData(schema) + suite.NoError(err) + for _, field := range schema.GetFields() { + expectInsertData.Data[field.GetFieldID()], err = storage.NewFieldData(field.GetDataType(), field) + suite.NoError(err) + } +OUTER: + for i := 0; i < suite.numRows; i++ { + if uint64(tss.Data[i]) < suite.tsStart || uint64(tss.Data[i]) > suite.tsEnd { + continue + } + for j := 0; j < len(suite.deletePKs); j++ { + if suite.deletePKs[j].GetValue() == pks.GetRow(i) && suite.deleteTss[j] > tss.Data[i] { + continue OUTER + } + } + err = expectInsertData.Append(originalInsertData.GetRow(i)) + suite.NoError(err) + } + + expectRowCount := expectInsertData.GetRowNum() + for fieldID, data := range insertData.Data { + suite.Equal(expectRowCount, data.RowNum()) + fieldData := expectInsertData.Data[fieldID] + fieldDataType := typeutil.GetField(schema, fieldID).GetDataType() + for i := 0; i < expectRowCount; i++ { + expect := fieldData.GetRow(i) + actual := data.GetRow(i) + if fieldDataType == schemapb.DataType_Array { + suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData())) + } else { + suite.Equal(expect, actual) + } + } + } +} + 
+func (suite *ReaderSuite) TestReadScalarFields() { + suite.run(schemapb.DataType_Bool) + suite.run(schemapb.DataType_Int8) + suite.run(schemapb.DataType_Int16) + suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int64) + suite.run(schemapb.DataType_Float) + suite.run(schemapb.DataType_Double) + suite.run(schemapb.DataType_VarChar) + suite.run(schemapb.DataType_Array) + suite.run(schemapb.DataType_JSON) +} + +func (suite *ReaderSuite) TestWithTSRangeAndDelete() { + suite.numRows = 10 + suite.tsStart = 2 + suite.tsEnd = 8 + suite.deletePKs = []storage.PrimaryKey{ + storage.NewInt64PrimaryKey(1), + storage.NewInt64PrimaryKey(4), + storage.NewInt64PrimaryKey(6), + storage.NewInt64PrimaryKey(8), + } + suite.deleteTss = []int64{ + 8, 8, 1, 8, + } + suite.run(schemapb.DataType_Int32) +} + +func (suite *ReaderSuite) TestStringPK() { + suite.pkDataType = schemapb.DataType_VarChar + suite.numRows = 10 + suite.tsStart = 2 + suite.tsEnd = 8 + suite.deletePKs = []storage.PrimaryKey{ + storage.NewVarCharPrimaryKey("1"), + storage.NewVarCharPrimaryKey("4"), + storage.NewVarCharPrimaryKey("6"), + storage.NewVarCharPrimaryKey("8"), + } + suite.deleteTss = []int64{ + 8, 8, 1, 8, + } + suite.run(schemapb.DataType_Int32) +} + +func (suite *ReaderSuite) TestBinaryAndFloat16Vector() { + suite.vecDataType = schemapb.DataType_BinaryVector + suite.run(schemapb.DataType_Int32) + suite.vecDataType = schemapb.DataType_Float16Vector + suite.run(schemapb.DataType_Int32) +} + +func TestUtil(t *testing.T) { + suite.Run(t, new(ReaderSuite)) +} diff --git a/internal/util/importutilv2/binlog/util.go b/internal/util/importutilv2/binlog/util.go new file mode 100644 index 0000000000..b4da2e4d73 --- /dev/null +++ b/internal/util/importutilv2/binlog/util.go @@ -0,0 +1,111 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package binlog
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"sort"
+	"strconv"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+)
+
+func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, error) {
+	rowsSet := make([]any, 0)
+	for {
+		event, err := reader.NextEventReader()
+		if err != nil {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err))
+		}
+		if event == nil {
+			break // end of the file
+		}
+		if event.TypeCode != et {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s",
+				et.String(), event.TypeCode.String()))
+		}
+		rows, _, err := event.PayloadReaderInterface.GetDataFromPayload()
+		if err != nil {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err))
+		}
+		rowsSet = append(rowsSet, rows)
+	}
+	return rowsSet, nil
+}
+
+func newBinlogReader(cm storage.ChunkManager, path string) (*storage.BinlogReader, error) {
+	bytes, err := cm.Read(context.TODO(), path) // TODO: dyh, resolve context, and check whether the error is retryable
+	if err != nil {
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to open binlog %s", path))
+	}
+	var reader *storage.BinlogReader
+	reader, err = storage.NewBinlogReader(bytes)
+	if err != nil {
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to create reader, binlog:%s, error:%v", path, err))
+	}
+	return reader, nil
+}
+
+func listInsertLogs(cm storage.ChunkManager, insertPrefix string) (map[int64][]string, error) {
+	insertLogPaths, _, err := cm.ListWithPrefix(context.Background(), insertPrefix, true)
+	if err != nil {
+		return nil, err
+	}
+	insertLogs := make(map[int64][]string)
+	for _, logPath := range insertLogPaths {
+		fieldPath := path.Dir(logPath)
+		fieldStrID := path.Base(fieldPath)
+		fieldID, err := strconv.ParseInt(fieldStrID, 10, 64)
+		if err != nil {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse field id from log, error: %v", err))
+		}
+		insertLogs[fieldID] = append(insertLogs[fieldID], logPath)
+	}
+	for _, v := range insertLogs {
+		sort.Strings(v)
+	}
+	return insertLogs, nil
+}
+
+func verify(schema *schemapb.CollectionSchema, insertLogs map[int64][]string) error {
+	// 1. check schema fields
+	for _, field := range schema.GetFields() {
+		if _, ok := insertLogs[field.GetFieldID()]; !ok {
+			return merr.WrapErrImportFailed(fmt.Sprintf("no binlog for field:%s", field.GetName()))
+		}
+	}
+	// 2. check system fields (ts and rowID)
+	if _, ok := insertLogs[common.RowIDField]; !ok {
+		return merr.WrapErrImportFailed("no binlog for RowID field")
+	}
+	if _, ok := insertLogs[common.TimeStampField]; !ok {
+		return merr.WrapErrImportFailed("no binlog for Timestamp field")
+	}
+	// 3. check file count
+	for fieldID, logs := range insertLogs {
+		if len(logs) != len(insertLogs[common.RowIDField]) {
+			return merr.WrapErrImportFailed(fmt.Sprintf("misaligned binlog count, field%d:%d, field%d:%d",
+				fieldID, len(logs), common.RowIDField, len(insertLogs[common.RowIDField])))
+		}
+	}
+	return nil
+}
diff --git a/internal/util/importutilv2/option.go b/internal/util/importutilv2/option.go
new file mode 100644
index 0000000000..799abdd3ab
--- /dev/null
+++ b/internal/util/importutilv2/option.go
@@ -0,0 +1,72 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importutilv2 + +import ( + "fmt" + "math" + "strconv" + "strings" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +const ( + StartTs = "start_ts" + EndTs = "end_ts" + BackupFlag = "backup" +) + +type Options []*commonpb.KeyValuePair + +func ParseTimeRange(options Options) (uint64, uint64, error) { + importOptions := funcutil.KeyValuePair2Map(options) + getTimestamp := func(key string, defaultValue uint64) (uint64, error) { + if value, ok := importOptions[key]; ok { + pTs, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return 0, merr.WrapErrImportFailed(fmt.Sprintf("parse %s failed, value=%s, err=%s", key, value, err)) + } + return tsoutil.ComposeTS(pTs, 0), nil + } + return defaultValue, nil + } + tsStart, err := getTimestamp(StartTs, 0) + if err != nil { + return 0, 0, err + } + tsEnd, err := getTimestamp(EndTs, math.MaxUint64) + if err != nil { + return 0, 0, err + } + if tsStart > tsEnd { + return 0, 0, merr.WrapErrImportFailed( + fmt.Sprintf("start_ts shouldn't be larger than end_ts, start_ts:%d, end_ts:%d", tsStart, tsEnd)) + } + return tsStart, tsEnd, nil +} + +func IsBackup(options Options) bool { + isBackup, err := funcutil.GetAttrByKeyFromRepeatedKV(BackupFlag, options) + if err != nil || strings.ToLower(isBackup) != "true" { + return false + } + return true +} diff --git a/internal/util/importutilv2/reader.go b/internal/util/importutilv2/reader.go new file mode 100644 index 0000000000..dea8374ef1 --- /dev/null +++ b/internal/util/importutilv2/reader.go @@ -0,0 +1,45 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package importutilv2
+
+import (
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/internal/util/importutilv2/binlog"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+)
+
+type Reader interface {
+	Read() (*storage.InsertData, error)
+	Close()
+}
+
+func NewReader(cm storage.ChunkManager,
+	schema *schemapb.CollectionSchema,
+	paths []string,
+	options Options,
+	bufferSize int64,
+) (Reader, error) {
+	if IsBackup(options) {
+		tsStart, tsEnd, err := ParseTimeRange(options)
+		if err != nil {
+			return nil, err
+		}
+		return binlog.NewReader(cm, schema, paths, tsStart, tsEnd)
+	}
+	return nil, merr.WrapErrImportFailed("unsupported import source, only backup (binlog) import is implemented")
+}
diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go
index ad1d5df3b5..e56b83b946 100644
--- a/pkg/util/typeutil/schema.go
+++ b/pkg/util/typeutil/schema.go
@@ -867,6 +867,21 @@ func IsPrimaryFieldDataExist(datas []*schemapb.FieldData, primaryFieldSchema *sc
 	return primaryFieldData != nil
 }
 
+func AppendSystemFields(schema *schemapb.CollectionSchema) {
+	schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
+		FieldID:      int64(common.RowIDField),
+		Name:         common.RowIDFieldName,
+		IsPrimaryKey: false,
+		DataType:     schemapb.DataType_Int64,
+	})
+	schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
+		FieldID:      int64(common.TimeStampField),
+		Name:         common.TimeStampFieldName,
+		IsPrimaryKey: false,
+		DataType:     schemapb.DataType_Int64,
+	})
+}
+
 func AppendIDs(dst *schemapb.IDs, src *schemapb.IDs, idx int) {
 	switch src.IdField.(type) {
 	case *schemapb.IDs_IntId:
diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh
index 40722787c8..c432f00d8f 100755
--- a/scripts/run_go_unittest.sh
+++ b/scripts/run_go_unittest.sh
@@ -109,6 +109,7 @@ go test -race -cover -tags dynamic "${PKG_DIR}/util/retry/..." -failfast -count=
 go test -race -cover -tags dynamic "${MILVUS_DIR}/util/sessionutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
 go test -race -cover -tags dynamic "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
 go test -race -cover -tags dynamic "${MILVUS_DIR}/util/importutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
+go test -race -cover -tags dynamic "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
 go test -race -cover -tags dynamic "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
 }
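Usage sketch: the following illustrates how the new pieces fit together, assuming the caller already holds a storage.ChunkManager, a collection schema, and the backup paths; importBackup and all of its argument values are illustrative, not part of this change.

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/importutilv2"
)

// importBackup drains the backup-import reader batch by batch. paths[0] is the
// insert-log prefix and the optional paths[1] is the delta-log prefix,
// mirroring reader.init in internal/util/importutilv2/binlog/reader.go.
func importBackup(cm storage.ChunkManager, schema *schemapb.CollectionSchema, paths []string) error {
	options := importutilv2.Options{
		{Key: importutilv2.BackupFlag, Value: "true"},
		// StartTs/EndTs may be added here as physical timestamps;
		// ParseTimeRange composes them into TSO bounds and otherwise
		// defaults to the full time range.
	}
	r, err := importutilv2.NewReader(cm, schema, paths, options, 0) // bufferSize is not used by the binlog reader
	if err != nil {
		return err
	}
	defer r.Close()
	for {
		data, err := r.Read()
		if err != nil {
			return err
		}
		if data == nil { // the binlog reader returns (nil, nil) once all batches are consumed
			return nil
		}
		fmt.Printf("read a batch of %d rows\n", data.GetRowNum())
	}
}
```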