package writenode

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"log"
	"path"
	"strconv"
	"unsafe"

	"github.com/opentracing/opentracing-go"
	oplog "github.com/opentracing/opentracing-go/log"

	"github.com/zilliztech/milvus-distributed/internal/allocator"
	"github.com/zilliztech/milvus-distributed/internal/kv"
	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
	"github.com/zilliztech/milvus-distributed/internal/storage"
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)

// CollectionPrefix and SegmentPrefix are path prefixes. Neither is referenced
// in this file; presumably they are used by other files of the package.
const (
	CollectionPrefix = "/collection/"
	SegmentPrefix    = "/segment/"
)

type (
	// InsertData is an alias of storage.InsertData.
	InsertData = storage.InsertData
	// Blob is an alias of storage.Blob.
	Blob = storage.Blob

	// insertBufferNode is the flow-graph node that accumulates inserted rows
	// per segment, serializes them to binlogs, saves the binlogs through
	// minIOKV and reports every saved path on outCh.
	insertBufferNode struct {
		BaseNode
		insertBuffer *insertBuffer
		minIOKV      kv.Base
		minioPrifex  string
		idAllocator  *allocator.IDAllocator
		outCh        chan *insertFlushSyncMsg

		pulsarWriteNodeTimeTickStream *msgstream.PulsarMsgStream
		replica                       collectionReplica
	}

	// insertBuffer holds the not-yet-flushed insert data of every segment.
	insertBuffer struct {
		insertData map[UniqueID]*InsertData // SegmentID to InsertData
		maxSize    int
	}
)

// size returns the number of buffered rows for segmentID, measured as the
// largest NumRows among the segment's float-vector and binary-vector fields.
// It returns 0 when nothing is buffered for the segment.
func (ib *insertBuffer) size(segmentID UniqueID) int {
	// A lookup on a nil or empty map simply reports !ok.
	idata, ok := ib.insertData[segmentID]
	if !ok {
		return 0
	}

	maxSize := 0
	for _, data := range idata.Data {
		switch d := data.(type) {
		case *storage.FloatVectorFieldData:
			if d.NumRows > maxSize {
				maxSize = d.NumRows
			}
		case *storage.BinaryVectorFieldData:
			if d.NumRows > maxSize {
				maxSize = d.NumRows
			}
		}
	}
	return maxSize
}

// full reports whether the buffered row count of segmentID has reached maxSize.
func (ib *insertBuffer) full(segmentID UniqueID) bool {
	return ib.size(segmentID) >= ib.maxSize
}

// Name returns the identifier of this node.
func (ibNode *insertBufferNode) Name() string {
	return "ibNode"
}

// Operate consumes a single *insertMsg and
//
//  1. appends the rows of every insert message to the per-segment buffer and,
//     once a segment's buffer is full, serializes it to binlogs, saves them
//     through minIOKV and sends one insertFlushSyncMsg per binlog on outCh;
//  2. for every flush message, flushes the segment's buffer the same way (when
//     it is not empty) and then sends an insertFlushSyncMsg with
//     flushCompleted set to true;
//  3. writes a hard time tick carrying the message's maximum timestamp.
//
// It always returns nil. Input that cannot be processed (no message, or a
// message that is not an *insertMsg) is logged and dropped.
func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
	// log.Println("=========== insert buffer Node Operating")

	if len(in) != 1 {
		log.Println("Error: Invalid operate message input in insertBuffertNode, input length = ", len(in))
		// TODO: add error handling
	}
	if len(in) == 0 || in[0] == nil {
		// Nothing to dereference below; bail out instead of panicking.
		return nil
	}

	iMsg, ok := (*in[0]).(*insertMsg)
	if !ok || iMsg == nil {
		log.Println("Error: type assertion failed for insertMsg")
		// TODO: add error handling
		return nil
	}

	// readAt decodes one little-endian fixed-width value from row, starting at
	// byte offset pos, into v.
	readAt := func(row []byte, pos int, v interface{}) error {
		return binary.Read(bytes.NewReader(row[pos:]), binary.LittleEndian, v)
	}

	// iMsg is insertMsg
	// 1. iMsg -> buffer
	for _, msg := range iMsg.insertMessages {
		spanName := fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs())
		var span opentracing.Span
		if ctx := msg.GetContext(); ctx != nil {
			span, _ = opentracing.StartSpanFromContext(ctx, spanName)
		} else {
			span = opentracing.StartSpan(spanName)
		}
		span.SetTag("hash keys", msg.HashKeys())
		span.SetTag("start time", msg.BeginTs())
		span.SetTag("end time", msg.EndTs())

		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
			log.Println("Error: misaligned messages detected")
			span.Finish() // the span must be finished on every exit path
			continue
		}

		currentSegID := msg.GetSegmentID()
		collectionName := msg.GetCollectionName()
		span.LogFields(oplog.Int("segment id", int(currentSegID)))

		idata, ok := ibNode.insertBuffer.insertData[currentSegID]
		if !ok {
			idata = &InsertData{
				Data: make(map[UniqueID]storage.FieldData),
			}
		}

		// Timestamps
		// NOTE(review): if the schema also lists field 1 as INT64, the INT64
		// branch below appends the same timestamps a second time — confirm
		// against the schema produced by the master.
		tsData, ok := idata.Data[1].(*storage.Int64FieldData)
		if !ok {
			tsData = &storage.Int64FieldData{
				Data:    []int64{},
				NumRows: 0,
			}
			idata.Data[1] = tsData
		}
		for _, ts := range msg.Timestamps {
			tsData.Data = append(tsData.Data, int64(ts))
		}
		tsData.NumRows += len(msg.Timestamps)
		span.LogFields(oplog.Int("tsData numRows", tsData.NumRows))

		// 1.1 Get CollectionMeta from etcd
		collection, err := ibNode.replica.getCollectionByName(collectionName)
		//collSchema, err := ibNode.getCollectionSchemaByName(collectionName)
		if err != nil {
			// GOOSE TODO add error handler
			log.Println("Get meta wrong:", err)
			span.Finish() // the span must be finished on every exit path
			continue
		}

		collectionID := collection.ID()
		collSchema := collection.schema

		// 1.2 Get Fields
		// pos is the byte offset of the current field inside EVERY row blob.
		// All rows share the same layout, so pos advances once per field
		// (after all rows have been read), never once per row.
		pos := 0
		for _, field := range collSchema.Fields {
			switch field.DataType {
			case schemapb.DataType_VECTOR_FLOAT:
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
							log.Println("strconv wrong")
						}
						break
					}
				}
				if dim <= 0 {
					log.Println("invalid dim")
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)

				elemSize := int(unsafe.Sizeof(float32(0)))
				for _, blob := range msg.RowData {
					row := blob.GetValue()
					for j := 0; j < dim; j++ {
						var v float32
						if err := readAt(row, pos+j*elemSize, &v); err != nil {
							log.Println("binary.read float32 err:", err)
						}
						fieldData.Data = append(fieldData.Data, v)
					}
				}
				if dim > 0 {
					pos += dim * elemSize
				}
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_VECTOR_BINARY:
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
							log.Println("strconv wrong")
						}
						break
					}
				}
				if dim <= 0 {
					log.Println("invalid dim")
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
						NumRows: 0,
						Data:    make([]byte, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)

				width := dim / 8 // the vector is packed one bit per dimension
				for _, blob := range msg.RowData {
					bv := blob.GetValue()[pos : pos+width]
					fieldData.Data = append(fieldData.Data, bv...)
				}
				pos += width
				fieldData.NumRows += len(msg.RowData)

			case schemapb.DataType_BOOL:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BoolFieldData{
						NumRows: 0,
						Data:    make([]bool, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)

				for _, blob := range msg.RowData {
					var v bool
					if err := readAt(blob.GetValue(), pos, &v); err != nil {
						log.Println("binary.Read bool failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(false))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_INT8:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int8FieldData{
						NumRows: 0,
						Data:    make([]int8, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)

				for _, blob := range msg.RowData {
					var v int8
					if err := readAt(blob.GetValue(), pos, &v); err != nil {
						log.Println("binary.Read int8 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(int8(0)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_INT16:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int16FieldData{
						NumRows: 0,
						Data:    make([]int16, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)

				for _, blob := range msg.RowData {
					var v int16
					if err := readAt(blob.GetValue(), pos, &v); err != nil {
						log.Println("binary.Read int16 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(int16(0)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_INT32:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int32FieldData{
						NumRows: 0,
						Data:    make([]int32, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)

				for _, blob := range msg.RowData {
					var v int32
					if err := readAt(blob.GetValue(), pos, &v); err != nil {
						log.Println("binary.Read int32 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(int32(0)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_INT64:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int64FieldData{
						NumRows: 0,
						Data:    make([]int64, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)

				// Fields 0 and 1 are filled from the message itself and do not
				// consume any bytes of the row blobs.
				switch field.FieldID {
				case 0: // RowIDs
					fieldData.Data = append(fieldData.Data, msg.RowIDs...)
					fieldData.NumRows += len(msg.RowIDs)
				case 1: // Timestamps
					for _, ts := range msg.Timestamps {
						fieldData.Data = append(fieldData.Data, int64(ts))
					}
					fieldData.NumRows += len(msg.Timestamps)
				default:
					for _, blob := range msg.RowData {
						var v int64
						if err := readAt(blob.GetValue(), pos, &v); err != nil {
							log.Println("binary.Read int64 failed:", err)
						}
						fieldData.Data = append(fieldData.Data, v)
					}
					pos += int(unsafe.Sizeof(int64(0)))
					fieldData.NumRows += len(msg.RowIDs)
				}

			case schemapb.DataType_FLOAT:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)

				for _, blob := range msg.RowData {
					var v float32
					if err := readAt(blob.GetValue(), pos, &v); err != nil {
						log.Println("binary.Read float32 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(float32(0)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_DOUBLE:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.DoubleFieldData{
						NumRows: 0,
						Data:    make([]float64, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)

				for _, blob := range msg.RowData {
					var v float64
					if err := readAt(blob.GetValue(), pos, &v); err != nil {
						log.Println("binary.Read float64 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(float64(0)))
				fieldData.NumRows += len(msg.RowIDs)
			}
		}

		// 1.3 store in buffer
		ibNode.insertBuffer.insertData[currentSegID] = idata
		span.LogFields(oplog.String("store in buffer", "store in buffer"))

		// 1.4 if full
		//   1.4.1 generate binlogs
		span.LogFields(oplog.String("generate binlogs", "generate binlogs"))
		if ibNode.insertBuffer.full(currentSegID) {
			log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID))

			// partitionTag -> partitionID
			partitionTag := msg.GetPartitionTag()
			partitionID, err := typeutil.Hash32String(partitionTag)
			if err != nil {
				log.Println("partitionTag to partitionID wrong")
				// TODO GOOSE add error handler
			}

			collMeta := &etcdpb.CollectionMeta{
				Schema: collSchema,
				ID:     collectionID,
			}
			inCodec := storage.NewInsertCodec(collMeta)

			// buffer data to binlogs
			binLogs, err := inCodec.Serialize(partitionID, currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			if err != nil {
				log.Println("generate binlog wrong: ", err)
			}

			// clear buffer
			// NOTE(review): the buffer is dropped even when Serialize failed.
			delete(ibNode.insertBuffer.insertData, currentSegID)
			log.Println(".. Clearing buffer")

			//   1.4.2 binLogs -> minIO/S3
			collIDStr := strconv.FormatInt(collectionID, 10)
			partitionIDStr := strconv.FormatInt(partitionID, 10)
			segIDStr := strconv.FormatInt(currentSegID, 10)
			keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)

			log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs))
			for index, blob := range binLogs {
				uid, err := ibNode.idAllocator.AllocOne()
				if err != nil {
					log.Println("Allocate Id failed")
					// GOOSE TODO error handler
				}

				key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
				err = ibNode.minIOKV.Save(key, string(blob.Value))
				if err != nil {
					log.Println("Save to MinIO failed")
					// GOOSE TODO error handler
				}

				fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
				if err != nil {
					log.Println("string to fieldID wrong")
					// GOOSE TODO error handler
				}

				inBinlogMsg := &insertFlushSyncMsg{
					flushCompleted: false,
					insertBinlogPathMsg: insertBinlogPathMsg{
						ts:      iMsg.timeRange.timestampMax,
						segID:   currentSegID,
						fieldID: fieldID,
						paths:   []string{key},
					},
				}

				log.Println("... Appending binlog paths ...", index)
				ibNode.outCh <- inBinlogMsg
			}
		}
		span.Finish()
	}

	if len(iMsg.insertMessages) > 0 {
		log.Println("---insert buffer status---")
		// Report at most 10 segments to keep the log short.
		reported := 0
		for segID := range ibNode.insertBuffer.insertData {
			if reported >= 10 {
				break
			}
			log.Printf("seg(%v) buffer size = (%v)", segID, ibNode.insertBuffer.size(segID))
			reported++
		}
	}

	// iMsg is Flush() msg from master
	// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
	for _, msg := range iMsg.flushMessages {
		currentSegID := msg.GetSegmentID()
		flushTs := msg.GetTimestamp()
		partitionTag := msg.GetPartitionTag()
		collectionID := msg.GetCollectionID()
		log.Printf(". Receiving flush message segID(%v)...", currentSegID)

		if ibNode.insertBuffer.size(currentSegID) > 0 {
			log.Println(".. Buffer not empty, flushing ...")
			collSchema, err := ibNode.getCollectionSchemaByID(collectionID)
			if err != nil {
				// GOOSE TODO add error handler
				log.Println("Get meta wrong: ", err)
			}
			collMeta := &etcdpb.CollectionMeta{
				Schema: collSchema,
				ID:     collectionID,
			}
			inCodec := storage.NewInsertCodec(collMeta)

			// partitionTag -> partitionID
			partitionID, err := typeutil.Hash32String(partitionTag)
			if err != nil {
				// GOOSE TODO add error handler
				log.Println("partitionTag to partitionID Wrong: ", err)
			}

			// buffer data to binlogs
			binLogs, err := inCodec.Serialize(partitionID, currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			if err != nil {
				log.Println("generate binlog wrong: ", err)
			}

			// clear buffer
			// NOTE(review): the buffer is dropped even when Serialize failed.
			delete(ibNode.insertBuffer.insertData, currentSegID)

			// binLogs -> minIO/S3
			collIDStr := strconv.FormatInt(collectionID, 10)
			partitionIDStr := strconv.FormatInt(partitionID, 10)
			segIDStr := strconv.FormatInt(currentSegID, 10)
			keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)

			for _, blob := range binLogs {
				uid, err := ibNode.idAllocator.AllocOne()
				if err != nil {
					log.Println("Allocate Id failed")
					// GOOSE TODO error handler
				}

				key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
				err = ibNode.minIOKV.Save(key, string(blob.Value))
				if err != nil {
					log.Println("Save to MinIO failed")
					// GOOSE TODO error handler
				}

				fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
				if err != nil {
					log.Println("string to fieldID wrong")
					// GOOSE TODO error handler
				}

				// Append binlogs
				inBinlogMsg := &insertFlushSyncMsg{
					flushCompleted: false,
					insertBinlogPathMsg: insertBinlogPathMsg{
						ts:      flushTs,
						segID:   currentSegID,
						fieldID: fieldID,
						paths:   []string{key},
					},
				}
				ibNode.outCh <- inBinlogMsg
			}
		}

		// Flushed
		log.Println(".. Flush finished ...")
		inBinlogMsg := &insertFlushSyncMsg{
			flushCompleted: true,
			insertBinlogPathMsg: insertBinlogPathMsg{
				ts:    flushTs,
				segID: currentSegID,
			},
		}
		ibNode.outCh <- inBinlogMsg
	}

	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
		log.Printf("Error: send hard time tick into pulsar channel failed, %s\n", err.Error())
	}

	return nil
}

// getCollectionSchemaByID returns the schema of the collection that the
// replica holds under collectionID, or the replica's lookup error.
func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) {
	ret, err := ibNode.replica.getCollectionByID(collectionID)
	if err != nil {
		return nil, err
	}
	return ret.schema, nil
}

// getCollectionSchemaByName returns the schema of the collection that the
// replica holds under collectionName, or the replica's lookup error.
func (ibNode *insertBufferNode) getCollectionSchemaByName(collectionName string) (*schemapb.CollectionSchema, error) {
	ret, err := ibNode.replica.getCollectionByName(collectionName)
	if err != nil {
		return nil, err
	}
	return ret.schema, nil
}

// writeHardTimeTick produces a single time-tick message stamped with ts on the
// write-node time-tick stream and returns the stream's Produce error.
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
	msgPack := msgstream.MsgPack{}
	timeTickMsg := msgstream.TimeTickMsg{
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: ts,
			EndTimestamp:   ts,
			HashValues:     []uint32{0},
		},
		TimeTickMsg: internalpb.TimeTickMsg{
			MsgType:   internalpb.MsgType_kTimeTick,
			PeerID:    Params.WriteNodeID,
			Timestamp: ts,
		},
	}
	msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
	return ibNode.pulsarWriteNodeTimeTickStream.Produce(&msgPack)
}

// newInsertBufferNode builds an insertBufferNode configured from Params: an
// empty insert buffer, a MinIO-backed KV store, a started ID allocator and a
// Pulsar producer for the write-node time-tick channel.
//
// It panics when the MinIO client or the ID allocator cannot be created, or
// when the ID allocator fails to start.
func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg, replica collectionReplica) *insertBufferNode {
	maxQueueLength := Params.FlowGraphMaxQueueLength
	maxParallelism := Params.FlowGraphMaxParallelism

	baseNode := BaseNode{}
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)

	maxSize := Params.FlushInsertBufSize
	iBuffer := &insertBuffer{
		insertData: make(map[UniqueID]*InsertData),
		maxSize:    maxSize,
	}

	// MinIO
	option := &miniokv.Option{
		Address:           Params.MinioAddress,
		AccessKeyID:       Params.MinioAccessKeyID,
		SecretAccessKeyID: Params.MinioSecretAccessKey,
		UseSSL:            Params.MinioUseSSL,
		CreateBucket:      true,
		BucketName:        Params.MinioBucketName,
	}

	minIOKV, err := miniokv.NewMinIOKV(ctx, option)
	if err != nil {
		panic(err)
	}
	minioPrefix := Params.InsertLogRootPath

	idAllocator, err := allocator.NewIDAllocator(ctx, Params.MasterAddress)
	if err != nil {
		panic(err)
	}
	err = idAllocator.Start()
	if err != nil {
		panic(err)
	}

	wTt := msgstream.NewPulsarMsgStream(ctx, 1024) //input stream, write node time tick
	wTt.SetPulsarClient(Params.PulsarAddress)
	wTt.CreatePulsarProducers([]string{Params.WriteNodeTimeTickChannelName})

	return &insertBufferNode{
		BaseNode:                      baseNode,
		insertBuffer:                  iBuffer,
		minIOKV:                       minIOKV,
		minioPrifex:                   minioPrefix,
		idAllocator:                   idAllocator,
		outCh:                         outCh,
		pulsarWriteNodeTimeTickStream: wTt,
		replica:                       replica,
	}
}