From 4e6e502df1177c37699f8cb9dd3f6435609acc7f Mon Sep 17 00:00:00 2001 From: yukun Date: Fri, 14 May 2021 10:59:49 +0800 Subject: [PATCH] Add data sorter in storage (#5200) GetEntityByID needs the flushed segment to be sorted by RowID field, then do binary search to get the target id and entities. See also: #5177 Signed-off-by: fishpenguin kun.yu@zilliz.com --- internal/storage/data_codec.go | 16 +- internal/storage/data_sorter.go | 85 ++++++++++ internal/storage/data_sorter_test.go | 236 +++++++++++++++++++++++++++ 3 files changed, 333 insertions(+), 4 deletions(-) create mode 100644 internal/storage/data_sorter.go create mode 100644 internal/storage/data_sorter_test.go diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index c51001c833..c08436bcaf 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -146,6 +146,14 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique return nil, errors.New("data doesn't contains timestamp field") } ts := timeFieldData.(*Int64FieldData).Data + startTs := ts[0] + endTs := ts[len(ts)-1] + + dataSorter := &DataSorter{ + InsertCodec: insertCodec, + InsertData: data, + } + sort.Sort(dataSorter) for _, field := range insertCodec.Schema.Schema.Fields { singleData := data.Data[field.FieldID] @@ -154,8 +162,8 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { return nil, err } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) + eventWriter.SetStartTimestamp(typeutil.Timestamp(startTs)) + eventWriter.SetEndTimestamp(typeutil.Timestamp(endTs)) switch field.DataType { case schemapb.DataType_Bool: err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data) @@ -188,8 +196,8 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { return nil, err } - writer.SetStartTimeStamp(typeutil.Timestamp(ts[0])) - writer.SetEndTimeStamp(typeutil.Timestamp(ts[len(ts)-1])) + writer.SetStartTimeStamp(typeutil.Timestamp(startTs)) + writer.SetEndTimeStamp(typeutil.Timestamp(endTs)) err = writer.Close() if err != nil { diff --git a/internal/storage/data_sorter.go b/internal/storage/data_sorter.go new file mode 100644 index 0000000000..4e26848d19 --- /dev/null +++ b/internal/storage/data_sorter.go @@ -0,0 +1,85 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package storage + +import ( + "github.com/milvus-io/milvus/internal/proto/schemapb" +) + +type DataSorter struct { + InsertCodec *InsertCodec + InsertData *InsertData +} + +func (ds *DataSorter) getIDField() FieldData { + for _, field := range ds.InsertCodec.Schema.Schema.Fields { + if field.FieldID == 0 { + return ds.InsertData.Data[field.FieldID] + } + } + return nil +} + +func (ds *DataSorter) Len() int { + return len(ds.getIDField().(*Int64FieldData).Data) +} + +func (ds *DataSorter) Swap(i, j int) { + for _, field := range ds.InsertCodec.Schema.Schema.Fields { + singleData := ds.InsertData.Data[field.FieldID] + switch field.DataType { + case schemapb.DataType_Bool: + data := singleData.(*BoolFieldData).Data + data[i], data[j] = data[j], data[i] + case schemapb.DataType_Int8: + data := singleData.(*Int8FieldData).Data + data[i], data[j] = data[j], data[i] + case schemapb.DataType_Int16: + data := singleData.(*Int16FieldData).Data + data[i], data[j] = data[j], data[i] + case schemapb.DataType_Int32: + data := singleData.(*Int32FieldData).Data + data[i], data[j] = data[j], data[i] + case schemapb.DataType_Int64: + data := singleData.(*Int64FieldData).Data + data[i], data[j] = data[j], data[i] + case schemapb.DataType_Float: + data := singleData.(*FloatFieldData).Data + data[i], data[j] = data[j], data[i] + case schemapb.DataType_Double: + data := singleData.(*DoubleFieldData).Data + data[i], data[j] = data[j], data[i] + case schemapb.DataType_String: + data := singleData.(*StringFieldData).Data + data[i], data[j] = data[j], data[i] + case schemapb.DataType_BinaryVector: + data := singleData.(*BinaryVectorFieldData).Data + dim := singleData.(*BinaryVectorFieldData).Dim + for i := 0; i < dim/8; i++ { + data[i], data[i+dim/8] = data[i+dim/8], data[i] + } + case schemapb.DataType_FloatVector: + data := singleData.(*FloatVectorFieldData).Data + dim := singleData.(*FloatVectorFieldData).Dim + for i := 0; i < dim; i++ { + data[i], data[i+dim] = data[i+dim], data[i] + } + default: + errMsg := "undefined data type " + string(field.DataType) + panic(errMsg) + } + } +} + +func (ds *DataSorter) Less(i, j int) bool { + return ds.getIDField().(*Int64FieldData).Data[i] < ds.getIDField().(*Int64FieldData).Data[j] +} diff --git a/internal/storage/data_sorter_test.go b/internal/storage/data_sorter_test.go new file mode 100644 index 0000000000..37d461d7ef --- /dev/null +++ b/internal/storage/data_sorter_test.go @@ -0,0 +1,236 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package storage + +import ( + "sort" + "testing" + + "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/stretchr/testify/assert" +) + +func TestDataSorter(t *testing.T) { + schema := &etcdpb.CollectionMeta{ + ID: 1, + CreateTime: 1, + SegmentIDs: []int64{0, 1}, + PartitionTags: []string{"partition_0", "partition_1"}, + Schema: &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + AutoID: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 0, + Name: "row_id", + IsPrimaryKey: false, + Description: "row_id", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 1, + Name: "Ts", + IsPrimaryKey: false, + Description: "Ts", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, + Name: "field_bool", + IsPrimaryKey: false, + Description: "description_2", + DataType: schemapb.DataType_Bool, + }, + { + FieldID: 101, + Name: "field_int8", + IsPrimaryKey: false, + Description: "description_3", + DataType: schemapb.DataType_Int8, + }, + { + FieldID: 102, + Name: "field_int16", + IsPrimaryKey: false, + Description: "description_4", + DataType: schemapb.DataType_Int16, + }, + { + FieldID: 103, + Name: "field_int32", + IsPrimaryKey: false, + Description: "description_5", + DataType: schemapb.DataType_Int32, + }, + { + FieldID: 104, + Name: "field_int64", + IsPrimaryKey: false, + Description: "description_6", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 105, + Name: "field_float", + IsPrimaryKey: false, + Description: "description_7", + DataType: schemapb.DataType_Float, + }, + { + FieldID: 106, + Name: "field_double", + IsPrimaryKey: false, + Description: "description_8", + DataType: schemapb.DataType_Double, + }, + { + FieldID: 107, + Name: "field_string", + IsPrimaryKey: false, + Description: "description_9", + DataType: schemapb.DataType_String, + }, + { + FieldID: 108, + Name: "field_binary_vector", + IsPrimaryKey: false, + Description: "description_10", + DataType: schemapb.DataType_BinaryVector, + }, + { + FieldID: 109, + Name: "field_float_vector", + IsPrimaryKey: false, + Description: "description_11", + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + } + + insertCodec := NewInsertCodec(schema) + insertDataFirst := &InsertData{ + Data: map[int64]FieldData{ + 0: &Int64FieldData{ + NumRows: 2, + Data: []int64{6, 4}, + }, + 1: &Int64FieldData{ + NumRows: 2, + Data: []int64{3, 4}, + }, + 100: &BoolFieldData{ + NumRows: 2, + Data: []bool{true, false}, + }, + 101: &Int8FieldData{ + NumRows: 2, + Data: []int8{3, 4}, + }, + 102: &Int16FieldData{ + NumRows: 2, + Data: []int16{3, 4}, + }, + 103: &Int32FieldData{ + NumRows: 2, + Data: []int32{3, 4}, + }, + 104: &Int64FieldData{ + NumRows: 2, + Data: []int64{3, 4}, + }, + 105: &FloatFieldData{ + NumRows: 2, + Data: []float32{3, 4}, + }, + 106: &DoubleFieldData{ + NumRows: 2, + Data: []float64{3, 4}, + }, + 107: &StringFieldData{ + NumRows: 2, + Data: []string{"3", "4"}, + }, + 108: &BinaryVectorFieldData{ + NumRows: 2, + Data: []byte{0, 255}, + Dim: 8, + }, + 109: &FloatVectorFieldData{ + NumRows: 2, + Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Dim: 8, + }, + }, + } + + dataSorter := &DataSorter{ + InsertCodec: insertCodec, + InsertData: insertDataFirst, + } + + sort.Sort(dataSorter) + + // for _, field := range insertCodec.Schema.Schema.Fields { + // singleData := insertDataFirst.Data[field.FieldID] + // switch field.DataType { + // case schemapb.DataType_Bool: + // fmt.Println("BoolField:") + // fmt.Println(singleData.(*BoolFieldData).Data) + // case schemapb.DataType_Int8: + // fmt.Println("Int8Field:") + // fmt.Println(singleData.(*Int8FieldData).Data) + // case schemapb.DataType_Int16: + // fmt.Println("Int16Field:") + // fmt.Println(singleData.(*Int16FieldData).Data) + // case schemapb.DataType_Int32: + // fmt.Println("Int32Field:") + // fmt.Println(singleData.(*Int32FieldData).Data) + // case schemapb.DataType_Int64: + // fmt.Println("Int64Field:") + // fmt.Println(singleData.(*Int64FieldData).Data) + // case schemapb.DataType_Float: + // fmt.Println("FloatField:") + // fmt.Println(singleData.(*FloatFieldData).Data) + // case schemapb.DataType_Double: + // fmt.Println("DoubleField:") + // fmt.Println(singleData.(*DoubleFieldData).Data) + // case schemapb.DataType_String: + // fmt.Println("StringField:") + // for _, singleString := range singleData.(*StringFieldData).Data { + // fmt.Println(singleString) + // } + // case schemapb.DataType_BinaryVector: + // fmt.Println("BinaryVectorField:") + // fmt.Println(singleData.(*BinaryVectorFieldData).Data) + // case schemapb.DataType_FloatVector: + // fmt.Println("FloatVectorField:") + // fmt.Println(singleData.(*FloatVectorFieldData).Data) + // default: + // } + // } + + assert.Equal(t, []int64{4, 6}, dataSorter.InsertData.Data[0].(*Int64FieldData).Data) + assert.Equal(t, []int64{4, 3}, dataSorter.InsertData.Data[1].(*Int64FieldData).Data) + assert.Equal(t, []bool{false, true}, dataSorter.InsertData.Data[100].(*BoolFieldData).Data) + assert.Equal(t, []int8{4, 3}, dataSorter.InsertData.Data[101].(*Int8FieldData).Data) + assert.Equal(t, []int16{4, 3}, dataSorter.InsertData.Data[102].(*Int16FieldData).Data) + assert.Equal(t, []int32{4, 3}, dataSorter.InsertData.Data[103].(*Int32FieldData).Data) + assert.Equal(t, []int64{4, 3}, dataSorter.InsertData.Data[104].(*Int64FieldData).Data) + assert.Equal(t, []float32{4, 3}, dataSorter.InsertData.Data[105].(*FloatFieldData).Data) + assert.Equal(t, []float64{4, 3}, dataSorter.InsertData.Data[106].(*DoubleFieldData).Data) + assert.Equal(t, []string{"4", "3"}, dataSorter.InsertData.Data[107].(*StringFieldData).Data) + assert.Equal(t, []byte{255, 0}, dataSorter.InsertData.Data[108].(*BinaryVectorFieldData).Data) + assert.Equal(t, []float32{8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7}, dataSorter.InsertData.Data[109].(*FloatVectorFieldData).Data) +}