diff --git a/internal/master/field_id.go b/internal/master/field_id.go
new file mode 100644
index 0000000000..981cd152c0
--- /dev/null
+++ b/internal/master/field_id.go
@@ -0,0 +1,13 @@
+package master
+
+// system field id:
+// 0: unique row id
+// 1: timestamp
+// 100: first user field id
+// 101: second user field id
+// 102: ...
+
+const (
+	RowIDField     = 0
+	TimeStampField = 1
+)
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index 68d06b9357..880fc82890 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -4,16 +4,12 @@ import (
 	"fmt"
 
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	ms "github.com/zilliztech/milvus-distributed/internal/master"
 	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
 
-const (
-	TsField  int64 = 1
-	DDLField int64 = 2
-)
-
 type (
 	UniqueID = typeutil.UniqueID
 	FieldID  = typeutil.UniqueID
@@ -80,7 +76,6 @@ type FloatVectorFieldData struct {
 // system field id:
 // 0: unique row id
 // 1: timestamp
-// 2: ddl
 // 100: first user field id
 // 101: second user field id
 // 102: ...
@@ -92,7 +87,7 @@ type InsertData struct {
 }
 
 // Blob key example:
-// ${segment_id}/${field_id}
+// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
 type InsertCodec struct {
 	Base
 	readerCloseFunc []func() error
@@ -102,7 +97,11 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 	var blobs []*Blob
 	var writer *InsertBinlogWriter
 	var err error
-	ts := (data.Data[1]).(Int64FieldData).data
+	timeFieldData, ok := data.Data[ms.TimeStampField]
+	if !ok {
+		return nil, errors.New("data doesn't contain timestamp field")
+	}
+	ts := timeFieldData.(Int64FieldData).data
 
 	for _, field := range insertCodec.Schema.Schema.Fields {
 		singleData := data.Data[field.FieldID]
@@ -149,7 +148,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 		return nil, err
 	}
 	if writer == nil {
-		return nil, fmt.Errorf("binlog writer is nil")
+		return nil, errors.New("binlog writer is nil")
 	}
 	writer.SetStartTimeStamp(typeutil.Timestamp(ts[0]))
 	writer.SetEndTimeStamp(typeutil.Timestamp(ts[len(ts)-1]))
@@ -163,7 +162,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 		if err != nil {
 			return nil, err
 		}
-		blobKey := fmt.Sprintf("%d/%d", segmentID, field.FieldID)
+		blobKey := fmt.Sprintf("%d", field.FieldID)
 		blobs = append(blobs, &Blob{
 			key:   blobKey,
 			value: buffer,
@@ -175,7 +174,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 }
 func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) {
 	if len(blobs) == 0 {
-		return -1, -1, nil, fmt.Errorf("blobs is empty")
+		return -1, -1, nil, errors.New("blobs is empty")
 	}
 	readerClose := func(reader *BinlogReader) func() error {
 		return func() error { return reader.Close() }
@@ -352,7 +351,7 @@ func (insertCodec *InsertCodec) Close() error {
 }
 
 // Blob key example:
-// ${collection_id}
+// ${tenant}/data_definition_log/${collection_id}/${field_type}/${log_idx}
 type DataDefinitionCodec struct {
 	Base
 	readerCloseFunc []func() error
@@ -414,7 +413,6 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
 			}
 		}
 	}
-	writer.FieldID = DDLField
 	writer.SetStartTimeStamp(ts[0])
 	writer.SetEndTimeStamp(ts[len(ts)-1])
 	err = writer.Close()
@@ -425,9 +423,8 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
 	if err != nil {
 		return nil, err
 	}
-	blobKey := fmt.Sprintf("%d/%d", dataDefinitionCodec.Schema.ID, DDLField)
 	blobs = append(blobs, &Blob{
-		key:   blobKey,
+		key:   "",
 		value: buffer,
 	})
 
@@ -452,7 +449,6 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
 	eventWriter.SetEndTimestamp(ts[len(ts)-1])
 	writer.SetStartTimeStamp(ts[0])
 	writer.SetEndTimeStamp(ts[len(ts)-1])
-	writer.FieldID = TsField
 	err = writer.Close()
 	if err != nil {
 		return nil, err
@@ -461,9 +457,8 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
 	if err != nil {
 		return nil, err
 	}
-	blobKey = fmt.Sprintf("%d/%d", dataDefinitionCodec.Schema.ID, TsField)
 	blobs = append(blobs, &Blob{
-		key:   blobKey,
+		key:   "",
 		value: buffer,
 	})
 
@@ -473,7 +468,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
 
 func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) {
 	if len(blobs) == 0 {
-		return nil, nil, fmt.Errorf("blobs is empty")
+		return nil, nil, errors.New("blobs is empty")
 	}
 	readerClose := func(reader *BinlogReader) func() error {
 		return func() error { return reader.Close() }
@@ -485,10 +480,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts [
 		if err != nil {
 			return nil, nil, err
 		}
-		fieldID := binlogReader.FieldID
+		dataType := binlogReader.PayloadDataType
 
-		switch fieldID {
-		case TsField:
+		switch dataType {
+		case schemapb.DataType_INT64:
			eventReader, err := binlogReader.NextEventReader()
 			if err != nil {
 				return nil, nil, err
@@ -501,7 +496,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts [
 				resultTs = append(resultTs, Timestamp(singleTs))
 			}
 			dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, readerClose(binlogReader))
-		case DDLField:
+		case schemapb.DataType_STRING:
 			binlogReader, err := NewBinlogReader(blob.value)
 			if err != nil {
 				return nil, nil, err
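---

Note on the lookup guard added to InsertCodec.Serialize: the old line `ts := (data.Data[1]).(Int64FieldData).data` panics on a failed type assertion whenever field 1 is absent, because a map miss yields a nil interface value. Below is a minimal self-contained sketch of the guarded version; the toy InsertData and Int64FieldData types are stand-ins for the real storage types, not the patch's definitions.

package main

import (
	"errors"
	"fmt"
)

// Same values as the constants introduced in internal/master/field_id.go.
const (
	RowIDField     = 0
	TimeStampField = 1
)

// Minimal stand-ins: in the patch, InsertData.Data maps a FieldID to a
// FieldData interface; these toy types model only enough to show the guard.
type Int64FieldData struct{ data []int64 }

type InsertData struct {
	Data map[int64]interface{}
}

// extractTimestamps mirrors the guarded lookup added to InsertCodec.Serialize:
// a missing timestamp field becomes a returned error instead of a runtime
// panic on the type assertion.
func extractTimestamps(data *InsertData) ([]int64, error) {
	timeFieldData, ok := data.Data[TimeStampField]
	if !ok {
		return nil, errors.New("data doesn't contain timestamp field")
	}
	return timeFieldData.(Int64FieldData).data, nil
}

func main() {
	good := &InsertData{Data: map[int64]interface{}{
		TimeStampField: Int64FieldData{data: []int64{100, 101, 102}},
	}}
	ts, err := extractTimestamps(good)
	fmt.Println(ts, err) // [100 101 102] <nil>

	bad := &InsertData{Data: map[int64]interface{}{}}
	_, err = extractTimestamps(bad)
	fmt.Println(err) // data doesn't contain timestamp field
}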
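The blob key change narrows what Serialize encodes: keys shrink from `${segment_id}/${field_id}` to just `${field_id}`, and the full object path in the updated comment is left for the storage layer to build. The helper below is hypothetical (nothing in this patch defines it) and only illustrates how the remaining path components might be joined around the codec's key.

package main

import (
	"fmt"
	"path"
	"strconv"
)

// insertLogPath is a hypothetical helper sketching the path layout from the
// new comment: ${tenant}/insert_log/${collection_id}/${partition_id}/
// ${segment_id}/${field_id}/${log_idx}. The codec now supplies only the
// ${field_id} component (blobKey); everything else comes from context.
func insertLogPath(tenant string, collectionID, partitionID, segmentID int64, blobKey string, logIdx int64) string {
	return path.Join(tenant, "insert_log",
		strconv.FormatInt(collectionID, 10),
		strconv.FormatInt(partitionID, 10),
		strconv.FormatInt(segmentID, 10),
		blobKey, // the key produced by Serialize, i.e. fmt.Sprintf("%d", field.FieldID)
		strconv.FormatInt(logIdx, 10))
}

func main() {
	// A blob for field 1 (the timestamp field) of segment 42, first log file.
	fmt.Println(insertLogPath("tenant0", 7, 8, 42, "1", 0))
	// Output: tenant0/insert_log/7/8/42/1/0
}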
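On the DataDefinitionCodec side, dropping writer.FieldID means Deserialize can no longer branch on TsField/DDLField; it branches on the binlog payload data type instead, since timestamp blobs carry INT64 payloads and DDL-request blobs carry STRING payloads. The sketch below mimics that dispatch with toy types; the DataType values are illustrative, not the real schemapb numbers.

package main

import "fmt"

// Stand-in for schemapb.DataType; values are illustrative only.
type DataType int32

const (
	DataType_INT64  DataType = 5
	DataType_STRING DataType = 20
)

// A toy binlog blob. The real BinlogReader exposes PayloadDataType after
// parsing the binlog header; this struct fakes that plus pre-decoded payloads.
type ddBlob struct {
	payloadDataType DataType
	int64Payload    []int64
	stringPayload   []string
}

// deserialize mimics the new dispatch in DataDefinitionCodec.Deserialize:
// the payload data type alone distinguishes timestamp blobs (INT64) from
// DDL-request blobs (STRING), so no per-blob field ID is needed.
func deserialize(blobs []*ddBlob) (ts []int64, ddRequests []string) {
	for _, blob := range blobs {
		switch blob.payloadDataType {
		case DataType_INT64:
			ts = append(ts, blob.int64Payload...)
		case DataType_STRING:
			ddRequests = append(ddRequests, blob.stringPayload...)
		}
	}
	return ts, ddRequests
}

func main() {
	blobs := []*ddBlob{
		{payloadDataType: DataType_INT64, int64Payload: []int64{10, 20}},
		{payloadDataType: DataType_STRING, stringPayload: []string{`{"op":"CreateCollection"}`}},
	}
	ts, dd := deserialize(blobs)
	fmt.Println(ts, dd) // [10 20] [{"op":"CreateCollection"}]
}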