diff --git a/Makefile b/Makefile index 1f2d9ae7ce..c2081907a1 100644 --- a/Makefile +++ b/Makefile @@ -69,6 +69,8 @@ build-go: @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxy $(PWD)/cmd/proxy/proxy.go 1>/dev/null @echo "Building query node ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null + @echo "Building indexbuilder ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexbuilder $(PWD)/cmd/indexbuilder/indexbuilder.go 1>/dev/null build-cpp: @(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)") diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 41aae39fa5..6e2cd2bcdb 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -50,4 +50,4 @@ queryNode: indexBuilder: address: localhost - port: 310310 + port: 31000 diff --git a/internal/indexbuilder/client/client.go b/internal/indexbuilder/client/client.go index fb3e020198..fc50b2260e 100644 --- a/internal/indexbuilder/client/client.go +++ b/internal/indexbuilder/client/client.go @@ -8,7 +8,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -18,20 +17,26 @@ type Client struct { client indexbuilderpb.IndexBuildServiceClient } -type IndexStatus int32 - type IndexDescription struct { ID UniqueID - Status IndexStatus + Status indexbuilderpb.IndexStatus EnqueueTime time.Time ScheduleTime time.Time BuildCompleteTime time.Time } -func NewBuildIndexClient(conn *grpc.ClientConn) *Client { +func NewBuildIndexClient(ctx context.Context, address string) (*Client, error) { + conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + return nil, err + } return &Client{ client: indexbuilderpb.NewIndexBuildServiceClient(conn), - } + }, nil +} + +func parseTS(t int64) time.Time { + return time.Unix(0, t) } func (c *Client) BuildIndexWithoutID(columnDataPaths []string, typeParams map[string]string, indexParams map[string]string) (UniqueID, error) { @@ -66,36 +71,24 @@ func (c *Client) BuildIndexWithoutID(columnDataPaths []string, typeParams map[st return indexID, err } -func (c *Client) DescribeIndex(indexID UniqueID) (IndexDescription, error) { +func (c *Client) DescribeIndex(indexID UniqueID) (*IndexDescription, error) { ctx := context.TODO() request := &indexbuilderpb.DescribleIndexRequest{ IndexID: indexID, } response, err := c.client.DescribeIndex(ctx, request) if err != nil { - return IndexDescription{}, err + return &IndexDescription{}, err } - enqueueTime, _ := tsoutil.ParseTS(response.EnqueTime) - scheduleTime, _ := tsoutil.ParseTS(response.ScheduleTime) - buildCompleteTime, _ := tsoutil.ParseTS(response.BuildCompleteTime) indexDescription := IndexDescription{ ID: indexID, - Status: IndexStatus(response.IndexStatus), - EnqueueTime: enqueueTime, - ScheduleTime: scheduleTime, - BuildCompleteTime: buildCompleteTime, + Status: response.IndexStatus, + EnqueueTime: parseTS(response.EnqueTime), + ScheduleTime: parseTS(response.ScheduleTime), + BuildCompleteTime: parseTS(response.BuildCompleteTime), } - - //indexDescription := IndexDescription{ - // ID: indexID, - // Status: IndexStatus(response.IndexStatus), - // EnqueueTime: time.Unix(0, response.EnqueTime), - // ScheduleTime: time.Unix(-, response.ScheduleTime), - // BuildCompleteTime: time.Unix(0, response.BuildCompleteTime), - //} - - return indexDescription, nil + return &indexDescription, nil } func (c *Client) GetIndexFilePaths(indexID UniqueID) ([]string, error) { diff --git a/internal/indexbuilder/client/client_test.go b/internal/indexbuilder/client/client_test.go deleted file mode 100644 index fae0d7321f..0000000000 --- a/internal/indexbuilder/client/client_test.go +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package indexbuilderclient diff --git a/internal/indexbuilder/grpc_service.go b/internal/indexbuilder/grpc_service.go index a16934239a..2f8f10de5a 100644 --- a/internal/indexbuilder/grpc_service.go +++ b/internal/indexbuilder/grpc_service.go @@ -2,8 +2,10 @@ package indexbuilder import ( "context" + "errors" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" ) @@ -12,13 +14,68 @@ const ( ) func (b *Builder) BuildIndex(ctx context.Context, request *indexbuilderpb.BuildIndexRequest) (*indexbuilderpb.BuildIndexResponse, error) { - panic("implement me") + t := NewIndexAddTask() + t.req = request + t.idAllocator = b.idAllocator + t.buildQueue = b.sched.IndexBuildQueue + t.table = b.metaTable + var cancel func() + t.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) + defer cancel() + + fn := func() error { + select { + case <-ctx.Done(): + return errors.New("insert timeout") + default: + return b.sched.IndexAddQueue.Enqueue(t) + } + } + ret := &indexbuilderpb.BuildIndexResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + } + + err := fn() + if err != nil { + ret.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + ret.Status.Reason = err.Error() + return ret, nil + } + + err = t.WaitToFinish() + if err != nil { + ret.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + ret.Status.Reason = err.Error() + return ret, nil + } + ret.IndexID = t.indexID + return ret, nil } func (b *Builder) DescribeIndex(ctx context.Context, request *indexbuilderpb.DescribleIndexRequest) (*indexbuilderpb.DescribleIndexResponse, error) { - panic("implement me") + indexID := request.IndexID + ret, err := b.metaTable.GetIndexDescription(indexID) + ret.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} + ret.IndexID = indexID + if err != nil { + ret.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + ret.Status.Reason = err.Error() + } + return ret, nil } func (b *Builder) GetIndexFilePaths(ctx context.Context, request *indexbuilderpb.GetIndexFilePathsRequest) (*indexbuilderpb.GetIndexFilePathsResponse, error) { - panic("implement me") + ret := &indexbuilderpb.GetIndexFilePathsResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, + IndexID: request.IndexID, + } + filePaths, err := b.metaTable.GetIndexFilePaths(request.IndexID) + if err != nil { + ret.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + ret.Status.Reason = err.Error() + } + ret.IndexFilePaths = filePaths + return ret, nil } diff --git a/internal/indexbuilder/indexbuilder.go b/internal/indexbuilder/indexbuilder.go index 1743766e1f..4fef888728 100644 --- a/internal/indexbuilder/indexbuilder.go +++ b/internal/indexbuilder/indexbuilder.go @@ -2,8 +2,6 @@ package indexbuilder import ( "context" - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - "go.etcd.io/etcd/clientv3" "log" "math/rand" "net" @@ -11,13 +9,17 @@ import ( "sync" "time" + "go.etcd.io/etcd/clientv3" + "github.com/zilliztech/milvus-distributed/internal/allocator" + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "google.golang.org/grpc" ) type UniqueID = typeutil.UniqueID +type Timestamp = typeutil.Timestamp type Builder struct { loopCtx context.Context @@ -66,7 +68,7 @@ func CreateBuilder(ctx context.Context) (*Builder, error) { } b.idAllocator = idAllocator - b.sched, err = NewTaskScheduler(b.loopCtx, b.idAllocator) + b.sched, err = NewTaskScheduler(b.loopCtx, b.idAllocator, b.metaTable) if err != nil { return nil, err } @@ -88,6 +90,8 @@ func (b *Builder) startBuilder() error { cb() } + b.idAllocator.Start() + b.loopWg.Add(1) go b.grpcLoop() @@ -121,12 +125,13 @@ func (b *Builder) Start() error { func (b *Builder) stopBuilderLoop() { b.loopCancel() + b.idAllocator.Close() + if b.grpcServer != nil { b.grpcServer.GracefulStop() } b.sched.Close() - b.loopWg.Wait() } diff --git a/internal/indexbuilder/indexbuilder_test.go b/internal/indexbuilder/indexbuilder_test.go index d97e61ac54..6e72275428 100644 --- a/internal/indexbuilder/indexbuilder_test.go +++ b/internal/indexbuilder/indexbuilder_test.go @@ -2,28 +2,75 @@ package indexbuilder import ( "context" - "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" + "fmt" "log" "os" + "strconv" "testing" + "time" + + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" - "google.golang.org/grpc" + + "github.com/stretchr/testify/assert" + indexbuilderclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client" + "github.com/zilliztech/milvus-distributed/internal/master" + "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" ) var ctx context.Context var cancel func() -var clientConn *grpc.ClientConn -var buildClient indexbuilderpb.IndexBuildServiceClient +var buildClient *indexbuilderclient.Client var builderServer *Builder -var testNum = 10 +var masterPort = 53101 +var masterServer *master.Master + +func makeMasterAddress(port int64) string { + masterAddr := "127.0.0.1:" + strconv.FormatInt(port, 10) + return masterAddr +} + +func refreshMasterAddress() { + masterAddr := makeMasterAddress(int64(masterPort)) + Params.MasterAddress = masterAddr + master.Params.Port = masterPort +} + +func startMaster(ctx context.Context) { + master.Init() + refreshMasterAddress() + etcdAddr := master.Params.EtcdAddress + metaRootPath := master.Params.MetaRootPath + + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + if err != nil { + panic(err) + } + _, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix()) + if err != nil { + panic(err) + } + + svr, err := master.CreateServer(ctx) + masterServer = svr + if err != nil { + log.Print("create server failed", zap.Error(err)) + } + if err := svr.Run(int64(master.Params.Port)); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + fmt.Println("Waiting for server!", svr.IsServing()) + +} func startBuilder(ctx context.Context) { - - builderServer, err := CreateBuilder(ctx) + var err error + builderServer, err = CreateBuilder(ctx) if err != nil { log.Print("create builder failed", zap.Error(err)) } @@ -37,21 +84,21 @@ func startBuilder(ctx context.Context) { func setup() { Params.Init() ctx, cancel = context.WithCancel(context.Background()) - + startMaster(ctx) startBuilder(ctx) addr := Params.Address - conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock()) + var err error + buildClient, err = indexbuilderclient.NewBuildIndexClient(ctx, addr) if err != nil { - log.Fatalf("Connect to builder server failed, error= %v", err) + panic("Create buildClient Failed!") } - clientConn = conn - buildClient = indexbuilderpb.NewIndexBuildServiceClient(clientConn) } func shutdown() { cancel() builderServer.Close() + masterServer.Close() } func TestMain(m *testing.M) { @@ -60,3 +107,27 @@ func TestMain(m *testing.M) { shutdown() os.Exit(code) } + +func TestBuilder_GRPC(t *testing.T) { + typeParams := make(map[string]string) + typeParams["a"] = "1" + indexParams := make(map[string]string) + indexParams["b"] = "2" + columnDataPaths := []string{"dataA", "dataB"} + indexID, err := buildClient.BuildIndexWithoutID(columnDataPaths, typeParams, indexParams) + assert.Nil(t, err) + + select { + case <-time.After(time.Second * 3): + } + + description, err := buildClient.DescribeIndex(indexID) + assert.Nil(t, err) + assert.Equal(t, indexbuilderpb.IndexStatus_FINISHED, description.Status) + assert.Equal(t, indexID, description.ID) + + indexDataPaths, err := buildClient.GetIndexFilePaths(indexID) + assert.Nil(t, err) + assert.NotNil(t, indexDataPaths) + +} diff --git a/internal/indexbuilder/meta_table.go b/internal/indexbuilder/meta_table.go index d0738e9a44..38ed76125c 100644 --- a/internal/indexbuilder/meta_table.go +++ b/internal/indexbuilder/meta_table.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -46,7 +47,6 @@ func (mt *metaTable) reloadFromKV() error { } mt.indexID2Meta[indexMeta.IndexID] = indexMeta } - return nil } @@ -59,29 +59,96 @@ func (mt *metaTable) saveIndexMeta(meta *pb.IndexMeta) error { return mt.client.Save("/indexes/"+strconv.FormatInt(meta.IndexID, 10), value) } -func (mt *metaTable) AddIndex(meta *pb.IndexMeta) error { +func (mt *metaTable) AddIndex(indexID UniqueID, req *pb.BuildIndexRequest) error { mt.lock.Lock() defer mt.lock.Unlock() - - return nil -} - -func (mt *metaTable) UpdateIndex(meta *pb.IndexMeta) error { - mt.lock.Lock() - defer mt.lock.Unlock() - - return nil -} - -func (mt *metaTable) GetIndexByID(indexID UniqueID) (*pb.IndexMeta, error) { - mt.lock.RLock() - defer mt.lock.RUnlock() - - sm, ok := mt.indexID2Meta[indexID] - if !ok { - return nil, errors.Errorf("can't find index id = %d", indexID) + _, ok := mt.indexID2Meta[indexID] + if ok { + return errors.Errorf("index already exists with ID = " + strconv.FormatInt(indexID, 10)) } - return &sm, nil + meta := &pb.IndexMeta{ + Status: pb.IndexStatus_UNISSUED, + IndexID: indexID, + Req: req, + } + mt.saveIndexMeta(meta) + return nil +} + +func (mt *metaTable) UpdateIndexStatus(indexID UniqueID, status pb.IndexStatus) error { + mt.lock.Lock() + defer mt.lock.Unlock() + meta, ok := mt.indexID2Meta[indexID] + if !ok { + return errors.Errorf("index not exists with ID = " + strconv.FormatInt(indexID, 10)) + } + meta.Status = status + mt.saveIndexMeta(&meta) + return nil +} + +func (mt *metaTable) UpdateIndexEnqueTime(indexID UniqueID, t time.Time) error { + mt.lock.Lock() + defer mt.lock.Unlock() + meta, ok := mt.indexID2Meta[indexID] + if !ok { + return errors.Errorf("index not exists with ID = " + strconv.FormatInt(indexID, 10)) + } + meta.EnqueTime = t.UnixNano() + mt.saveIndexMeta(&meta) + return nil +} + +func (mt *metaTable) UpdateIndexScheduleTime(indexID UniqueID, t time.Time) error { + mt.lock.Lock() + defer mt.lock.Unlock() + meta, ok := mt.indexID2Meta[indexID] + if !ok { + return errors.Errorf("index not exists with ID = " + strconv.FormatInt(indexID, 10)) + } + meta.ScheduleTime = t.UnixNano() + mt.saveIndexMeta(&meta) + return nil +} + +func (mt *metaTable) CompleteIndex(indexID UniqueID, dataPaths []string) error { + mt.lock.Lock() + defer mt.lock.Unlock() + meta, ok := mt.indexID2Meta[indexID] + if !ok { + return errors.Errorf("index not exists with ID = " + strconv.FormatInt(indexID, 10)) + } + meta.Status = pb.IndexStatus_FINISHED + meta.IndexFilePaths = dataPaths + meta.BuildCompleteTime = time.Now().UnixNano() + mt.saveIndexMeta(&meta) + return nil +} + +func (mt *metaTable) GetIndexDescription(indexID UniqueID) (*pb.DescribleIndexResponse, error) { + mt.lock.Lock() + defer mt.lock.Unlock() + ret := &pb.DescribleIndexResponse{} + meta, ok := mt.indexID2Meta[indexID] + if !ok { + return ret, errors.Errorf("index not exists with ID = " + strconv.FormatInt(indexID, 10)) + } + ret.IndexStatus = meta.Status + ret.EnqueTime = meta.EnqueTime + ret.BuildCompleteTime = meta.BuildCompleteTime + ret.ScheduleTime = meta.ScheduleTime + return ret, nil +} + +func (mt *metaTable) GetIndexFilePaths(indexID UniqueID) ([]string, error) { + mt.lock.Lock() + defer mt.lock.Unlock() + + meta, ok := mt.indexID2Meta[indexID] + if !ok { + return nil, errors.Errorf("index not exists with ID = " + strconv.FormatInt(indexID, 10)) + } + return meta.IndexFilePaths, nil } func (mt *metaTable) DeleteIndex(indexID UniqueID) error { diff --git a/internal/indexbuilder/paramtable.go b/internal/indexbuilder/paramtable.go index d5e40a8e17..45ed54f818 100644 --- a/internal/indexbuilder/paramtable.go +++ b/internal/indexbuilder/paramtable.go @@ -25,6 +25,9 @@ func (pt *ParamTable) Init() { pt.BaseTable.Init() pt.initAddress() pt.initPort() + pt.initEtcdAddress() + pt.initMasterAddress() + pt.initMetaRootPath() } func (pt *ParamTable) initAddress() { diff --git a/internal/indexbuilder/paramtable_test.go b/internal/indexbuilder/paramtable_test.go index 1be77cf6f6..390a327d14 100644 --- a/internal/indexbuilder/paramtable_test.go +++ b/internal/indexbuilder/paramtable_test.go @@ -12,12 +12,12 @@ func TestParamTable_Init(t *testing.T) { func TestParamTable_Address(t *testing.T) { address := Params.Address - assert.Equal(t, address, "localhost") + assert.Equal(t, address, "localhost:31000") } func TestParamTable_Port(t *testing.T) { port := Params.Port - assert.Equal(t, port, 310310) + assert.Equal(t, port, 31000) } func TestParamTable_MetaRootPath(t *testing.T) { diff --git a/internal/indexbuilder/task.go b/internal/indexbuilder/task.go index 29c0814640..8509883a2d 100644 --- a/internal/indexbuilder/task.go +++ b/internal/indexbuilder/task.go @@ -2,8 +2,12 @@ package indexbuilder import ( "context" + "log" + "time" + "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" ) type task interface { @@ -14,12 +18,14 @@ type task interface { PostExecute() error WaitToFinish() error Notify(err error) + OnEnqueue() error } type BaseTask struct { - done chan error - ctx context.Context - id UniqueID + done chan error + ctx context.Context + id UniqueID + table *metaTable } func (bt *BaseTask) ID() UniqueID { @@ -31,13 +37,11 @@ func (bt *BaseTask) setID(id UniqueID) { } func (bt *BaseTask) WaitToFinish() error { - for { - select { - case <-bt.ctx.Done(): - return errors.New("timeout") - case err := <-bt.done: - return err - } + select { + case <-bt.ctx.Done(): + return errors.New("timeout") + case err := <-bt.done: + return err } } @@ -45,38 +49,104 @@ func (bt *BaseTask) Notify(err error) { bt.done <- err } -type IndexBuildTask struct { +type IndexAddTask struct { BaseTask - rowIDAllocator *allocator.IDAllocator + req *indexbuilderpb.BuildIndexRequest + indexID UniqueID + idAllocator *allocator.IDAllocator + buildQueue TaskQueue } -func (it *IndexBuildTask) PreExecute() error { +func (it *IndexAddTask) SetID(ID UniqueID) { + it.BaseTask.setID(ID) +} +func (it *IndexAddTask) OnEnqueue() error { + var err error + it.indexID, err = it.idAllocator.AllocOne() + if err != nil { + return err + } return nil } -func (it *IndexBuildTask) Execute() error { +func (it *IndexAddTask) PreExecute() error { + log.Println("pretend to check Index Req") + err := it.table.AddIndex(it.indexID, it.req) + if err != nil { + return err + } + return nil +} +func (it *IndexAddTask) Execute() error { + t := newIndexBuildTask() + t.table = it.table + t.indexID = it.indexID + var cancel func() + t.ctx, cancel = context.WithTimeout(it.ctx, reqTimeoutInterval) + defer cancel() + + fn := func() error { + select { + case <-t.ctx.Done(): + return errors.New("index add timeout") + default: + return it.buildQueue.Enqueue(t) + } + } + return fn() +} + +func (it *IndexAddTask) PostExecute() error { + return nil +} + +func NewIndexAddTask() *IndexAddTask { + return &IndexAddTask{ + BaseTask: BaseTask{ + done: make(chan error), + }, + } +} + +type IndexBuildTask struct { + BaseTask + indexID UniqueID + indexMeta *indexbuilderpb.IndexMeta +} + +func newIndexBuildTask() *IndexBuildTask { + return &IndexBuildTask{ + BaseTask: BaseTask{ + done: make(chan error, 1), // intend to do this + }, + } +} + +func (it *IndexBuildTask) SetID(ID UniqueID) { + it.BaseTask.setID(ID) +} + +func (it *IndexBuildTask) OnEnqueue() error { + return it.table.UpdateIndexEnqueTime(it.indexID, time.Now()) +} + +func (it *IndexBuildTask) PreExecute() error { + return it.table.UpdateIndexScheduleTime(it.indexID, time.Now()) +} + +func (it *IndexBuildTask) Execute() error { + err := it.table.UpdateIndexStatus(it.indexID, indexbuilderpb.IndexStatus_INPROGRESS) + if err != nil { + return err + } + time.Sleep(time.Second) + log.Println("Pretend to Execute for 1 second") return nil } func (it *IndexBuildTask) PostExecute() error { - return nil -} - -type DescribeIndexTask struct { - BaseTask - ctx context.Context -} - -func (dct *DescribeIndexTask) PreExecute() error { - return nil -} - -func (dct *DescribeIndexTask) Execute() error { - return nil -} - -func (dct *DescribeIndexTask) PostExecute() error { - return nil + dataPaths := []string{"file1", "file2"} + return it.table.CompleteIndex(it.indexID, dataPaths) } diff --git a/internal/indexbuilder/task_scheduler.go b/internal/indexbuilder/task_scheduler.go index 370ee5d7a8..7eaa3d0926 100644 --- a/internal/indexbuilder/task_scheduler.go +++ b/internal/indexbuilder/task_scheduler.go @@ -117,30 +117,30 @@ func (queue *BaseTaskQueue) Enqueue(t task) error { tID, _ := queue.sched.idAllocator.AllocOne() log.Printf("[Builder] allocate reqID: %v", tID) t.SetID(tID) + err := t.OnEnqueue() + if err != nil { + return err + } return queue.addUnissuedTask(t) } -type DdTaskQueue struct { +type IndexAddTaskQueue struct { BaseTaskQueue lock sync.Mutex } -type DescribleTaskQueue struct { - BaseTaskQueue -} - type IndexBuildTaskQueue struct { BaseTaskQueue } -func (queue *DdTaskQueue) Enqueue(t task) error { +func (queue *IndexAddTaskQueue) Enqueue(t task) error { queue.lock.Lock() defer queue.lock.Unlock() return queue.BaseTaskQueue.Enqueue(t) } -func NewDescribleTaskQueue(sched *TaskScheduler) *DescribleTaskQueue { - return &DescribleTaskQueue{ +func NewIndexAddTaskQueue(sched *TaskScheduler) *IndexAddTaskQueue { + return &IndexAddTaskQueue{ BaseTaskQueue: BaseTaskQueue{ unissuedTasks: list.New(), activeTasks: make(map[UniqueID]task), @@ -164,10 +164,11 @@ func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexBuildTaskQueue { } type TaskScheduler struct { - DescribeQueue TaskQueue + IndexAddQueue TaskQueue IndexBuildQueue TaskQueue idAllocator *allocator.IDAllocator + metaTable *metaTable wg sync.WaitGroup ctx context.Context @@ -175,21 +176,23 @@ type TaskScheduler struct { } func NewTaskScheduler(ctx context.Context, - idAllocator *allocator.IDAllocator) (*TaskScheduler, error) { + idAllocator *allocator.IDAllocator, + table *metaTable) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) s := &TaskScheduler{ idAllocator: idAllocator, + metaTable: table, ctx: ctx1, cancel: cancel, } - s.DescribeQueue = NewDescribleTaskQueue(s) + s.IndexAddQueue = NewIndexAddTaskQueue(s) s.IndexBuildQueue = NewIndexBuildTaskQueue(s) return s, nil } -func (sched *TaskScheduler) scheduleDescribleTask() task { - return sched.DescribeQueue.PopUnissuedTask() +func (sched *TaskScheduler) scheduleIndexAddTask() task { + return sched.IndexAddQueue.PopUnissuedTask() } func (sched *TaskScheduler) scheduleIndexBuildTask() task { @@ -225,7 +228,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { log.Printf("post execute task done ...") } -func (sched *TaskScheduler) indexBuildingLoop() { +func (sched *TaskScheduler) indexBuildLoop() { defer sched.wg.Done() for { select { @@ -240,28 +243,28 @@ func (sched *TaskScheduler) indexBuildingLoop() { } } -func (sched *TaskScheduler) describeLoop() { +func (sched *TaskScheduler) indexAddLoop() { defer sched.wg.Done() for { select { case <-sched.ctx.Done(): return - case <-sched.DescribeQueue.utChan(): - if !sched.DescribeQueue.utEmpty() { - t := sched.scheduleDescribleTask() - go sched.processTask(t, sched.DescribeQueue) + case <-sched.IndexAddQueue.utChan(): + if !sched.IndexAddQueue.utEmpty() { + t := sched.scheduleIndexAddTask() + go sched.processTask(t, sched.IndexAddQueue) } } } } func (sched *TaskScheduler) Start() error { - sched.wg.Add(1) - go sched.indexBuildingLoop() sched.wg.Add(1) - go sched.describeLoop() + go sched.indexAddLoop() + sched.wg.Add(1) + go sched.indexBuildLoop() return nil } diff --git a/internal/proto/index_builder.proto b/internal/proto/index_builder.proto index 6f711e4007..6e64a6bb26 100644 --- a/internal/proto/index_builder.proto +++ b/internal/proto/index_builder.proto @@ -42,17 +42,20 @@ message DescribleIndexResponse { common.Status status = 1; IndexStatus index_status =2; int64 indexID = 3; - repeated string index_file_paths=4; - uint64 enque_time = 5; - uint64 schedule_time = 6; - uint64 build_complete_time = 7; + int64 enque_time = 4; + int64 schedule_time = 5; + int64 build_complete_time = 6; } message IndexMeta { IndexStatus status =1; int64 indexID = 2; - BuildIndexRequest req = 3; + int64 enque_time = 3; + int64 schedule_time = 4; + int64 build_complete_time = 5; + BuildIndexRequest req = 6; + repeated string index_file_paths=7; } service IndexBuildService { diff --git a/internal/proto/indexbuilderpb/index_builder.pb.go b/internal/proto/indexbuilderpb/index_builder.pb.go index 18a860aded..a0940d57d8 100644 --- a/internal/proto/indexbuilderpb/index_builder.pb.go +++ b/internal/proto/indexbuilderpb/index_builder.pb.go @@ -295,10 +295,9 @@ type DescribleIndexResponse struct { Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` IndexStatus IndexStatus `protobuf:"varint,2,opt,name=index_status,json=indexStatus,proto3,enum=milvus.proto.service.IndexStatus" json:"index_status,omitempty"` IndexID int64 `protobuf:"varint,3,opt,name=indexID,proto3" json:"indexID,omitempty"` - IndexFilePaths []string `protobuf:"bytes,4,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"` - EnqueTime uint64 `protobuf:"varint,5,opt,name=enque_time,json=enqueTime,proto3" json:"enque_time,omitempty"` - ScheduleTime uint64 `protobuf:"varint,6,opt,name=schedule_time,json=scheduleTime,proto3" json:"schedule_time,omitempty"` - BuildCompleteTime uint64 `protobuf:"varint,7,opt,name=build_complete_time,json=buildCompleteTime,proto3" json:"build_complete_time,omitempty"` + EnqueTime int64 `protobuf:"varint,4,opt,name=enque_time,json=enqueTime,proto3" json:"enque_time,omitempty"` + ScheduleTime int64 `protobuf:"varint,5,opt,name=schedule_time,json=scheduleTime,proto3" json:"schedule_time,omitempty"` + BuildCompleteTime int64 `protobuf:"varint,6,opt,name=build_complete_time,json=buildCompleteTime,proto3" json:"build_complete_time,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -350,28 +349,21 @@ func (m *DescribleIndexResponse) GetIndexID() int64 { return 0 } -func (m *DescribleIndexResponse) GetIndexFilePaths() []string { - if m != nil { - return m.IndexFilePaths - } - return nil -} - -func (m *DescribleIndexResponse) GetEnqueTime() uint64 { +func (m *DescribleIndexResponse) GetEnqueTime() int64 { if m != nil { return m.EnqueTime } return 0 } -func (m *DescribleIndexResponse) GetScheduleTime() uint64 { +func (m *DescribleIndexResponse) GetScheduleTime() int64 { if m != nil { return m.ScheduleTime } return 0 } -func (m *DescribleIndexResponse) GetBuildCompleteTime() uint64 { +func (m *DescribleIndexResponse) GetBuildCompleteTime() int64 { if m != nil { return m.BuildCompleteTime } @@ -381,7 +373,11 @@ func (m *DescribleIndexResponse) GetBuildCompleteTime() uint64 { type IndexMeta struct { Status IndexStatus `protobuf:"varint,1,opt,name=status,proto3,enum=milvus.proto.service.IndexStatus" json:"status,omitempty"` IndexID int64 `protobuf:"varint,2,opt,name=indexID,proto3" json:"indexID,omitempty"` - Req *BuildIndexRequest `protobuf:"bytes,3,opt,name=req,proto3" json:"req,omitempty"` + EnqueTime int64 `protobuf:"varint,3,opt,name=enque_time,json=enqueTime,proto3" json:"enque_time,omitempty"` + ScheduleTime int64 `protobuf:"varint,4,opt,name=schedule_time,json=scheduleTime,proto3" json:"schedule_time,omitempty"` + BuildCompleteTime int64 `protobuf:"varint,5,opt,name=build_complete_time,json=buildCompleteTime,proto3" json:"build_complete_time,omitempty"` + Req *BuildIndexRequest `protobuf:"bytes,6,opt,name=req,proto3" json:"req,omitempty"` + IndexFilePaths []string `protobuf:"bytes,7,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -426,6 +422,27 @@ func (m *IndexMeta) GetIndexID() int64 { return 0 } +func (m *IndexMeta) GetEnqueTime() int64 { + if m != nil { + return m.EnqueTime + } + return 0 +} + +func (m *IndexMeta) GetScheduleTime() int64 { + if m != nil { + return m.ScheduleTime + } + return 0 +} + +func (m *IndexMeta) GetBuildCompleteTime() int64 { + if m != nil { + return m.BuildCompleteTime + } + return 0 +} + func (m *IndexMeta) GetReq() *BuildIndexRequest { if m != nil { return m.Req @@ -433,6 +450,13 @@ func (m *IndexMeta) GetReq() *BuildIndexRequest { return nil } +func (m *IndexMeta) GetIndexFilePaths() []string { + if m != nil { + return m.IndexFilePaths + } + return nil +} + func init() { proto.RegisterEnum("milvus.proto.service.IndexStatus", IndexStatus_name, IndexStatus_value) proto.RegisterType((*BuildIndexRequest)(nil), "milvus.proto.service.BuildIndexRequest") @@ -447,46 +471,47 @@ func init() { func init() { proto.RegisterFile("index_builder.proto", fileDescriptor_c1d6a79d693ba681) } var fileDescriptor_c1d6a79d693ba681 = []byte{ - // 611 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xcb, 0x6e, 0xd3, 0x4c, - 0x14, 0xae, 0xe3, 0xfc, 0x6d, 0x73, 0x92, 0x56, 0xc9, 0xf4, 0x07, 0x99, 0x20, 0xa4, 0x10, 0x16, - 0x58, 0x5c, 0x62, 0x91, 0xb2, 0xe9, 0xb6, 0x4d, 0xda, 0x5a, 0x88, 0x34, 0xb2, 0x29, 0x0b, 0x36, - 0x91, 0x2f, 0x07, 0x32, 0xd2, 0xf8, 0x52, 0xcf, 0xb8, 0xa2, 0x7d, 0x0e, 0x76, 0xbc, 0x0a, 0x6f, - 0xc0, 0x2b, 0xf0, 0x30, 0xc8, 0xe3, 0x49, 0x69, 0x5a, 0x57, 0x8d, 0x84, 0x58, 0xfa, 0xcc, 0x77, - 0x99, 0xf9, 0xce, 0x39, 0x09, 0xec, 0xd0, 0x38, 0xc4, 0xaf, 0x33, 0x3f, 0xa7, 0x2c, 0xc4, 0x6c, - 0x90, 0x66, 0x89, 0x48, 0xc8, 0xff, 0x11, 0x65, 0xe7, 0x39, 0x2f, 0xbf, 0x06, 0x1c, 0xb3, 0x73, - 0x1a, 0x60, 0xb7, 0x15, 0x24, 0x51, 0x94, 0xc4, 0x65, 0xb5, 0xff, 0x43, 0x83, 0xce, 0x7e, 0xc1, - 0xb2, 0x0b, 0x01, 0x07, 0xcf, 0x72, 0xe4, 0x82, 0x3c, 0x01, 0x08, 0x3d, 0xe1, 0xcd, 0x52, 0x4f, - 0xcc, 0xb9, 0x51, 0xeb, 0xe9, 0x66, 0xc3, 0x69, 0x14, 0x95, 0x69, 0x51, 0x20, 0xfb, 0xd0, 0x14, - 0x17, 0x29, 0xce, 0x52, 0x2f, 0xf3, 0x22, 0x6e, 0xe8, 0x3d, 0xdd, 0x6c, 0x0e, 0x9f, 0x0e, 0x96, - 0xec, 0x94, 0xcb, 0x3b, 0xbc, 0xf8, 0xe8, 0xb1, 0x1c, 0xa7, 0x1e, 0xcd, 0x1c, 0x28, 0x58, 0x53, - 0x49, 0x22, 0x23, 0x68, 0x95, 0x77, 0x56, 0x22, 0xf5, 0x55, 0x45, 0x9a, 0x92, 0x56, 0xaa, 0xf4, - 0x03, 0x20, 0xd7, 0x6f, 0xcf, 0xd3, 0x24, 0xe6, 0x48, 0x76, 0x61, 0x9d, 0x0b, 0x4f, 0xe4, 0xdc, - 0xd0, 0x7a, 0x9a, 0xd9, 0x1c, 0x3e, 0xae, 0x54, 0x75, 0x25, 0xc4, 0x51, 0x50, 0x62, 0xc0, 0x86, - 0x54, 0xb6, 0x47, 0x46, 0xad, 0xa7, 0x99, 0xba, 0xb3, 0xf8, 0xec, 0xbf, 0x05, 0xe3, 0x08, 0x85, - 0xb4, 0x38, 0xa4, 0x0c, 0x65, 0x06, 0x8b, 0xa4, 0xae, 0xb1, 0xb4, 0x65, 0xd6, 0x37, 0x0d, 0x1e, - 0x55, 0xd0, 0xfe, 0xc9, 0x15, 0x89, 0x09, 0xed, 0x32, 0xcd, 0xcf, 0x94, 0xa1, 0x6a, 0x9b, 0x2e, - 0xdb, 0xb6, 0x4d, 0x97, 0x2e, 0xd0, 0x7f, 0x03, 0x0f, 0x46, 0xc8, 0x83, 0x8c, 0xfa, 0x0c, 0x97, - 0x7a, 0x7e, 0xf7, 0x4b, 0x7e, 0xd6, 0xe0, 0xe1, 0x4d, 0xce, 0xdf, 0x3c, 0xe3, 0xaa, 0xf5, 0x8a, - 0x5a, 0xbc, 0x65, 0xfb, 0x66, 0xeb, 0xd5, 0xb8, 0x0e, 0xa4, 0x9f, 0x12, 0x28, 0x5b, 0xef, 0xde, - 0x0a, 0x43, 0xbf, 0x3f, 0x8c, 0x7a, 0x55, 0x18, 0xc5, 0x9c, 0x63, 0x7c, 0x96, 0xe3, 0x4c, 0xd0, - 0x08, 0x8d, 0xff, 0x7a, 0x9a, 0x59, 0x77, 0x1a, 0xb2, 0xf2, 0x81, 0x46, 0x48, 0x9e, 0xc1, 0x16, - 0x0f, 0xe6, 0x18, 0xe6, 0x4c, 0x21, 0xd6, 0x25, 0xa2, 0xb5, 0x28, 0x4a, 0xd0, 0x00, 0x76, 0xe4, - 0xda, 0xcd, 0x82, 0x24, 0x4a, 0x19, 0x0a, 0x05, 0xdd, 0x90, 0xd0, 0x8e, 0x3c, 0x3a, 0x50, 0x27, - 0x05, 0xbe, 0xff, 0x5d, 0x83, 0x86, 0x7c, 0xd4, 0x7b, 0x14, 0x1e, 0xd9, 0x5b, 0x0a, 0x70, 0xa5, - 0x14, 0xee, 0x9f, 0x86, 0x3d, 0xd0, 0x33, 0x3c, 0x93, 0xb1, 0x34, 0x87, 0xcf, 0xab, 0x15, 0x6f, - 0x2d, 0xbd, 0x53, 0x70, 0x5e, 0x1c, 0x40, 0xf3, 0x9a, 0x17, 0xd9, 0x84, 0xfa, 0xe4, 0x64, 0x32, - 0x6e, 0xaf, 0x91, 0x16, 0x6c, 0x9e, 0x4e, 0x6c, 0xd7, 0x3d, 0x1d, 0x8f, 0xda, 0x1a, 0xd9, 0x06, - 0xb0, 0x27, 0x53, 0xe7, 0xe4, 0xc8, 0x19, 0xbb, 0x6e, 0xbb, 0x56, 0x9c, 0x1e, 0xda, 0x13, 0xdb, - 0x3d, 0x1e, 0x8f, 0xda, 0xfa, 0xf0, 0x57, 0x0d, 0x3a, 0x52, 0x45, 0x9a, 0xb8, 0xa5, 0x23, 0xf1, - 0x00, 0xfe, 0x98, 0x92, 0x55, 0xaf, 0xd5, 0x35, 0xef, 0x07, 0x96, 0xc3, 0xd8, 0x5f, 0x23, 0x0c, - 0xb6, 0xd4, 0xa0, 0x96, 0x73, 0x4a, 0x5e, 0x56, 0x93, 0x2b, 0x37, 0xa0, 0xfb, 0x6a, 0x35, 0xf0, - 0x95, 0xdb, 0x39, 0x74, 0x6e, 0x2d, 0x38, 0x19, 0x54, 0x8b, 0xdc, 0xf5, 0x03, 0xd2, 0xb5, 0x56, - 0xc6, 0x2f, 0x7c, 0xf7, 0x8f, 0x3f, 0x1d, 0x7e, 0xa1, 0x62, 0x9e, 0xfb, 0xc5, 0x7e, 0x59, 0x97, - 0x94, 0x31, 0x7a, 0x29, 0x30, 0x98, 0x5b, 0xa5, 0xd2, 0xeb, 0x90, 0x72, 0x91, 0x51, 0x3f, 0x17, - 0x18, 0x5a, 0x34, 0x16, 0x98, 0xc5, 0x1e, 0xb3, 0xa4, 0xbc, 0x25, 0x27, 0x44, 0xfd, 0x4b, 0xa4, - 0xbe, 0xbf, 0x2e, 0xab, 0xbb, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x6f, 0xad, 0xe3, 0x08, 0x3f, - 0x06, 0x00, 0x00, + // 626 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xdb, 0x6e, 0xd3, 0x4c, + 0x10, 0xae, 0xe3, 0xf4, 0x90, 0x49, 0x5a, 0x25, 0xdb, 0xff, 0x47, 0x26, 0xa8, 0x52, 0x08, 0x17, + 0x44, 0x1c, 0x12, 0x91, 0x72, 0xd3, 0xdb, 0x36, 0x69, 0x1b, 0x21, 0xd2, 0xc8, 0xa6, 0x5c, 0x70, + 0x13, 0xf9, 0x30, 0x90, 0x95, 0xd6, 0x87, 0x7a, 0xd7, 0x15, 0xed, 0x73, 0xf0, 0x02, 0x3c, 0x06, + 0x12, 0x8f, 0xc2, 0xc3, 0x20, 0xaf, 0x37, 0xa5, 0x6e, 0x5d, 0xc5, 0x12, 0xe2, 0xd2, 0xb3, 0xdf, + 0xf7, 0xcd, 0x7c, 0x33, 0x3b, 0x6b, 0xd8, 0xa5, 0x81, 0x87, 0x5f, 0xe7, 0x4e, 0x42, 0x99, 0x87, + 0x71, 0x3f, 0x8a, 0x43, 0x11, 0x92, 0xff, 0x7c, 0xca, 0x2e, 0x13, 0x9e, 0x7d, 0xf5, 0x39, 0xc6, + 0x97, 0xd4, 0xc5, 0x76, 0xc3, 0x0d, 0x7d, 0x3f, 0x0c, 0xb2, 0x68, 0xf7, 0xa7, 0x06, 0xad, 0xc3, + 0x94, 0x35, 0x49, 0x05, 0x4c, 0xbc, 0x48, 0x90, 0x0b, 0xb2, 0x07, 0xe0, 0xd9, 0xc2, 0x9e, 0x47, + 0xb6, 0x58, 0x70, 0xa3, 0xd2, 0xd1, 0x7b, 0x35, 0xb3, 0x96, 0x46, 0x66, 0x69, 0x80, 0x1c, 0x42, + 0x5d, 0x5c, 0x45, 0x38, 0x8f, 0xec, 0xd8, 0xf6, 0xb9, 0xa1, 0x77, 0xf4, 0x5e, 0x7d, 0xf8, 0xb4, + 0x9f, 0x4b, 0xa7, 0xb2, 0xbc, 0xc3, 0xab, 0x8f, 0x36, 0x4b, 0x70, 0x66, 0xd3, 0xd8, 0x84, 0x94, + 0x35, 0x93, 0x24, 0x32, 0x82, 0x46, 0x56, 0xb3, 0x12, 0xa9, 0x96, 0x15, 0xa9, 0x4b, 0x5a, 0xa6, + 0xd2, 0x75, 0x81, 0xdc, 0xae, 0x9e, 0x47, 0x61, 0xc0, 0x91, 0xec, 0xc3, 0x06, 0x17, 0xb6, 0x48, + 0xb8, 0xa1, 0x75, 0xb4, 0x5e, 0x7d, 0xf8, 0xa4, 0x50, 0xd5, 0x92, 0x10, 0x53, 0x41, 0x89, 0x01, + 0x9b, 0x52, 0x79, 0x32, 0x32, 0x2a, 0x1d, 0xad, 0xa7, 0x9b, 0xcb, 0xcf, 0xee, 0x5b, 0x30, 0x4e, + 0x50, 0xc8, 0x14, 0xc7, 0x94, 0xa1, 0xec, 0xc1, 0xb2, 0x53, 0xb7, 0x58, 0x5a, 0x9e, 0xf5, 0x4d, + 0x83, 0xc7, 0x05, 0xb4, 0x7f, 0x52, 0x22, 0xe9, 0x41, 0x33, 0xeb, 0xe6, 0x67, 0xca, 0x50, 0x8d, + 0x4d, 0x97, 0x63, 0xdb, 0xa1, 0xb9, 0x02, 0xba, 0x6f, 0xe0, 0xff, 0x11, 0x72, 0x37, 0xa6, 0x0e, + 0xc3, 0xdc, 0xcc, 0x1f, 0x76, 0xf2, 0xbd, 0x02, 0x8f, 0xee, 0x72, 0xfe, 0xc6, 0xc6, 0xcd, 0xe8, + 0x15, 0x35, 0xf5, 0xb2, 0x73, 0x77, 0xf4, 0xea, 0xba, 0xf6, 0x65, 0x3e, 0x25, 0x90, 0x8d, 0xde, + 0xba, 0xd7, 0x0c, 0x3d, 0xdf, 0x8c, 0x3d, 0x00, 0x0c, 0x2e, 0x12, 0x9c, 0x0b, 0xea, 0xa3, 0x51, + 0x95, 0x87, 0x35, 0x19, 0xf9, 0x40, 0x7d, 0x24, 0xcf, 0x60, 0x9b, 0xbb, 0x0b, 0xf4, 0x12, 0xa6, + 0x10, 0xeb, 0x12, 0xd1, 0x58, 0x06, 0x25, 0xa8, 0x0f, 0xbb, 0x72, 0x99, 0xe6, 0x6e, 0xe8, 0x47, + 0x0c, 0x85, 0x82, 0x6e, 0x48, 0x68, 0x4b, 0x1e, 0x1d, 0xa9, 0x93, 0x14, 0xdf, 0xfd, 0x51, 0x81, + 0x9a, 0x2c, 0xf5, 0x3d, 0x0a, 0x9b, 0x1c, 0xe4, 0xda, 0x52, 0xca, 0xdb, 0xea, 0x19, 0xe7, 0x6d, + 0xe9, 0x2b, 0x6d, 0x55, 0xcb, 0xdb, 0x5a, 0x7f, 0xc0, 0x16, 0x39, 0x00, 0x3d, 0xc6, 0x0b, 0x69, + 0xbb, 0x3e, 0x7c, 0x5e, 0xec, 0xe2, 0xde, 0xf3, 0x61, 0xa6, 0x9c, 0xc2, 0x2b, 0xb9, 0x59, 0x74, + 0x25, 0x5f, 0x1c, 0x41, 0xfd, 0x56, 0x27, 0xc8, 0x16, 0x54, 0xa7, 0x67, 0xd3, 0x71, 0x73, 0x8d, + 0x34, 0x60, 0xeb, 0x7c, 0x3a, 0xb1, 0xac, 0xf3, 0xf1, 0xa8, 0xa9, 0x91, 0x1d, 0x80, 0xc9, 0x74, + 0x66, 0x9e, 0x9d, 0x98, 0x63, 0xcb, 0x6a, 0x56, 0xd2, 0xd3, 0xe3, 0xc9, 0x74, 0x62, 0x9d, 0x8e, + 0x47, 0x4d, 0x7d, 0xf8, 0xab, 0x02, 0x2d, 0xa9, 0x22, 0xcb, 0xb1, 0xb2, 0xda, 0x88, 0x0d, 0xf0, + 0xa7, 0x3c, 0x52, 0xd6, 0x40, 0xbb, 0xb7, 0x1a, 0x98, 0x2d, 0x40, 0x77, 0x8d, 0x30, 0xd8, 0x56, + 0xcb, 0x91, 0xed, 0x06, 0x79, 0x59, 0x4c, 0x2e, 0xdc, 0xba, 0xf6, 0xab, 0x72, 0xe0, 0x9b, 0x6c, + 0x97, 0xd0, 0xba, 0xf7, 0xa8, 0x90, 0x7e, 0xb1, 0xc8, 0x43, 0x8f, 0x56, 0x7b, 0x50, 0x1a, 0xbf, + 0xcc, 0x7b, 0x78, 0xfa, 0xe9, 0xf8, 0x0b, 0x15, 0x8b, 0xc4, 0x49, 0x77, 0x7a, 0x70, 0x4d, 0x19, + 0xa3, 0xd7, 0x02, 0xdd, 0xc5, 0x20, 0x53, 0x7a, 0xed, 0x51, 0x2e, 0x62, 0xea, 0x24, 0x02, 0xbd, + 0x01, 0x0d, 0x04, 0xc6, 0x81, 0xcd, 0x06, 0x52, 0x7e, 0x20, 0xa7, 0xad, 0xfe, 0x4c, 0x91, 0xe3, + 0x6c, 0xc8, 0xe8, 0xfe, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x05, 0xa2, 0xa4, 0x63, 0xb3, 0x06, + 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/writenode/client/client.go b/internal/writenode/client/client.go index dd6e965c72..bfd07a905a 100644 --- a/internal/writenode/client/client.go +++ b/internal/writenode/client/client.go @@ -26,7 +26,7 @@ func (c *Client) DescribeSegment(semgentID UniqueID) (*SegmentDescription, error return &SegmentDescription{}, nil } -func (c *Client) GetInsertBinlogPaths(semgentID UniqueID) (map[int32]string, error) { +func (c *Client) GetInsertBinlogPaths(semgentID UniqueID) (map[int32][]string, error) { // query etcd return nil, nil }