From 1cfc6c7f86302528d9346b07fd1d80a447dce8eb Mon Sep 17 00:00:00 2001 From: GuoRentong Date: Wed, 28 Oct 2020 10:09:35 +0800 Subject: [PATCH] Add docs Signed-off-by: GuoRentong --- docs/developer_guides/developer_guides.md | 438 +++++++++++++++------- internal/proto/etcd_meta.proto | 3 +- internal/proto/schema.proto | 2 +- internal/proxy/manipulation_req.go | 32 +- internal/proxy/server.go | 9 +- 5 files changed, 325 insertions(+), 159 deletions(-) diff --git a/docs/developer_guides/developer_guides.md b/docs/developer_guides/developer_guides.md index 6f6354dcea..7f76b28eb2 100644 --- a/docs/developer_guides/developer_guides.md +++ b/docs/developer_guides/developer_guides.md @@ -79,6 +79,8 @@ For better throughput, Milvus allows asynchronous state synchronization between ``` go type CollectionSchema struct { Name string + Description string + AutoId bool Fields []FieldSchema } ``` @@ -87,7 +89,6 @@ type CollectionSchema struct { ``` go type FieldSchema struct { - Id uint64 Name string Description string DataType DataType @@ -96,33 +97,36 @@ type FieldSchema struct { } ``` +###### 2.2.1 Data Types +###### 2.2.2 Type Params -#### 2.3 Type Params - -#### 2.4 Index Params +###### 2.2.3 Index Params ## 3. Request +In this section, we introduce the RPCs of milvus service. A brief description of the RPCs is listed as follows. 
- -#### 3.1 Base Request - -``` go -type BaseRequest interface { - Type() ReqType - PreExecute() Status - Execute() Status - PostExecute() Status - WaitToFinish() Status -} -``` +| RPC | description | +| :----------------- | ------------------------------------------------------------ | +| CreateCollection | create a collection based on a schema statement | +| DropCollection | drop a collection | +| HasCollection | whether or not a collection exists | +| DescribeCollection | show a collection's schema and its descriptive statistics | +| ShowCollections | list all collections | +| CreatePartition | create a partition | +| DropPartition | drop a partition | +| HasPartition | whether or not a partition exists | +| DescribePartition | show a partition's name and its descriptive statistics | +| ShowPartitions | list all partitions of a collection | +| Insert | insert a batch of rows into a collection or a partition | +| Search | query the columns of a collection or a partition with ANNS statements and boolean expressions | -#### 3.2 Definition Requests +#### 3.1 Definition Requests ###### 3.2.1 Collection @@ -140,91 +144,21 @@ type BaseRequest interface { * DescribePartition * ShowPartitions -###### 3.2.3 Index - -* CreateIndex -* DropIndex -* DescribeIndex - -###### 3.2.4 Definition Request & Task - -```go -type DDRequest struct { - CollectionName string - PartitionName string - SegmentId uint64 - ChannelId uint64 - - PrimaryKeys []uint64 - RowData []*RowDataBlob - - reqType ReqType - ts Timestamp -} - -type DDTask struct { - DDRequest -} - -// TsMsg interfaces -func (req *DDTask) Ts() Timestamp -func (req *DDTask) SetTs(ts Timestamp) - -// BaseRequest interfaces -func (req *DDTask) Type() ReqType -func (req *DDTask) PreExecute() Status -func (req *DDTask) Execute() Status -func (req *DDTask) PostExecute() Status -func (req *DDTask) WaitToFinish() Status -``` +#### 3.2 Manipulation Requests -#### 3.3 Manipulation Requsts - -###### 3.3.1 Insert +###### 3.2.1 Insert * Insert 
-###### 3.3.2 Delete +###### 3.2.2 Delete * DeleteByID -###### 3.3.3 Manipulation Requst - -```go -type DMRequest struct { - CollectionName string - PartitionTag string - SegmentId uint64 - ChannelId uint64 - - PrimaryKeys []uint64 - RowData []*RowDataBlob - - reqType ReqType - ts Timestamp -} - -type DMTask struct { - DMRequest -} - -// TsMsg interfaces -func (req *DMTask) Ts() Timestamp -func (req *DMTask) SetTs(ts Timestamp) - -// BaseRequest interfaces -func (req *DMTask) Type() ReqType -func (req *DMTask) PreExecute() Status -func (req *DMTask) Execute() Status -func (req *DMTask) PostExecute() Status -func (req *DMTask) WaitToFinish() Status -``` - -#### 3.5 Query +#### 3.3 Query @@ -287,11 +221,11 @@ func (tso *timestampOracle) loadTimestamp() Status -#### 4.3 Batch Allocation of Timestamps +#### 4.2 Timestamp Allocator +###### 4.2.1 Batch Allocation of Timestamps - -#### 4.4 Expiration of Timestamps +###### 4.2.2 Expiration of Timestamps @@ -351,56 +285,167 @@ func (gparams *GlobalParamsTable) Remove(key string) Status +#### 5.3 Message Stream + +``` go +type MsgType uint32 +const { + USER_REQUEST MsgType = 1 + TIME_TICK = 2 +} + +type TsMsg interface { + SetTs(ts Timestamp) + Ts() Timestamp + Type() MsgType +} + +type TsMsgMarshaler interface { + Marshal(input *TsMsg) ([]byte, Status) + Unmarshal(input []byte) (*TsMsg, Status) +} + +type MsgPack struct { + BeginTs Timestamp + EndTs Timestamp + Msgs []*TsMsg +} + +type MsgStream interface { + SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) + Produce(*MsgPack) Status + Consume() *MsgPack // message can be consumed exactly once +} + +type PulsarMsgStream struct { + client *pulsar.Client + produceChannels []string + consumeChannels []string + + msgMarshaler *TsMsgMarshaler + msgUnmarshaler *TsMsgMarshaler +} + +func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) +func (ms *PulsarMsgStream) Produce(*MsgPack) Status +func (ms *PulsarMsgStream) 
Consume() *MsgPack //return messages in one time tick + +type PulsarTtMsgStream struct { + client *pulsar.Client + produceChannels []string + consumeChannels []string + + msgMarshaler *TsMsgMarshaler + msgUnmarshaler *TsMsgMarshaler + inputBuf []*TsMsg + unsolvedBuf []*TsMsg + msgPacks []*MsgPack +} + +func (ms *PulsarTtMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) +func (ms *PulsarTtMsgStream) Produce(*MsgPack) Status +func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick +``` + + + ## 6. Proxy #### 6.1 Overview -#### 6.2 Request Scheduler +#### 3.1 Task ``` go -type requestScheduler struct { - definitions requestQueue - manipulations requestQueue - queries requestQueue -} - -func (rs *requestScheduler) ExecuteRequest(req *Request) Status - -func (rs *requestScheduler) staticValidityCheck(req *Request) Status -func (rs *requestScheduler) setTimestamp(req *Request) -func (rs *requestScheduler) setPrimaryKey(req *Request) -func (rs *requestScheduler) setSegmentId(req *Request) -func (rs *requestScheduler) setProxyId(req *Request) - -// @param selection -// bit_0 = 1: select definition queue -// bit_1 = 1: select manipulation queue -// bit_2 = 1: select query queue -// example: if mode = 3, then both definition and manipulation queues are selected -func (rs *requestScheduler) AreRequestsDelivered(ts Timestamp, selection uint32) bool - -// ActiveComponent interfaces -func (rs *requestScheduler) Id() String -func (rs *requestScheduler) Status() Status -func (rs *requestScheduler) Clean() Status -func (rs *requestScheduler) Restart() Status -func (rs *requestScheduler) heartbeat() - -// protobuf -message ReqSchedulerHeartbeat { - string id - uint64 definition_queue_length - uint64 manipulation_queue_length - uint64 query_queue_length - uint64 num_delivered_definitions - uint64 num_delivered_manipulations - uint64 num_delivered_queries +type task interface { + PreExecute() Status + Execute() Status + 
PostExecute() Status + WaitToFinish() Status + Notify() Status } ``` +* Base Task +```go +type baseTask struct { + Type ReqType + ReqId int64 + Ts Timestamp + ProxyId int64 +} + +func (task *baseTask) PreExecute() Status +func (task *baseTask) Execute() Status +func (task *baseTask) PostExecute() Status +func (task *baseTask) WaitToFinish() Status +func (task *baseTask) Notify() Status +``` + +* Insert Task + + Take insertTask as an example: + +```go +type insertTask struct { + baseTask + SegIdAssigner *segIdAssigner + RowIdAllocator *IdAllocator + rowBatch *RowBatch +} + +func (task *InsertTask) Execute() Status +func (task *InsertTask) WaitToFinish() Status +func (task *InsertTask) Notify() Status +``` + + + +#### 6.2 Task Scheduler + +``` go +type taskScheduler struct { + // definition tasks + ddTasks *task chan + // manipulation tasks + dmTasks *task chan + // query tasks + dqTasks *task chan + + tsAllocator *TimestampAllocator + ReqIdAllocator *IdAllocator +} + +func (sched *taskScheduler) EnqueueDDTask(task *task) Status +func (sched *taskScheduler) EnqueueDMTask(task *task) Status +func (sched *taskScheduler) EnqueueDQTask(task *task) Status + +func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool + +// ActiveComponent interfaces +func (sched *taskScheduler) Id() String +func (sched *taskScheduler) Status() Status +func (sched *taskScheduler) Clean() Status +func (sched *taskScheduler) Restart() Status +func (sched *taskScheduler) heartbeat() + +// protobuf +message taskSchedulerHeartbeat { + string id + uint64 dd_queue_length + uint64 dm_queue_length + uint64 dq_queue_length + uint64 num_dd_done + uint64 num_dm_done + uint64 num_dq_done +} +``` + +* EnqueueDMTask + + If an insertTask is enqueued, *EnqueueDMTask(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, *SegIdAssigner*, *RowIdAllocator*, then push it into queue *dmTasks*. The *SegIdAssigner* and *RowIdAllocator* will later be used in the task's execution phase. 
#### 6.3 Time Tick @@ -459,9 +504,11 @@ type MsgStream interface { Consume() *MsgPack // message can be consumed exactly once } +type HashFunc func(*MsgPack) map[int32]*MsgPack + type PulsarMsgStream struct { client *pulsar.Client - msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack + msgHashFunc HashFunc // return a map from produceChannel idx to *MsgPack producers []*pulsar.Producer consumers []*pulsar.Consumer msgMarshaler *TsMsgMarshaler @@ -471,15 +518,15 @@ type PulsarMsgStream struct { func (ms *PulsarMsgStream) SetProducerChannels(channels []string) func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) -func (ms *PulsarMsgStream) SetMsgHashFunc(XXX) -func (ms *PulsarMsgStream) Produce(*MsgPack) Status +func (ms *PulsarMsgStream) SetMsgHashFunc(hashFunc *HashFunc) +func (ms *PulsarMsgStream) Produce(msgs *MsgPack) Status func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick type PulsarTtMsgStream struct { client *pulsar.Client - produceChannels []string - consumeChannels []string - + msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack + producers []*pulsar.Producer + consumers []*pulsar.Consumer msgMarshaler *TsMsgMarshaler msgUnmarshaler *TsMsgMarshaler inputBuf []*TsMsg @@ -487,9 +534,12 @@ type PulsarTtMsgStream struct { msgPacks []*MsgPack } -func (ms *PulsarTtMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) -func (ms *PulsarTtMsgStream) Produce(*MsgPack) Status -func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick +func (ms *PulsarMsgStream) SetProducerChannels(channels []string) +func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) +func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) +func (ms *PulsarMsgStream) 
SetMsgHashFunc(hashFunc *HashFunc) +func (ms *PulsarMsgStream) Produce(msgs *MsgPack) Status +func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick ``` @@ -587,9 +637,113 @@ func (unmarshaler *QueryReqUnmarshaler) Unmarshal(input *pulsar.Message) (*TsMsg -#### 5.X Interfaces +#### 5.1 Interfaces (RPC) + +| RPC | description | +| :----------------- | ------------------------------------------------------------ | +| CreateCollection | create a collection based on a schema statement | +| DropCollection | drop a collection | +| HasCollection | whether or not a collection exists | +| DescribeCollection | show a collection's schema and its descriptive statistics | +| ShowCollections | list all collections | +| CreatePartition | create a partition | +| DropPartition | drop a partition | +| HasPartition | whether or not a partition exists | +| DescribePartition | show a partition's name and its descriptive statistics | +| ShowPartitions | list all partitions of a collection | +| AllocTimestamp | allocate a batch of consecutive timestamps | +| AllocId | allocate a batch of consecutive IDs | +| AssignSegmentId | assign segment id to insert rows (master determines which segment these rows belong to) | +| | | +| | | +#### 5.2 Master Instance + +```go +type Master interface { + tso timestampOracle // timestamp oracle + ddScheduler ddRequestScheduler // data definition request scheduler + metaTable metaTable // in-memory system meta + collManager collectionManager // collection & partition manager + segManager segmentManager // segment manager +} +``` + +* Timestamp allocation + +Master serves as the central clock of the whole system. Other components (e.g. Proxy) allocate timestamps from master via RPC *AllocTimestamp*. All the timestamp allocation requests will be handled by the timestampOracle singleton. See section 4.2 for the details about timestampOracle. 
+ +* Request Scheduling + +* System Meta + +* Collection Management + +* Segment Management + + + +#### 5.3 Data definition Request Scheduler + +###### 5.3.1 Task + +Master receives data definition requests via gRPC. Each request (described by a proto) will be wrapped as a task for further scheduling. The task interface is + +```go +type task interface { + Type() ReqType + Ts() Timestamp + Execute() Status + WaitToFinish() Status + Notify() Status +} +``` + +A task example is as follows. In this example, we wrap a CreateCollectionRequest (a proto) as a createCollectionTask. The wrapper needs to implement the task interface. + +``` go +type createCollectionTask struct { + req *CreateCollectionRequest + cv int chan +} + +// Task interfaces +func (task *createCollectionTask) Type() ReqType +func (task *createCollectionTask) Ts() Timestamp +func (task *createCollectionTask) Execute() Status +func (task *createCollectionTask) Notify() Status +func (task *createCollectionTask) WaitToFinish() Status +``` + + + +###### 5.3.2 Scheduler + +```go +type ddRequestScheduler struct { + reqQueue *task chan +} + +func (rs *ddRequestScheduler) Enqueue(task *task) Status +func (rs *ddRequestScheduler) schedule() *task // implement scheduling policy +``` + + + +#### 5.4 Meta Table + +```go +type metaTable struct { + client *etcd.Client // client of a reliable kv service, i.e. 
etcd client + rootPath string // this metaTable's working root path on the reliable kv service + tenantMeta map[int64]TenantMeta // tenant id to tenant meta + proxyMeta map[int64]ProxyMeta // proxy id to proxy meta + collMeta map[int64]CollectionMeta // collection id to collection meta + segMeta map[int64]SegmentMeta // segment id to segment meta +} +``` + diff --git a/internal/proto/etcd_meta.proto b/internal/proto/etcd_meta.proto index 24f2a25045..d40fe628fb 100644 --- a/internal/proto/etcd_meta.proto +++ b/internal/proto/etcd_meta.proto @@ -12,6 +12,7 @@ message TenantMeta { string query_channel_id = 4; } + message ProxyMeta { uint64 id = 1; common.Address address = 2; @@ -37,4 +38,4 @@ message SegmentMeta { uint64 open_time=6; uint64 close_time=7; int64 num_rows=8; -} \ No newline at end of file +} diff --git a/internal/proto/schema.proto b/internal/proto/schema.proto index 7b04177b2e..327fafc73d 100644 --- a/internal/proto/schema.proto +++ b/internal/proto/schema.proto @@ -44,4 +44,4 @@ message CollectionSchema { string description = 2; bool auto_id = 3; repeated FieldSchema fields = 4; -} \ No newline at end of file +} diff --git a/internal/proxy/manipulation_req.go b/internal/proxy/manipulation_req.go index 8b4d75b69c..b2f9c84e26 100644 --- a/internal/proxy/manipulation_req.go +++ b/internal/proxy/manipulation_req.go @@ -12,7 +12,7 @@ import ( ) type manipulationReq struct { - commonpb.Status + stats []commonpb.Status msgs []*pb.ManipulationReqMsg wg sync.WaitGroup proxy *proxyServer @@ -26,14 +26,13 @@ func (req *manipulationReq) Ts() (Timestamp, error) { return Timestamp(req.msgs[0].Timestamp), nil } func (req *manipulationReq) SetTs(ts Timestamp) { - for _, mreq := range req.msgs { - mreq.Timestamp = uint64(ts) + for _, msg := range req.msgs { + msg.Timestamp = uint64(ts) } } // BaseRequest interfaces func (req *manipulationReq) Type() pb.ReqType { - // TODO: return a invalid ReqType? 
if req.msgs == nil { return 0 } @@ -59,11 +58,17 @@ func (req *manipulationReq) Execute() commonpb.Status { func (req *manipulationReq) PostExecute() commonpb.Status { // send into pulsar req.wg.Add(1) - return req.Status + return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} } func (req *manipulationReq) WaitToFinish() commonpb.Status { // wait until send into pulsar req.wg.Wait() + + for _, stat := range req.stats{ + if stat.ErrorCode != commonpb.ErrorCode_SUCCESS{ + return stat + } + } // update timestamp if necessary ts, _ := req.Ts() req.proxy.reqSch.mTimestampMux.Lock() @@ -73,7 +78,7 @@ func (req *manipulationReq) WaitToFinish() commonpb.Status { // wait until send } else { log.Printf("there is some wrong with m_timestamp, it goes back, current = %d, previous = %d", ts, req.proxy.reqSch.mTimestamp) } - return req.Status + return req.stats[0] } func (s *proxyServer) restartManipulationRoutine(bufSize int) error { @@ -109,22 +114,23 @@ func (s *proxyServer) restartManipulationRoutine(bufSize int) error { ts, st := s.getTimestamp(1) if st.ErrorCode != commonpb.ErrorCode_SUCCESS { log.Printf("get time stamp failed, error code = %d, msg = %s", st.ErrorCode, st.Reason) - ip.Status = st + ip.stats[0] = st ip.wg.Done() break } ip.SetTs(ts[0]) wg := sync.WaitGroup{} - for _, mq := range ip.msgs { + for i, mq := range ip.msgs { mq := mq + i := i + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() mb, err := proto.Marshal(mq) if err != nil { log.Printf("Marshal ManipulationReqMsg failed, error = %v", err) - ip.Status = commonpb.Status{ + ip.stats[i] = commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: fmt.Sprintf("Marshal ManipulationReqMsg failed, error=%v", err), } @@ -135,7 +141,7 @@ func (s *proxyServer) restartManipulationRoutine(bufSize int) error { case pb.ReqType_kInsert: if _, err := readers[mq.ChannelId].Send(s.ctx, &pulsar.ProducerMessage{Payload: mb}); err != nil { log.Printf("post into puslar failed, error = %v", err) - 
ip.Status = commonpb.Status{ + ip.stats[i] = commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: fmt.Sprintf("Post into puslar failed, error=%v", err.Error()), } @@ -144,6 +150,10 @@ func (s *proxyServer) restartManipulationRoutine(bufSize int) error { case pb.ReqType_kDeleteEntityByID: if _, err = deleter.Send(s.ctx, &pulsar.ProducerMessage{Payload: mb}); err != nil { log.Printf("post into pulsar filed, error = %v", err) + ip.stats[i] = commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("Post into puslar failed, error=%v", err.Error()), + } return } default: diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 3a2faf34c0..38016452c4 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -156,6 +156,7 @@ func (s *proxyServer) DeleteByID(ctx context.Context, req *pb.DeleteByIDParam) ( } if len(mReqMsg.PrimaryKeys) > 1 { mReq := &manipulationReq{ + stats: make([]commonpb.Status, 1), msgs: append([]*pb.ManipulationReqMsg{}, &mReqMsg), proxy: s, } @@ -222,10 +223,10 @@ func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*ser // TODO: alloc manipulation request id mReq := manipulationReq{ - Status: commonpb.Status{}, - msgs: make([]*pb.ManipulationReqMsg, len(msgMap)), - wg: sync.WaitGroup{}, - proxy: s, + stats: make([]commonpb.Status, len(msgMap)), + msgs: make([]*pb.ManipulationReqMsg, len(msgMap)), + wg: sync.WaitGroup{}, + proxy: s, } for _, v := range msgMap { mReq.msgs = append(mReq.msgs, v)