diff --git a/docs/developer_guides/developer_guides.md b/docs/developer_guides/developer_guides.md index 7f76b28eb2..c99995e5e5 100644 --- a/docs/developer_guides/developer_guides.md +++ b/docs/developer_guides/developer_guides.md @@ -204,7 +204,7 @@ type Timestamp uint64 #### 4.2 Timestamp Oracle -``` +```go type timestampOracle struct { client *etcd.Client // client of a reliable meta service, i.e. etcd client rootPath string // this timestampOracle's working root path on the reliable kv service @@ -213,16 +213,29 @@ type timestampOracle struct { tso Timestamp // monotonically increasing timestamp } -func (tso *timestampOracle) GetTimestamp(count uint32) ([]Timestamp, Status) +func (tso *timestampOracle) GetTimestamp(count uint32) ([]Timestamp, error) -func (tso *timestampOracle) saveTimestamp() Status -func (tso *timestampOracle) loadTimestamp() Status +func (tso *timestampOracle) saveTimestamp() error +func (tso *timestampOracle) loadTimestamp() error ``` #### 4.2 Timestamp Allocator +```go +type TimestampAllocator struct { + Alloc(count uint32) ([]Timestamp, error) +} + +func (allocator *TimestampAllocator) Start() error +func (allocator *TimestampAllocator) Close() error + +func NewTimestampAllocator() *TimestampAllocator +``` + + + ###### 4.2.1 Batch Allocation of Timestamps ###### 4.2.2 Expiration of Timestamps @@ -277,10 +290,10 @@ type GlobalParamsTable struct { params memoryKV } -func (gparams *GlobalParamsTable) Save(key, value string) Status -func (gparams *GlobalParamsTable) Load(key string) (string, Status) -func (gparams *GlobalParamsTable) LoadRange(key, endKey string, limit int) ([]string, []string, Status) -func (gparams *GlobalParamsTable) Remove(key string) Status +func (gparams *GlobalParamsTable) Save(key, value string) error +func (gparams *GlobalParamsTable) Load(key string) (string, error) +func (gparams *GlobalParamsTable) LoadRange(key, endKey string, limit int) ([]string, []string, error) +func (gparams *GlobalParamsTable) Remove(key string) error ``` @@ -300,41 +313,46 @@ type TsMsg interface { Type() MsgType } -type TsMsgMarshaler interface { - Marshal(input *TsMsg) ([]byte, Status) - Unmarshal(input []byte) (*TsMsg, Status) -} - type MsgPack struct { BeginTs Timestamp EndTs Timestamp Msgs []*TsMsg } +type TsMsgMarshaler interface { + Marshal(input *TsMsg) ([]byte, error) + Unmarshal(input []byte) (*TsMsg, error) +} + type MsgStream interface { SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) - Produce(*MsgPack) Status + Produce(*MsgPack) error Consume() *MsgPack // message can be consumed exactly once } +type HashFunc func(*MsgPack) map[int32]*MsgPack + type PulsarMsgStream struct { client *pulsar.Client - produceChannels []string - consumeChannels []string - + msgHashFunc HashFunc // return a map from produceChannel idx to *MsgPack + producers []*pulsar.Producer + consumers []*pulsar.Consumer msgMarshaler *TsMsgMarshaler msgUnmarshaler *TsMsgMarshaler } +func (ms *PulsarMsgStream) SetProducerChannels(channels []string) +func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) -func (ms *PulsarMsgStream) Produce(*MsgPack) Status -func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick +func (ms *PulsarMsgStream) SetMsgHashFunc(hashFunc *HashFunc) +func (ms *PulsarMsgStream) Produce(msgs *MsgPack) error +func (ms *PulsarMsgStream) Consume() (*MsgPack, error) //return messages in one time tick type PulsarTtMsgStream struct { client *pulsar.Client - produceChannels []string - consumeChannels []string - + msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack + producers []*pulsar.Producer + consumers []*pulsar.Consumer msgMarshaler *TsMsgMarshaler msgUnmarshaler *TsMsgMarshaler inputBuf []*TsMsg @@ -342,9 +360,27 @@ type PulsarTtMsgStream struct { msgPacks []*MsgPack } -func (ms *PulsarTtMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) -func (ms *PulsarTtMsgStream) Produce(*MsgPack) Status -func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick +func (ms *PulsarMsgStream) SetProducerChannels(channels []string) +func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) +func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) +func (ms *PulsarMsgStream) SetMsgHashFunc(hashFunc *HashFunc) +func (ms *PulsarMsgStream) Produce(msgs *MsgPack) error +func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick +``` + + + +#### 5.4 ID Allocator + +```go +type IdAllocator struct { + Alloc(count uint32) ([]int64, error) +} + +func (allocator *IdAllocator) Start() error +func (allocator *IdAllocator) Close() error + +func NewIdAllocator() *IdAllocator ``` @@ -353,17 +389,60 @@ func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time ti ## 6. Proxy -#### 6.1 Overview +#### 6.1 Proxy Instance -#### 3.1 Task +```go +type Proxy struct { + servicepb.UnimplementedMilvusServiceServer + masterClient mpb.MasterClient + + timeTick *timeTick + ttStream *MessageStream + scheduler *taskScheduler + tsAllocator *TimestampAllocator + ReqIdAllocator *IdAllocator + RowIdAllocator *IdAllocator + SegIdAssigner *segIdAssigner +} + +func (proxy *Proxy) Start() error +func NewProxy(ctx context.Context) *Proxy +``` + + + +#### Global Parameter Table + +```go +type GlobalParamsTable struct { +} +func (*paramTable GlobalParamsTable) ProxyId() int64 +func (*paramTable GlobalParamsTable) ProxyAddress() string +func (*paramTable GlobalParamsTable) MasterAddress() string +func (*paramTable GlobalParamsTable) PulsarAddress() string +func (*paramTable GlobalParamsTable) TimeTickTopic() string +func (*paramTable GlobalParamsTable) InsertTopics() []string +func (*paramTable GlobalParamsTable) QueryTopic() string +func (*paramTable GlobalParamsTable) QueryResultTopics() []string +func (*paramTable GlobalParamsTable) Init() error + +var ProxyParamTable GlobalParamsTable +``` + + + + + +#### 6.2 Task ``` go type task interface { - PreExecute() Status - Execute() Status - PostExecute() Status - WaitToFinish() Status - Notify() Status + Id() int64 // return ReqId + PreExecute() error + Execute() error + PostExecute() error + WaitToFinish() error + Notify() error } ``` @@ -377,11 +456,11 @@ type baseTask struct { ProxyId int64 } -func (task *baseTask) PreExecute() Status -func (task *baseTask) Execute() Status -func (task *baseTask) PostExecute() Status -func (task *baseTask) WaitToFinish() Status -func (task *baseTask) Notify() Status +func (task *baseTask) PreExecute() error +func (task *baseTask) Execute() error +func (task *baseTask) PostExecute() error +func (task *baseTask) WaitToFinish() error +func (task *baseTask) Notify() error ``` * Insert Task @@ -396,34 +475,121 @@ type insertTask struct { rowBatch *RowBatch } -func (task *InsertTask) Execute() Status -func (task *InsertTask) WaitToFinish() Status -func (task *InsertTask) Notify() Status +func (task *InsertTask) Execute() error +func (task *InsertTask) WaitToFinish() error +func (task *InsertTask) Notify() error ``` #### 6.2 Task Scheduler +* Base Task Queue + +```go +type baseTaskQueue struct { + unissuedTasks *List + activeTasks map[int64]*task + utLock sync.Mutex // lock for UnissuedTasks + atLock sync.Mutex // lock for ActiveTasks +} +func (queue *baseTaskQueue) AddUnissuedTask(task *task) +func (queue *baseTaskQueue) FrontUnissuedTask() *task +func (queue *baseTaskQueue) PopUnissuedTask(id int64) *task +func (queue *baseTaskQueue) AddActiveTask(task *task) +func (queue *baseTaskQueue) PopActiveTask(id int64) *task +func (queue *baseTaskQueue) TaskDoneTest(ts Timestamp) bool +``` + +*AddUnissuedTask(task \*task)* will put a new task into *unissuedTasks*, while maintaining the list by timestamp order. + +*TaskDoneTest(ts Timestamp)* will check both *unissuedTasks* and *unissuedTasks*. If no task found before *ts*, then the function returns *true*, indicates that all the tasks before *ts* are completed. + + + +* Data Definition Task Queue + +```go +type ddTaskQueue struct { + baseTaskQueue + lock sync.Mutex +} +func (queue *ddTaskQueue) Enqueue(task *task) error + +func newDdTaskQueue() *ddTaskQueue +``` + +Data definition tasks (i.e. *CreateCollectionTask*) will be put into *DdTaskQueue*. If a task is enqueued, *Enqueue(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, then push it into *queue*. The timestamps of the enqueued tasks should be strictly monotonically increasing. As *Enqueue(task \*task)* will be called in parallel, setting timestamp and queue insertion need to be done atomically. + + + +* Data Manipulation Task Queue + +```go +type dmTaskQueue struct { + baseTaskQueue +} +func (queue *dmTaskQueue) Enqueue(task *task) error + +func newDmTaskQueue() *dmTaskQueue +``` + +Insert tasks and delete tasks will be put into *DmTaskQueue*. + +If a *insertTask* is enqueued, *Enqueue(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, *SegIdAssigner*, *RowIdAllocator*, then push it into *queue*. The *SegIdAssigner* and *RowIdAllocator* will later be used in the task's execution phase. + + + +* Data Query Task Queue + +```go +type dqTaskQueue struct { + baseTaskQueue +} +func (queue *dqTaskQueue) Enqueue(task *task) error + +func newDqTaskQueue() *dqTaskQueue +``` + +Queries will be put into *DqTaskQueue*. + + + +* Task Scheduler + ``` go type taskScheduler struct { - // definition tasks - ddTasks *task chan - // manipulation tasks - dmTasks *task chan - // query tasks - dqTasks *task chan + DdQueue *ddTaskQueue + DmQueue *dmTaskQueue + DqQueue *dqTaskQueue tsAllocator *TimestampAllocator ReqIdAllocator *IdAllocator } -func (sched *taskScheduler) EnqueueDDTask(task *task) Status -func (sched *taskScheduler) EnqueueDMTask(task *task) Status -func (sched *taskScheduler) EnqueueDQTask(task *task) Status +func (sched *taskScheduler) scheduleDdTask() *task +func (sched *taskScheduler) scheduleDmTask() *task +func (sched *taskScheduler) scheduleDqTask() *task +func (sched *taskScheduler) Start() error func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool +func newTaskScheduler(ctx context.Context, tsAllocator *TimestampAllocator, ReqIdAllocator *IdAllocator) *taskScheduler +``` + +*scheduleDdTask()* selects tasks in a FIFO manner, thus time order is garanteed. + +The policy of *scheduleDmTask()* should target on throughput, not tasks' time order. Note that the time order of the tasks' execution will later be garanteed by the timestamp & time tick mechanism. + +The policy of *scheduleDqTask()* should target on throughput. It should also take visibility into consideration. For example, if an insert task and a query arrive in a same time tick and the query comes after insert, the query should be scheduled in the next tick thus the query can see the insert. + +*TaskDoneTest(ts Timestamp)* will check all the three task queues. If no task found before *ts*, then the function returns *true*, indicates that all the tasks before *ts* are completed. + + + +* Statistics + +```go // ActiveComponent interfaces func (sched *taskScheduler) Id() String func (sched *taskScheduler) Status() Status @@ -443,20 +609,40 @@ message taskSchedulerHeartbeat { } ``` -* EnqueueDMTask - If a insertTask is enqueued, *EnqueueDDTask(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, *SegIdAssigner*, *RowIdAllocator*, then push it into queue *dmTasks*. The *SegIdAssigner* and *RowIdAllocator* will later be used in the task's execution phase. #### 6.3 Time Tick +* Time Tick + ``` go type timeTick struct { lastTick Timestamp currentTick Timestamp + wallTick Timestamp + tickStep Timestamp + syncInterval Timestamp + + tsAllocator *TimestampAllocator + scheduler *taskScheduler + ttStream *MessageStream + + ctx context.Context } -func (tt *timeTick) tick() Status +func (tt *timeTick) Start() error +func (tt *timeTick) synchronize() error +func newTimeTick(ctx context.Context, tickStep Timestamp, syncInterval Timestamp, tsAllocator *TimestampAllocator, scheduler *taskScheduler, ttStream *MessageStream) *timeTick +``` + +*Start()* will enter a loop. On each *tickStep*, it tries to send a *TIME_TICK* typed *TsMsg* into *ttStream*. After each *syncInterval*, it sychronizes its *wallTick* with *tsAllocator* by calling *synchronize()*. When *currentTick + tickStep < wallTick* holds, it will update *currentTick* with *wallTick* on next tick. Otherwise, it will update *currentTick* with *currentTick + tickStep*. + + + +* Statistics + +```go // ActiveComponent interfaces func (tt *timeTick) ID() String func (tt *timeTick) Status() Status @@ -473,74 +659,7 @@ message TimeTickHeartbeat { -## 7. Message Stream -#### 7.1 Overview - - - -#### 7.2 Message Stream - -``` go -type TsMsg interface { - SetTs(ts Timestamp) - Ts() Timestamp -} - -type MsgPack struct { - BeginTs Timestamp - EndTs Timestamp - Msgs []*TsMsg -} - -type TsMsgMarshaler interface { - Marshal(input *TsMsg) ([]byte, Status) - Unmarshal(input []byte) (*TsMsg, Status) -} - -type MsgStream interface { - SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) - Produce(*MsgPack) Status - Consume() *MsgPack // message can be consumed exactly once -} - -type HashFunc func(*MsgPack) map[int32]*MsgPack - -type PulsarMsgStream struct { - client *pulsar.Client - msgHashFunc HashFunc // return a map from produceChannel idx to *MsgPack - producers []*pulsar.Producer - consumers []*pulsar.Consumer - msgMarshaler *TsMsgMarshaler - msgUnmarshaler *TsMsgMarshaler -} - -func (ms *PulsarMsgStream) SetProducerChannels(channels []string) -func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) -func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) -func (ms *PulsarMsgStream) SetMsgHashFunc(hashFunc *HashFunc) -func (ms *PulsarMsgStream) Produce(msgs *MsgPack) Status -func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick - -type PulsarTtMsgStream struct { - client *pulsar.Client - msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack - producers []*pulsar.Producer - consumers []*pulsar.Consumer - msgMarshaler *TsMsgMarshaler - msgUnmarshaler *TsMsgMarshaler - inputBuf []*TsMsg - unsolvedBuf []*TsMsg - msgPacks []*MsgPack -} - -func (ms *PulsarMsgStream) SetProducerChannels(channels []string) -func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) -func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) -func (ms *PulsarMsgStream) SetMsgHashFunc(hashFunc *HashFunc) -func (ms *PulsarMsgStream) Produce(msgs *MsgPack) Status -func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick -``` @@ -602,13 +721,13 @@ type Segment struct { type ManipulationReqUnmarshaler struct {} // implementations of MsgUnmarshaler interfaces -func (unmarshaler *InsertMsgUnmarshaler) Unmarshal(input *pulsar.Message) (*TsMsg, Status) +func (unmarshaler *InsertMsgUnmarshaler) Unmarshal(input *pulsar.Message) (*TsMsg, error) type QueryReqUnmarshaler struct {} // implementations of MsgUnmarshaler interfaces -func (unmarshaler *QueryReqUnmarshaler) Unmarshal(input *pulsar.Message) (*TsMsg, Status) +func (unmarshaler *QueryReqUnmarshaler) Unmarshal(input *pulsar.Message) (*TsMsg, error) ``` @@ -695,9 +814,9 @@ Master receives data definition requests via grpc. Each request (described by a type task interface { Type() ReqType Ts() Timestamp - Execute() Status - WaitToFinish() Status - Notify() Status + Execute() error + WaitToFinish() error + Notify() error } ``` @@ -712,9 +831,9 @@ type createCollectionTask struct { // Task interfaces func (task *createCollectionTask) Type() ReqType func (task *createCollectionTask) Ts() Timestamp -func (task *createCollectionTask) Execute() Status -func (task *createCollectionTask) Notify() Status -func (task *createCollectionTask) WaitToFinish() Status +func (task *createCollectionTask) Execute() error +func (task *createCollectionTask) Notify() error +func (task *createCollectionTask) WaitToFinish() error ``` @@ -726,7 +845,7 @@ type ddRequestScheduler struct { reqQueue *task chan } -func (rs *ddRequestScheduler) Enqueue(task *task) Status +func (rs *ddRequestScheduler) Enqueue(task *task) error func (rs *ddRequestScheduler) schedule() *task // implement scheduling policy ``` diff --git a/internal/master/kv/etcd_kv.go b/internal/kv/etcd_kv.go similarity index 86% rename from internal/master/kv/etcd_kv.go rename to internal/kv/etcd_kv.go index 67148f4571..baa0b275fc 100644 --- a/internal/master/kv/etcd_kv.go +++ b/internal/kv/etcd_kv.go @@ -5,9 +5,9 @@ import ( "path" "time" + "github.com/pingcap/log" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/util/etcdutil" - "github.com/pingcap/log" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -21,24 +21,24 @@ var ( errTxnFailed = errors.New("failed to commit transaction") ) -type EtcdKVBase struct { +type EtcdKV struct { client *clientv3.Client rootPath string } -// NewEtcdKVBase creates a new etcd kv. -func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase { - return &EtcdKVBase{ +// NewEtcdKV creates a new etcd kv. +func NewEtcdKV(client *clientv3.Client, rootPath string) *EtcdKV { + return &EtcdKV{ client: client, rootPath: rootPath, } } -func (kv *EtcdKVBase) Close() { +func (kv *EtcdKV) Close() { kv.client.Close() } -func (kv *EtcdKVBase) LoadWithPrefix(key string) ([]string, []string) { +func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string) { key = path.Join(kv.rootPath, key) println("in loadWithPrefix,", key) resp, err := etcdutil.EtcdKVGet(kv.client, key, clientv3.WithPrefix()) @@ -61,7 +61,7 @@ func (kv *EtcdKVBase) LoadWithPrefix(key string) ([]string, []string) { return keys, values } -func (kv *EtcdKVBase) Load(key string) (string, error) { +func (kv *EtcdKV) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) resp, err := etcdutil.EtcdKVGet(kv.client, key) @@ -76,7 +76,7 @@ func (kv *EtcdKVBase) Load(key string) (string, error) { return string(resp.Kvs[0].Value), nil } -func (kv *EtcdKVBase) Save(key, value string) error { +func (kv *EtcdKV) Save(key, value string) error { key = path.Join(kv.rootPath, key) txn := NewSlowLogTxn(kv.client) @@ -91,7 +91,7 @@ func (kv *EtcdKVBase) Save(key, value string) error { return nil } -func (kv *EtcdKVBase) Remove(key string) error { +func (kv *EtcdKV) Remove(key string) error { key = path.Join(kv.rootPath, key) txn := NewSlowLogTxn(kv.client) @@ -106,13 +106,13 @@ func (kv *EtcdKVBase) Remove(key string) error { return nil } -func (kv *EtcdKVBase) Watch(key string) clientv3.WatchChan { +func (kv *EtcdKV) Watch(key string) clientv3.WatchChan { key = path.Join(kv.rootPath, key) rch := kv.client.Watch(context.Background(), key) return rch } -func (kv *EtcdKVBase) WatchWithPrefix(key string) clientv3.WatchChan { +func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan { key = path.Join(kv.rootPath, key) rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix()) return rch diff --git a/internal/master/kv/kv.go b/internal/kv/kv.go similarity index 100% rename from internal/master/kv/kv.go rename to internal/kv/kv.go diff --git a/internal/master/controller/collection.go b/internal/master/controller/collection.go index 15c0504694..63f38c5df9 100644 --- a/internal/master/controller/collection.go +++ b/internal/master/controller/collection.go @@ -9,7 +9,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/master/collection" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/master/id" - "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/segment" ) diff --git a/internal/master/controller/segment.go b/internal/master/controller/segment.go index 65fcab0fdd..e3f2655c96 100644 --- a/internal/master/controller/segment.go +++ b/internal/master/controller/segment.go @@ -10,7 +10,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/master/collection" "github.com/zilliztech/milvus-distributed/internal/master/id" //"github.com/zilliztech/milvus-distributed/internal/master/informer" - "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/segment" ) diff --git a/internal/master/controller/segment_test.go b/internal/master/controller/segment_test.go index 7c9fd1ed19..7c87cb5b28 100644 --- a/internal/master/controller/segment_test.go +++ b/internal/master/controller/segment_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/conf" - "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "go.etcd.io/etcd/clientv3" ) @@ -19,7 +19,7 @@ func newKvBase() kv.Base { Endpoints: []string{etcdAddr}, DialTimeout: 5 * time.Second, }) - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + kvbase := kv.NewEtcdKV(cli, conf.Config.Etcd.Rootpath) return kvbase } diff --git a/internal/master/master.go b/internal/master/master.go index b09c2e9b59..c70f844be4 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -18,7 +18,7 @@ import ( "fmt" "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/master/informer" - "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/master/controller" @@ -80,7 +80,7 @@ func newKvBase() kv.Base { DialTimeout: 5 * time.Second, }) // defer cli.Close() - kvBase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + kvBase := kv.NewEtcdKV(cli, conf.Config.Etcd.Rootpath) return kvBase } diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 41cb4faefb..2d065b5d98 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -2,7 +2,7 @@ package master import ( "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" ) diff --git a/internal/master/task.go b/internal/master/task.go index 825d07122b..c0e54bb1d7 100644 --- a/internal/master/task.go +++ b/internal/master/task.go @@ -2,7 +2,7 @@ package master import ( "context" - "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" ) diff --git a/internal/reader/meta.go b/internal/reader/meta.go index c0d8c00892..931079db8e 100644 --- a/internal/reader/meta.go +++ b/internal/reader/meta.go @@ -12,7 +12,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/master/collection" - "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/segment" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" @@ -290,7 +290,7 @@ func (node *QueryNode) InitFromMeta() error { DialTimeout: 5 * time.Second, }) //defer cli.Close() - node.kvBase = kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + node.kvBase = kv.NewEtcdKV(cli, conf.Config.Etcd.Rootpath) node.loadCollections() node.loadSegments() return nil diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index f2f741ba98..65b1510473 100644 --- a/internal/reader/query_node.go +++ b/internal/reader/query_node.go @@ -15,10 +15,11 @@ import "C" import ( "context" - "github.com/zilliztech/milvus-distributed/internal/master/kv" + "time" + + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/msgclient" msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message" - "time" ) type InsertData struct { @@ -90,7 +91,7 @@ type QueryNode struct { deletePreprocessData DeletePreprocessData deleteData DeleteData insertData InsertData - kvBase *kv.EtcdKVBase + kvBase *kv.EtcdKV msgCounter *MsgCounter InsertLogs []InsertLog }