diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index c39a60a678..93a44a3782 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -611,7 +611,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, } // set pulsar info to tsMsg tsMsg.SetPosition(&msgstream.MsgPosition{ - ChannelName: pulsarMsg.Topic(), + ChannelName: filepath.Base(pulsarMsg.Topic()), MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), }) diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index a2a8d967ca..5c369fc2df 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -140,6 +140,8 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { partitionName := msg.PartitionName err := ddNode.replica.addPartition2(collectionID, partitionName) + // TODO:: add partition by partitionID + //err := ddNode.replica.addPartition(collectionID, msg.PartitionID) if err != nil { log.Println(err) return diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index edc553e1fc..c6b85d1941 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -474,7 +474,9 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S } return status, err } - return nil, nil + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil } func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { @@ -500,5 +502,7 @@ func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*comm return status, err } } - return nil, nil + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil } diff --git a/internal/queryservice/load_test.go b/internal/queryservice/load_test.go new file mode 100644 index 0000000000..9d73d51682 --- /dev/null +++ b/internal/queryservice/load_test.go @@ -0,0 +1,443 @@ +package queryservice + +import ( + "context" + "encoding/binary" + "math" + "strconv" + "testing" + + "github.com/golang/protobuf/proto" + + minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" +) + +//generate insert data +const msgLength = 100 +const receiveBufSize = 1024 +const pulsarBufSize = 1024 +const DIM = 16 + +func genInsert(collectionID int64, partitionID int64, segmentID int64, timeStart int) (*msgstream.MsgPack, *msgstream.MsgPack) { + msgs := make([]msgstream.TsMsg, 0) + for n := timeStart; n < timeStart+msgLength; n++ { + rowData := make([]byte, 0) + id := make([]byte, 8) + binary.BigEndian.PutUint64(id, uint64(n)) + rowData = append(rowData, id...) + time := make([]byte, 8) + binary.BigEndian.PutUint64(time, uint64(n)) + rowData = append(rowData, time...) + for i := 0; i < DIM; i++ { + vec := make([]byte, 4) + binary.BigEndian.PutUint32(vec, math.Float32bits(float32(n*i))) + rowData = append(rowData, vec...) + } + age := make([]byte, 4) + binary.BigEndian.PutUint32(age, 1) + rowData = append(rowData, age...) + blob := &commonpb.Blob{ + Value: rowData, + } + + var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{uint32(n)}, + }, + InsertRequest: internalpb2.InsertRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kInsert, + MsgID: 0, + Timestamp: uint64(n), + SourceID: 0, + }, + CollectionID: collectionID, + PartitionID: partitionID, + SegmentID: segmentID, + ChannelID: "0", + Timestamps: []uint64{uint64(n)}, + RowIDs: []int64{int64(n)}, + RowData: []*commonpb.Blob{blob}, + }, + } + msgs = append(msgs, insertMsg) + } + + insertMsgPack := &msgstream.MsgPack{ + BeginTs: uint64(timeStart), + EndTs: uint64(timeStart + msgLength), + Msgs: msgs, + } + + // generate timeTick + timeTickMsg := &msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []uint32{0}, + }, + TimeTickMsg: internalpb2.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kTimeTick, + MsgID: 0, + Timestamp: uint64(timeStart + msgLength), + SourceID: 0, + }, + }, + } + timeTickMsgPack := &msgstream.MsgPack{ + Msgs: []msgstream.TsMsg{timeTickMsg}, + } + return insertMsgPack, timeTickMsgPack +} + +func genSchema(collectionID int64) *schemapb.CollectionSchema { + fieldID := schemapb.FieldSchema{ + FieldID: UniqueID(0), + Name: "RowID", + IsPrimaryKey: false, + DataType: schemapb.DataType_INT64, + } + + fieldTime := schemapb.FieldSchema{ + FieldID: UniqueID(1), + Name: "Timestamp", + IsPrimaryKey: false, + DataType: schemapb.DataType_INT64, + } + + fieldVec := schemapb.FieldSchema{ + FieldID: UniqueID(100), + Name: "vec", + IsPrimaryKey: false, + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "metric_type", + Value: "L2", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + FieldID: UniqueID(101), + Name: "age", + IsPrimaryKey: false, + DataType: schemapb.DataType_INT32, + } + + return &schemapb.CollectionSchema{ + Name: "collection-" + strconv.FormatInt(collectionID, 10), + AutoID: true, + Fields: []*schemapb.FieldSchema{ + &fieldID, &fieldTime, &fieldVec, &fieldInt, + }, + } +} + +func genCreateCollection(collectionID int64) *msgstream.MsgPack { + schema := genSchema(collectionID) + + byteSchema, err := proto.Marshal(schema) + if err != nil { + panic(err) + } + + request := internalpb2.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kCreateCollection, + Timestamp: uint64(10), + }, + DbID: 0, + CollectionID: collectionID, + Schema: byteSchema, + } + + msg := &msgstream.CreateCollectionMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []uint32{0}, + }, + CreateCollectionRequest: request, + } + + return &msgstream.MsgPack{ + Msgs: []msgstream.TsMsg{msg}, + } +} + +func genCreatePartition(collectionID int64, partitionID int64) *msgstream.MsgPack { + request := internalpb2.CreatePartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kCreatePartition, + Timestamp: uint64(20), + }, + DbID: 0, + CollectionID: collectionID, + PartitionID: partitionID, + } + + msg := &msgstream.CreatePartitionMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []uint32{0}, + }, + CreatePartitionRequest: request, + } + return &msgstream.MsgPack{ + Msgs: []msgstream.TsMsg{msg}, + } +} + +func getMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) { + minioAddress := "localhost:9000" + accessKeyID := "minioadmin" + secretAccessKey := "minioadmin" + useSSL := false + bucketName := "a-bucket" + + option := &minioKV.Option{ + Address: minioAddress, + AccessKeyID: accessKeyID, + SecretAccessKeyID: secretAccessKey, + UseSSL: useSSL, + BucketName: bucketName, + CreateBucket: true, + } + + return minioKV.NewMinIOKV(ctx, option) +} + +func TestLoadCollection(t *testing.T) { + //// produce msg + //insertChannels := []string{"insert-0"} + //ddChannels := []string{"data-definition-0"} + //pulsarAddress := "pulsar://127.0.0.1:6650" + // + //insertStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize) + //insertStream.SetPulsarClient(pulsarAddress) + //insertStream.CreatePulsarProducers(insertChannels) + //ddStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize) + //ddStream.SetPulsarClient(pulsarAddress) + //ddStream.CreatePulsarProducers(ddChannels) + // + //var insertMsgStream msgstream.MsgStream = insertStream + //insertMsgStream.Start() + //var ddMsgStream msgstream.MsgStream = ddStream + //ddMsgStream.Start() + // + //createCollectionMsgPack := genCreateCollection(1) + //createPartitionMsgPack := genCreatePartition(1, 1) + //ddMsgStream.Produce(createCollectionMsgPack) + //ddMsgStream.Produce(createPartitionMsgPack) + // + //consumeStream := pulsarms.NewPulsarTtMsgStream(context.Background(), receiveBufSize) + //consumeStream.SetPulsarClient(pulsarAddress) + //unmarshalDispatcher := util.NewUnmarshalDispatcher() + //consumeStream.CreatePulsarConsumers(insertChannels, "test", unmarshalDispatcher, pulsarBufSize) + //consumeStream.Start() + // + //for i := 0; i < 10; i++ { + // insertMsgPack, timeTickMsgPack := genInsert(1, 1, int64(i), i*msgLength+1) + // err := insertMsgStream.Produce(insertMsgPack) + // assert.NoError(t, err) + // err = insertMsgStream.Broadcast(timeTickMsgPack) + // assert.NoError(t, err) + // err = ddMsgStream.Broadcast(timeTickMsgPack) + // assert.NoError(t, err) + //} + // + ////consume msg + //segPosition := make(map[int64][]*internalpb2.MsgPosition) + //segmentData := make([]*storage.InsertData, 0) + //indexRowDatas := make([][]float32, 0) + //for i := 0; i < 10; i++ { + // msgPack := consumeStream.Consume() + // idData := make([]int64, 0) + // timestamps := make([]int64, 0) + // fieldAgeData := make([]int32, 0) + // fieldVecData := make([]float32, 0) + // for n := 0; n < msgLength; n++ { + // blob := msgPack.Msgs[n].(*msgstream.InsertMsg).RowData[0].Value + // id := binary.BigEndian.Uint64(blob[0:8]) + // idData = append(idData, int64(id)) + // time := binary.BigEndian.Uint64(blob[8:16]) + // timestamps = append(timestamps, int64(time)) + // for i := 0; i < DIM; i++ { + // bits := binary.BigEndian.Uint32(blob[16+4*i : 16+4*(i+1)]) + // floatVec := math.Float32frombits(bits) + // fieldVecData = append(fieldVecData, floatVec) + // } + // ageValue := binary.BigEndian.Uint32(blob[80:84]) + // fieldAgeData = append(fieldAgeData, int32(ageValue)) + // } + // + // insertData := &storage.InsertData{ + // Data: map[int64]storage.FieldData{ + // 0: &storage.Int64FieldData{ + // NumRows: msgLength, + // Data: idData, + // }, + // 1: &storage.Int64FieldData{ + // NumRows: msgLength, + // Data: timestamps, + // }, + // 100: &storage.FloatVectorFieldData{ + // NumRows: msgLength, + // Data: fieldVecData, + // Dim: DIM, + // }, + // 101: &storage.Int32FieldData{ + // NumRows: msgLength, + // Data: fieldAgeData, + // }, + // }, + // } + // segPosition[int64(i)] = msgPack.StartPositions + // segmentData = append(segmentData, insertData) + // indexRowDatas = append(indexRowDatas, fieldVecData) + //} + // + ////gen inCodec + //collectionMeta := &etcdpb.CollectionMeta{ + // ID: 1, + // Schema: genSchema(1), + // CreateTime: 0, + // PartitionIDs: []int64{1}, + // SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + //} + //inCodec := storage.NewInsertCodec(collectionMeta) + // + //// get minio client + //minioKV, err := getMinioKV(context.Background()) + //assert.Nil(t, err) + // + //// write binlog minio + //collectionStr := strconv.FormatInt(1, 10) + //for i := 0; i < 9; i++ { + // binLogs, err := inCodec.Serialize(1, storage.UniqueID(i), segmentData[i]) + // assert.Nil(t, err) + // assert.Equal(t, len(binLogs), 4) + // keyPrefix := "distributed-query-test-binlog" + // segmentStr := strconv.FormatInt(int64(i), 10) + // + // for _, blob := range binLogs { + // key := path.Join(keyPrefix, collectionStr, segmentStr, blob.Key) + // err = minioKV.Save(key, string(blob.Value[:])) + // assert.Nil(t, err) + // } + //} + // + //// gen index build's indexParams + //indexParams := make(map[string]string) + //indexParams["index_type"] = "IVF_PQ" + //indexParams["index_mode"] = "cpu" + //indexParams["dim"] = "16" + //indexParams["k"] = "10" + //indexParams["nlist"] = "100" + //indexParams["nprobe"] = "10" + //indexParams["m"] = "4" + //indexParams["nbits"] = "8" + //indexParams["metric_type"] = "L2" + //indexParams["SLICE_SIZE"] = "400" + // + //var indexParamsKV []*commonpb.KeyValuePair + //for key, value := range indexParams { + // indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ + // Key: key, + // Value: value, + // }) + //} + // + //// generator index and write index to minio + //for i := 0; i < 9; i++ { + // typeParams := make(map[string]string) + // typeParams["dim"] = "16" + // index, err := indexnode.NewCIndex(typeParams, indexParams) + // assert.Nil(t, err) + // err = index.BuildFloatVecIndexWithoutIds(indexRowDatas[i]) + // assert.Equal(t, err, nil) + // binarySet, err := index.Serialize() + // assert.Equal(t, len(binarySet), 1) + // assert.Nil(t, err) + // keyPrefix := "distributed-query-test-index" + // segmentStr := strconv.FormatInt(int64(i), 10) + // indexStr := strconv.FormatInt(int64(i), 10) + // key := path.Join(keyPrefix, collectionStr, segmentStr, indexStr) + // minioKV.Save(key, string(binarySet[0].Value)) + //} + // + ////generate query service + //service, err := NewQueryService(context.Background()) + //assert.Nil(t, err) + //collectionID := UniqueID(1) + //partitions := []UniqueID{1} + //col2partition := make(map[UniqueID][]UniqueID) + //col2partition[collectionID] = partitions + //segments := []UniqueID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + //partition2segment := make(map[UniqueID][]UniqueID) + //partition2segment[UniqueID(1)] = segments + //masterMock := &masterMock{ + // collectionIDs: []UniqueID{1}, + // col2partition: col2partition, + // partition2segment: partition2segment, + //} + //service.SetMasterService(masterMock) + //segStates := make(map[UniqueID]*datapb.SegmentStatesResponse) + //for i := 0; i < 10; i++ { + // if i != 9 { + // state := &datapb.SegmentStatesResponse{ + // State: datapb.SegmentState_SegmentFlushed, + // StartPositions: segPosition[int64(i)], + // } + // segStates[UniqueID(i)] = state + // } else { + // state := &datapb.SegmentStatesResponse{ + // State: datapb.SegmentState_SegmentGrowing, + // StartPositions: segPosition[int64(i)], + // } + // segStates[UniqueID(i)] = state + // } + //} + //dataMock := &dataMock{ + // segmentIDs: []UniqueID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + // segmentStates: segStates, + //} + // + //service.SetDataService(dataMock) + //service.SetEnableGrpc(true) + // + //loadCollectionRequest := &querypb.LoadCollectionRequest{ + // Base: &commonpb.MsgBase{ + // MsgType: commonpb.MsgType_kCreateCollection, + // }, + // DbID: UniqueID(0), + // CollectionID: collectionID, + //} + // + //registerRequest := &querypb.RegisterNodeRequest{ + // Address: &commonpb.Address{ + // Ip: "localhost", + // Port: 20010, + // }, + //} + //response, err := service.RegisterNode(registerRequest) + //assert.Nil(t, err) + //assert.Equal(t, response.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + // + //status, err := service.LoadCollection(loadCollectionRequest) + //assert.Nil(t, err) + //assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS) +} diff --git a/internal/queryservice/meta_replica.go b/internal/queryservice/meta_replica.go index 0e2cb9fa72..2d165ad460 100644 --- a/internal/queryservice/meta_replica.go +++ b/internal/queryservice/meta_replica.go @@ -1,20 +1,17 @@ package queryservice import ( - "log" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) type metaReplica interface { - getCollectionIDs(dbID UniqueID) ([]UniqueID, error) - getPartitionIDs(dbID UniqueID, collectionID UniqueID) ([]UniqueID, error) - getCollection(dbID UniqueID, collectionID UniqueID) *collection - getSegmentIDs(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]UniqueID, error) - loadCollection(dbID UniqueID, collectionID UniqueID) - loadPartitions(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) - updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) + getCollections(dbID UniqueID) ([]*collection, error) + getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) + getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) + loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) + loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) + updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) } @@ -50,150 +47,137 @@ func newMetaReplica() metaReplica { } } -func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) { - partitions := make(map[UniqueID]*partition) - node2channel := make(map[int][]string) - newCollection := &collection{ - id: collectionID, - partitions: partitions, - node2channel: node2channel, - } - mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection) -} - -func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) { - collections := mp.db2collections[dbID] - for _, collection := range collections { - if collection.id == collectionID { - partitions := collection.partitions - segments := make(map[UniqueID]*segment) - partitions[partitionID] = &partition{ - id: partitionID, - state: querypb.PartitionState_NotPresent, - segments: segments, - } - return +func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) { + if _, ok := mp.db2collections[dbID]; ok { + partitions := make(map[UniqueID]*partition) + node2channel := make(map[int][]string) + newCollection := &collection{ + id: collectionID, + partitions: partitions, + node2channel: node2channel, } + mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection) + return newCollection, nil } - log.Fatal("can't find collection when add partition") + return nil, errors.New("can't find dbID when add collection") } -func (mp *metaReplicaImpl) getCollection(dbID UniqueID, collectionID UniqueID) *collection { - for _, id := range mp.dbID { - if id == dbID { - collections := mp.db2collections[id] - for _, collection := range collections { - if collection.id == collectionID { - return collection - } - } - return nil - } - } - return nil -} - -func (mp *metaReplicaImpl) getCollectionIDs(dbID UniqueID) ([]UniqueID, error) { +func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) { if collections, ok := mp.db2collections[dbID]; ok { - collectionIDs := make([]UniqueID, 0) for _, collection := range collections { - collectionIDs = append(collectionIDs, collection.id) + if collection.id == collectionID { + partitions := collection.partitions + segments := make(map[UniqueID]*segment) + partition := &partition{ + id: partitionID, + state: querypb.PartitionState_NotPresent, + segments: segments, + } + partitions[partitionID] = partition + return partition, nil + } } - return collectionIDs, nil } - - return nil, errors.New("can't find collection in queryService") + return nil, errors.New("can't find collection when add partition") } -func (mp *metaReplicaImpl) getPartitionIDs(dbID UniqueID, collectionID UniqueID) ([]UniqueID, error) { +func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) { + if collections, ok := mp.db2collections[dbID]; ok { + return collections, nil + } + + return nil, errors.New("can't find collectionID") +} + +func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) { if collections, ok := mp.db2collections[dbID]; ok { for _, collection := range collections { if collectionID == collection.id { - partitions := collection.partitions - partitionIDs := make([]UniqueID, 0) - for _, partition := range partitions { - partitionIDs = append(partitionIDs, partition.id) + partitions := make([]*partition, 0) + for _, partition := range collection.partitions { + partitions = append(partitions, partition) } - return partitionIDs, nil + return partitions, nil } } } - return nil, errors.New("can't find partitions in queryService") + return nil, errors.New("can't find partitionIDs") } -func (mp *metaReplicaImpl) getSegmentIDs(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]UniqueID, error) { - segmentIDs := make([]UniqueID, 0) +func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) { if collections, ok := mp.db2collections[dbID]; ok { for _, collection := range collections { if collectionID == collection.id { if partition, ok := collection.partitions[partitionID]; ok { + segments := make([]*segment, 0) for _, segment := range partition.segments { - segmentIDs = append(segmentIDs, segment.id) + segments = append(segments, segment) } + return segments, nil } } } } - return segmentIDs, nil + return nil, errors.New("can't find segmentID") } -func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) { - collectionIDs, err := mp.getCollectionIDs(dbID) - if err != nil { - mp.addCollection(dbID, collectionID) - return - } - for _, id := range collectionIDs { - if collectionID == id { - return +func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) { + var res *collection = nil + if collections, err := mp.getCollections(dbID); err == nil { + for _, collection := range collections { + if collectionID == collection.id { + return res, nil + } + } + } else { + res, err = mp.addCollection(dbID, collectionID) + if err != nil { + return nil, err } } - mp.addCollection(dbID, collectionID) + return res, nil } -func (mp *metaReplicaImpl) loadPartitions(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) { +func (mp *metaReplicaImpl) loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) { var collection *collection = nil + var partition *partition = nil + var err error for _, col := range mp.db2collections[dbID] { if col.id == collectionID { collection = col } } if collection == nil { - mp.addCollection(dbID, collectionID) - for _, col := range mp.db2collections[dbID] { - if col.id == collectionID { - collection = col - } + collection, err = mp.addCollection(dbID, collectionID) + if err != nil { + return partition, err } } - for _, partitionID := range partitionIDs { - match := false - for _, partition := range collection.partitions { - if partition.id == partitionID { - match = true - continue - } - } - if !match { - mp.addPartition(dbID, collectionID, partitionID) + if _, ok := collection.partitions[partitionID]; !ok { + partition, err = mp.addPartition(dbID, collectionID, partitionID) + if err != nil { + return partition, err } + return partition, nil } + + return nil, nil } func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, - state querypb.PartitionState) { + state querypb.PartitionState) error { for _, collection := range mp.db2collections[dbID] { if collection.id == collectionID { if partition, ok := collection.partitions[partitionID]; ok { partition.state = state - return + return nil } } } - log.Fatal("update partition state fail") + return errors.New("update partition state fail") } func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID, diff --git a/internal/queryservice/querynode.go b/internal/queryservice/querynode.go index 776636ee63..1832b2eab2 100644 --- a/internal/queryservice/querynode.go +++ b/internal/queryservice/querynode.go @@ -6,17 +6,18 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) -type queryNode struct { +type queryNodeInfo struct { client QueryNodeInterface insertChannels string nodeID uint64 segments []UniqueID + dmChannelNames []string } -func (qn *queryNode) GetComponentStates() (*internalpb2.ComponentStates, error) { +func (qn *queryNodeInfo) GetComponentStates() (*internalpb2.ComponentStates, error) { return qn.client.GetComponentStates() } -func (qn *queryNode) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) { +func (qn *queryNodeInfo) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) { return qn.client.LoadSegments(in) } diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index c7a5c85af9..59f1d845e6 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -46,10 +46,9 @@ type QueryService struct { dataServiceClient DataServiceInterface masterServiceClient MasterServiceInterface - queryNodes []*queryNode - //TODO:: nodeID use UniqueID - numRegisterNode uint64 - numQueryChannel uint64 + queryNodes []*queryNodeInfo + numRegisterNode uint64 + numQueryChannel uint64 stateCode atomic.Value isInit atomic.Value @@ -115,31 +114,26 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { fmt.Println("register query node =", req.Address) // TODO:: add mutex - allocatedID := qs.numRegisterNode - qs.numRegisterNode++ - - if allocatedID > Params.QueryNodeNum { - log.Fatal("allocated queryNodeID should lower than Params.QueryNodeNum") - } + allocatedID := uint64(len(qs.queryNodes)) registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10) - var node *queryNode + var node *queryNodeInfo if qs.enableGrpc { client := nodeclient.NewClient(registerNodeAddress) - node = &queryNode{ + node = &queryNodeInfo{ client: client, nodeID: allocatedID, } } else { client := querynode.NewQueryNode(qs.loopCtx, allocatedID) - node = &queryNode{ + node = &queryNodeInfo{ client: client, nodeID: allocatedID, } } qs.queryNodes = append(qs.queryNodes, node) - // TODO:: watch dm channels + //TODO::return init params to queryNode return &querypb.RegisterNodeResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -152,9 +146,18 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) { dbID := req.DbID - collectionIDs, err := qs.replica.getCollectionIDs(dbID) + collections, err := qs.replica.getCollections(dbID) + collectionIDs := make([]UniqueID, 0) + for _, collection := range collections { + collectionIDs = append(collectionIDs, collection.id) + } if err != nil { - return nil, err + return &querypb.ShowCollectionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + }, err } return &querypb.ShowCollectionResponse{ Status: &commonpb.Status{ @@ -167,13 +170,23 @@ func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*qu func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*commonpb.Status, error) { dbID := req.DbID collectionID := req.CollectionID - qs.replica.loadCollection(dbID, collectionID) - fn := func(err error) *commonpb.Status { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } } + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } + } + collection, err := qs.replica.loadCollection(dbID, collectionID) + if err != nil { + return fn(err), err + } + if collection == nil { + return fn(nil), nil } // get partitionIDs @@ -189,7 +202,7 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com return fn(err), err } if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Fatal("show partition fail, v%", showPartitionResponse.Status.Reason) + return showPartitionResponse.Status, err } partitionIDs := showPartitionResponse.PartitionIDs @@ -208,15 +221,24 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { dbID := req.DbID collectionID := req.CollectionID - partitionsIDs, err := qs.replica.getPartitionIDs(dbID, collectionID) + partitions, err := qs.replica.getPartitions(dbID, collectionID) if err != nil { - log.Fatal("get partition ids error") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, err } + + partitionIDs := make([]UniqueID, 0) + for _, partition := range partitions { + partitionIDs = append(partitionIDs, partition.id) + } + releasePartitionRequest := &querypb.ReleasePartitionRequest{ Base: req.Base, DbID: dbID, CollectionID: collectionID, - PartitionIDs: partitionsIDs, + PartitionIDs: partitionIDs, } status, err := qs.ReleasePartitions(releasePartitionRequest) @@ -227,9 +249,18 @@ func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest) func (qs *QueryService) ShowPartitions(req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error) { dbID := req.DbID collectionID := req.CollectionID - partitionIDs, err := qs.replica.getPartitionIDs(dbID, collectionID) + partitions, err := qs.replica.getPartitions(dbID, collectionID) + partitionIDs := make([]UniqueID, 0) + for _, partition := range partitions { + partitionIDs = append(partitionIDs, partition.id) + } if err != nil { - return nil, err + return &querypb.ShowPartitionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + }, err } return &querypb.ShowPartitionResponse{ Status: &commonpb.Status{ @@ -237,14 +268,14 @@ func (qs *QueryService) ShowPartitions(req *querypb.ShowPartitionRequest) (*quer }, PartitionIDs: partitionIDs, }, nil - } func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*commonpb.Status, error) { + //TODO::suggest different partitions have different dm channel dbID := req.DbID collectionID := req.CollectionID partitionIDs := req.PartitionIDs - qs.replica.loadPartitions(dbID, collectionID, partitionIDs) + qs.replica.loadPartition(dbID, collectionID, partitionIDs[0]) fn := func(err error) *commonpb.Status { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -333,6 +364,9 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm for key, node := range qs.queryNodes { if channels == node.insertChannels { statesID := id2segs[i][len(id2segs[i])-1] + //TODO :: should be start position + position := segmentStates[statesID-1].StartPositions + segmentStates[statesID].StartPositions = position loadSegmentRequest := &querypb.LoadSegmentRequest{ CollectionID: collectionID, PartitionID: partitionID, @@ -359,10 +393,18 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest) partitionIDs := req.PartitionIDs segmentIDs := make([]UniqueID, 0) for _, partitionID := range partitionIDs { - res, err := qs.replica.getSegmentIDs(dbID, collectionID, partitionID) + segments, err := qs.replica.getSegments(dbID, collectionID, partitionID) if err != nil { - log.Fatal("get segment ids error") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, err } + res := make([]UniqueID, 0) + for _, segment := range segments { + res = append(res, segment.id) + } + segmentIDs = append(segmentIDs, res...) } releaseSegmentRequest := &querypb.ReleaseSegmentRequest{ @@ -421,7 +463,7 @@ func (qs *QueryService) GetPartitionStates(req *querypb.PartitionStatesRequest) } func NewQueryService(ctx context.Context) (*QueryService, error) { - nodes := make([]*queryNode, 0) + nodes := make([]*queryNodeInfo, 0) ctx1, cancel := context.WithCancel(ctx) replica := newMetaReplica() service := &QueryService{ diff --git a/internal/util/typeutil/convension.go b/internal/util/typeutil/convension.go index 68d6e2da1d..4713184e64 100644 --- a/internal/util/typeutil/convension.go +++ b/internal/util/typeutil/convension.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "reflect" + "strings" "github.com/apache/pulsar-client-go/pulsar" @@ -43,7 +44,7 @@ func Uint64ToBytes(v uint64) []byte { } func PulsarMsgIDToString(messageID pulsar.MessageID) string { - return string(messageID.Serialize()) + return strings.ToValidUTF8(string(messageID.Serialize()), "") } func StringToPulsarMsgID(msgString string) (pulsar.MessageID, error) {