From 342200ce1333df5c4b7c8300dc6981e07caec5d4 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 9 Dec 2021 14:19:40 +0800 Subject: [PATCH] Estimate the memory size of the index before building the index (#12973) Signed-off-by: Cai.Zhang --- .../indexnode/client/client_test.go | 6 +- .../distributed/indexnode/service_test.go | 8 +- .../distributed/rootcoord/service_test.go | 2 +- internal/indexcoord/index_coord.go | 19 ++- internal/indexcoord/index_coord_test.go | 12 ++ internal/indexcoord/meta_table_test.go | 32 +++- internal/indexcoord/node_manager.go | 48 +++++- internal/indexcoord/peek_client_policy.go | 37 +++++ .../indexcoord/peek_client_policy_test.go | 39 +++++ internal/indexcoord/priority_queue.go | 13 +- internal/indexcoord/priority_queue_test.go | 24 ++- internal/indexcoord/util.go | 58 +++++++ internal/indexcoord/util_test.go | 84 ++++++++++ internal/indexnode/indexnode.go | 9 +- internal/indexnode/indexnode_mock.go | 66 +++++++- internal/indexnode/indexnode_mock_test.go | 13 +- internal/indexnode/task.go | 19 ++- internal/proto/index_coord.proto | 4 + internal/proto/indexpb/index_coord.pb.go | 155 +++++++++++------- internal/rootcoord/root_coord.go | 8 +- internal/rootcoord/root_coord_test.go | 6 +- 21 files changed, 540 insertions(+), 122 deletions(-) create mode 100644 internal/indexcoord/peek_client_policy.go create mode 100644 internal/indexcoord/peek_client_policy_test.go create mode 100644 internal/indexcoord/util.go create mode 100644 internal/indexcoord/util_test.go diff --git a/internal/distributed/indexnode/client/client_test.go b/internal/distributed/indexnode/client/client_test.go index b8f39cc2e6..c89f525d62 100644 --- a/internal/distributed/indexnode/client/client_test.go +++ b/internal/distributed/indexnode/client/client_test.go @@ -21,6 +21,8 @@ import ( "errors" "testing" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/mock" "google.golang.org/grpc" @@ -29,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/stretchr/testify/assert" ) @@ -169,7 +170,8 @@ func TestIndexNodeClient(t *testing.T) { }) t.Run("GetMetrics", func(t *testing.T) { - req := &milvuspb.GetMetricsRequest{} + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + assert.Nil(t, err) resp, err := inc.GetMetrics(ctx, req) assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) diff --git a/internal/distributed/indexnode/service_test.go b/internal/distributed/indexnode/service_test.go index d121676be6..bed22c50e6 100644 --- a/internal/distributed/indexnode/service_test.go +++ b/internal/distributed/indexnode/service_test.go @@ -20,12 +20,13 @@ import ( "context" "testing" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/indexnode" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/stretchr/testify/assert" ) @@ -75,9 +76,8 @@ func TestIndexNodeServer(t *testing.T) { }) t.Run("GetMetrics", func(t *testing.T) { - req := &milvuspb.GetMetricsRequest{ - Request: "", - } + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + assert.Nil(t, err) resp, err := server.GetMetrics(ctx, req) assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index a762854fed..e238cc06ad 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -181,7 +181,7 @@ func TestGrpcService(t *testing.T) { var binlogLock sync.Mutex binlogPathArray := make([]string, 0, 16) - core.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { + core.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error) { binlogLock.Lock() defer binlogLock.Unlock() binlogPathArray = append(binlogPathArray, binlog...) diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 3f04a46f29..a8ffe472c1 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -354,13 +354,6 @@ func (i *IndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.String // the task is recorded in Meta. The background process assignTaskLoop will find this task and assign it to IndexNode for // execution. func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { - log.Debug("IndexCoord building index ...", - zap.Int64("IndexBuildID", req.IndexBuildID), - zap.String("IndexName = ", req.IndexName), - zap.Int64("IndexID = ", req.IndexID), - zap.Strings("DataPath = ", req.DataPaths), - zap.Any("TypeParams", req.TypeParams), - zap.Any("IndexParams", req.IndexParams)) if !i.isHealthy() { errMsg := "IndexCoord is not healthy" err := errors.New(errMsg) @@ -372,6 +365,15 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ }, }, err } + log.Debug("IndexCoord building index ...", + zap.Int64("IndexBuildID", req.IndexBuildID), + zap.String("IndexName = ", req.IndexName), + zap.Int64("IndexID = ", req.IndexID), + zap.Strings("DataPath = ", req.DataPaths), + zap.Any("TypeParams", req.TypeParams), + zap.Any("IndexParams", req.IndexParams), + zap.Int64("numRow", req.NumRows), + zap.Any("field type", req.FieldSchema.DataType)) sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "IndexCoord-BuildIndex") defer sp.Finish() hasIndex, indexBuildID := i.metaTable.HasSameReq(req) @@ -837,7 +839,8 @@ func (i *IndexCoord) assignTaskLoop() { continue } log.Debug("The version of the task has been updated", zap.Int64("indexBuildID", indexBuildID)) - nodeID, builderClient := i.nodeManager.PeekClient() + + nodeID, builderClient := i.nodeManager.PeekClient(meta) if builderClient == nil { log.Warn("IndexCoord assignmentTasksLoop can not find available IndexNode") break diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index f01093a427..88b948fe95 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/proto/milvuspb" @@ -97,6 +99,16 @@ func TestIndexCoord(t *testing.T) { req := &indexpb.BuildIndexRequest{ IndexID: indexID, DataPaths: []string{"DataPath-1", "DataPath-2"}, + NumRows: 0, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "128", + }, + }, + FieldSchema: &schemapb.FieldSchema{ + DataType: schemapb.DataType_FloatVector, + }, } resp, err := ic.BuildIndex(ctx, req) assert.Nil(t, err) diff --git a/internal/indexcoord/meta_table_test.go b/internal/indexcoord/meta_table_test.go index b0f48835e0..a3a758cbaa 100644 --- a/internal/indexcoord/meta_table_test.go +++ b/internal/indexcoord/meta_table_test.go @@ -20,6 +20,8 @@ import ( "strconv" "testing" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/golang/protobuf/proto" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -37,8 +39,12 @@ func TestMetaTable(t *testing.T) { IndexName: "test_index", IndexID: 0, DataPaths: []string{"DataPath-1-1", "DataPath-1-2"}, - TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}}, + TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}, IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}}, + NumRows: 100, + FieldSchema: &schemapb.FieldSchema{ + DataType: schemapb.DataType_FloatVector, + }, } indexMeta1 := &indexpb.IndexMeta{ IndexBuildID: 1, @@ -170,8 +176,12 @@ func TestMetaTable(t *testing.T) { IndexName: "test_index", IndexID: 2, DataPaths: []string{"DataPath-1-1", "DataPath-1-2"}, - TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}}, + TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}, IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}}, + NumRows: 100, + FieldSchema: &schemapb.FieldSchema{ + DataType: schemapb.DataType_FloatVector, + }, } err = metaTable.AddIndex(6, req2) @@ -182,8 +192,12 @@ func TestMetaTable(t *testing.T) { IndexName: "test_index", IndexID: 3, DataPaths: []string{"DataPath-1-1", "DataPath-1-2"}, - TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}}, + TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "dim", Value: "256"}}, IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}}, + NumRows: 10, + FieldSchema: &schemapb.FieldSchema{ + DataType: schemapb.DataType_FloatVector, + }, } has, err := metaTable.HasSameReq(req3) @@ -246,8 +260,12 @@ func TestMetaTable(t *testing.T) { IndexName: "test_index", IndexID: 4, DataPaths: []string{"DataPath-1-1", "DataPath-1-2"}, - TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}}, + TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}}, IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}}, + NumRows: 10, + FieldSchema: &schemapb.FieldSchema{ + DataType: schemapb.DataType_FloatVector, + }, } err = metaTable.AddIndex(7, req4) assert.Nil(t, err) @@ -269,8 +287,12 @@ func TestMetaTable(t *testing.T) { IndexName: "test_index", IndexID: 5, DataPaths: []string{"DataPath-1-1", "DataPath-1-2"}, - TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}}, + TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}}, IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}}, + NumRows: 10, + FieldSchema: &schemapb.FieldSchema{ + DataType: schemapb.DataType_FloatVector, + }, } err = metaTable.AddIndex(req5.IndexBuildID, req5) diff --git a/internal/indexcoord/node_manager.go b/internal/indexcoord/node_manager.go index 83dd659eb8..721759c12c 100644 --- a/internal/indexcoord/node_manager.go +++ b/internal/indexcoord/node_manager.go @@ -20,6 +20,8 @@ import ( "context" "sync" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/proto/milvuspb" grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" @@ -40,12 +42,31 @@ type NodeManager struct { func NewNodeManager() *NodeManager { return &NodeManager{ nodeClients: make(map[UniqueID]types.IndexNode), - pq: &PriorityQueue{}, - lock: sync.RWMutex{}, + pq: &PriorityQueue{ + policy: PeekClientV1, + }, + lock: sync.RWMutex{}, } } -func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) { +func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) error { + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + if err != nil { + log.Error("create metrics request failed", zap.Error(err)) + return err + } + metrics, err := client.GetMetrics(context.Background(), req) + if err != nil { + log.Error("get indexnode metrics failed", zap.Error(err)) + return err + } + + infos := &metricsinfo.IndexNodeInfos{} + err = metricsinfo.UnmarshalComponentInfos(metrics.Response, infos) + if err != nil { + log.Error("get indexnode metrics info failed", zap.Error(err)) + return err + } nm.lock.Lock() defer nm.lock.Unlock() @@ -55,9 +76,11 @@ func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) { key: nodeID, priority: 0, weight: 0, + totalMem: infos.HardwareInfos.Memory, } nm.nodeClients[nodeID] = client nm.pq.Push(item) + return nil } // RemoveNode removes the unused client of IndexNode. @@ -88,16 +111,27 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error { log.Error("IndexCoord NodeManager", zap.Any("Add node err", err)) return err } - nm.setClient(nodeID, nodeClient) - return nil + return nm.setClient(nodeID, nodeClient) } // PeekClient peeks the client with the least load. -func (nm *NodeManager) PeekClient() (UniqueID, types.IndexNode) { +func (nm *NodeManager) PeekClient(meta Meta) (UniqueID, types.IndexNode) { nm.lock.Lock() defer nm.lock.Unlock() - nodeID := nm.pq.Peek() + log.Debug("IndexCoord NodeManager PeekClient") + + dim, err := getDimension(meta.indexMeta.Req) + if err != nil { + log.Error(err.Error()) + return UniqueID(-1), nil + } + indexSize, err := estimateIndexSize(dim, meta.indexMeta.Req.NumRows, meta.indexMeta.Req.FieldSchema.DataType) + if err != nil { + log.Warn(err.Error()) + return UniqueID(-1), nil + } + nodeID := nm.pq.Peek(indexSize, meta.indexMeta.Req.IndexParams, meta.indexMeta.Req.TypeParams) client, ok := nm.nodeClients[nodeID] if !ok { log.Error("IndexCoord NodeManager PeekClient", zap.Any("There is no IndexNode client corresponding to NodeID", nodeID)) diff --git a/internal/indexcoord/peek_client_policy.go b/internal/indexcoord/peek_client_policy.go new file mode 100644 index 0000000000..062efa90ff --- /dev/null +++ b/internal/indexcoord/peek_client_policy.go @@ -0,0 +1,37 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexcoord + +import "github.com/milvus-io/milvus/internal/proto/commonpb" + +type PeekClientPolicy func(memorySize uint64, indexParams []*commonpb.KeyValuePair, + typeParams []*commonpb.KeyValuePair, pq *PriorityQueue) UniqueID + +func PeekClientV0(memorySize uint64, indexParams []*commonpb.KeyValuePair, + typeParams []*commonpb.KeyValuePair, pq *PriorityQueue) UniqueID { + return pq.items[0].key +} + +func PeekClientV1(memorySize uint64, indexParams []*commonpb.KeyValuePair, + typeParams []*commonpb.KeyValuePair, pq *PriorityQueue) UniqueID { + for i := range pq.items { + if pq.items[i].totalMem > memorySize { + return pq.items[i].key + } + } + return UniqueID(-1) +} diff --git a/internal/indexcoord/peek_client_policy_test.go b/internal/indexcoord/peek_client_policy_test.go new file mode 100644 index 0000000000..2a52b3fce5 --- /dev/null +++ b/internal/indexcoord/peek_client_policy_test.go @@ -0,0 +1,39 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexcoord + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/stretchr/testify/assert" +) + +func TestPeekClientV0(t *testing.T) { + pq := newPriorityQueue() + key := PeekClientV0(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}, pq) + assert.Equal(t, UniqueID(0), key) +} + +func TestPeekClientV1(t *testing.T) { + pq := newPriorityQueue() + key := PeekClientV1(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}, pq) + assert.Equal(t, UniqueID(0), key) + + key2 := PeekClientV1(10000, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}, pq) + assert.Equal(t, UniqueID(-1), key2) +} diff --git a/internal/indexcoord/priority_queue.go b/internal/indexcoord/priority_queue.go index 100191e39d..9c349ef1d1 100644 --- a/internal/indexcoord/priority_queue.go +++ b/internal/indexcoord/priority_queue.go @@ -19,6 +19,8 @@ package indexcoord import ( "container/heap" "sync" + + "github.com/milvus-io/milvus/internal/proto/commonpb" ) // PQItem is something we manage in a priority queue. @@ -30,12 +32,15 @@ type PQItem struct { weight int // The weight of the item in the queue. // When the priority is the same, a smaller weight is more preferred. index int // The index of the item in the heap. + + totalMem uint64 // The total memory of the IndexNode. } // PriorityQueue implements heap.Interface and holds Items. type PriorityQueue struct { - items []*PQItem - lock sync.RWMutex + items []*PQItem + lock sync.RWMutex + policy PeekClientPolicy } // Len is the length of the priority queue. @@ -139,14 +144,14 @@ func (pq *PriorityQueue) Remove(key UniqueID) { } // Peek picks an key with the lowest load. -func (pq *PriorityQueue) Peek() UniqueID { +func (pq *PriorityQueue) Peek(memorySize uint64, indexParams []*commonpb.KeyValuePair, typeParams []*commonpb.KeyValuePair) UniqueID { pq.lock.RLock() defer pq.lock.RUnlock() if pq.Len() == 0 { return UniqueID(-1) } - return pq.items[0].key + return pq.policy(memorySize, indexParams, typeParams, pq) } // PeekAll return the key of all the items. diff --git a/internal/indexcoord/priority_queue_test.go b/internal/indexcoord/priority_queue_test.go index 93d83eb894..1fdb97e54a 100644 --- a/internal/indexcoord/priority_queue_test.go +++ b/internal/indexcoord/priority_queue_test.go @@ -20,18 +20,23 @@ import ( "container/heap" "testing" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/stretchr/testify/assert" ) const QueueLen = 10 func newPriorityQueue() *PriorityQueue { - ret := &PriorityQueue{} + ret := &PriorityQueue{ + policy: PeekClientV0, + } for i := 0; i < QueueLen; i++ { item := &PQItem{ key: UniqueID(i), priority: i, index: i, + totalMem: 1000, } ret.items = append(ret.items, item) } @@ -76,7 +81,7 @@ func TestPriorityQueue_UpdatePriority(t *testing.T) { pq := newPriorityQueue() key := UniqueID(pq.Len() / 2) pq.UpdatePriority(key, -pq.Len()) - peekKey := pq.Peek() + peekKey := pq.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}) assert.Equal(t, key, peekKey) } @@ -84,37 +89,40 @@ func TestPriorityQueue_IncPriority(t *testing.T) { pq := newPriorityQueue() key := UniqueID(pq.Len() / 2) pq.IncPriority(key, -pq.Len()) - peekKey := pq.Peek() + peekKey := pq.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}) assert.Equal(t, key, peekKey) } func TestPriorityQueue(t *testing.T) { - ret := &PriorityQueue{} + ret := &PriorityQueue{ + policy: PeekClientV0, + } for i := 0; i < 4; i++ { item := &PQItem{ key: UniqueID(i), priority: 0, index: i, + totalMem: 1000, } ret.items = append(ret.items, item) } heap.Init(ret) - peeKey1 := ret.Peek() + peeKey1 := ret.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}) assert.Equal(t, int64(0), peeKey1) ret.IncPriority(peeKey1, 1) - peeKey2 := ret.Peek() + peeKey2 := ret.Peek(100, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}) assert.Equal(t, int64(1), peeKey2) ret.IncPriority(peeKey2, 1) ret.IncPriority(peeKey1, -1) ret.IncPriority(peeKey2, -1) - peeKey1 = ret.Peek() + peeKey1 = ret.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}) assert.Equal(t, int64(3), peeKey1) ret.IncPriority(peeKey1, 1) - peeKey2 = ret.Peek() + peeKey2 = ret.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}) assert.Equal(t, int64(2), peeKey2) } diff --git a/internal/indexcoord/util.go b/internal/indexcoord/util.go new file mode 100644 index 0000000000..97a57d2fd9 --- /dev/null +++ b/internal/indexcoord/util.go @@ -0,0 +1,58 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexcoord + +import ( + "errors" + "strconv" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" +) + +func getDimension(req *indexpb.BuildIndexRequest) (int64, error) { + for _, kvPair := range req.GetTypeParams() { + key, value := kvPair.GetKey(), kvPair.GetValue() + if key == "dim" { + dim, err := strconv.ParseInt(value, 10, 64) + if err != nil { + errMsg := "dimension is invalid" + log.Error(errMsg) + return 0, errors.New(errMsg) + } + return dim, nil + } + } + errMsg := "dimension is not in type params" + log.Error(errMsg) + return 0, errors.New(errMsg) +} + +func estimateIndexSize(dim int64, numRows int64, dataType schemapb.DataType) (uint64, error) { + if dataType == schemapb.DataType_FloatVector { + return uint64(dim) * uint64(numRows) * 4, nil + } + + if dataType == schemapb.DataType_BinaryVector { + return uint64(dim) / 8 * uint64(numRows), nil + } + + errMsg := "the field to build index must be a vector field" + log.Error(errMsg) + return 0, errors.New(errMsg) +} diff --git a/internal/indexcoord/util_test.go b/internal/indexcoord/util_test.go new file mode 100644 index 0000000000..b5e297283f --- /dev/null +++ b/internal/indexcoord/util_test.go @@ -0,0 +1,84 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexcoord + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/stretchr/testify/assert" +) + +func Test_getDimension(t *testing.T) { + req := &indexpb.BuildIndexRequest{ + IndexBuildID: UniqueID(0), + IndexID: UniqueID(1), + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "128", + }, + }, + } + dim, err := getDimension(req) + assert.Equal(t, int64(128), dim) + assert.Nil(t, err) + + req2 := &indexpb.BuildIndexRequest{ + IndexBuildID: UniqueID(0), + IndexID: UniqueID(1), + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "one", + }, + }, + } + dim, err = getDimension(req2) + assert.Error(t, err) + assert.Equal(t, int64(0), dim) + + req3 := &indexpb.BuildIndexRequest{ + IndexBuildID: UniqueID(0), + IndexID: UniqueID(1), + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "TypeParam-Key-1", + Value: "TypeParam-Value-1", + }, + }, + } + dim, err = getDimension(req3) + assert.Error(t, err) + assert.Equal(t, int64(0), dim) +} + +func Test_estimateIndexSize(t *testing.T) { + memorySize, err := estimateIndexSize(10, 100, schemapb.DataType_FloatVector) + assert.Nil(t, err) + assert.Equal(t, uint64(4000), memorySize) + + memorySize, err = estimateIndexSize(16, 100, schemapb.DataType_BinaryVector) + assert.Nil(t, err) + assert.Equal(t, uint64(200), memorySize) + + memorySize, err = estimateIndexSize(10, 100, schemapb.DataType_Float) + assert.Error(t, err) + assert.Equal(t, uint64(0), memorySize) +} diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 4694157987..da61edd2e0 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -269,10 +269,11 @@ func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateInde ctx: ctx2, done: make(chan error), }, - req: request, - kv: i.kv, - etcdKV: i.etcdKV, - nodeID: Params.NodeID, + req: request, + kv: i.kv, + etcdKV: i.etcdKV, + nodeID: Params.NodeID, + serializedSize: 0, } ret := &commonpb.Status{ diff --git a/internal/indexnode/indexnode_mock.go b/internal/indexnode/indexnode_mock.go index f418205725..32d97331f1 100644 --- a/internal/indexnode/indexnode_mock.go +++ b/internal/indexnode/indexnode_mock.go @@ -289,12 +289,74 @@ func (inm *Mock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest }, nil } + metricType, _ := metricsinfo.ParseMetricType(req.Request) + + if metricType == metricsinfo.SystemInfoMetrics { + metrics, err := getMockSystemInfoMetrics(ctx, req, inm) + + log.Debug("IndexNode.GetMetrics", + zap.Int64("node_id", Params.NodeID), + zap.String("req", req.Request), + zap.String("metric_type", metricType), + zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large + zap.Error(err)) + + return metrics, nil + } + + log.Warn("IndexNode.GetMetrics failed, request metric type is not implemented yet", + zap.Int64("node_id", Params.NodeID), + zap.String("req", req.Request), + zap.String("metric_type", metricType)) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: metricsinfo.MsgUnimplementedMetric, + }, + Response: "", + }, nil +} + +func getMockSystemInfoMetrics( + ctx context.Context, + req *milvuspb.GetMetricsRequest, + node *Mock, +) (*milvuspb.GetMetricsResponse, error) { + // TODO(dragondriver): add more metrics + nodeInfos := metricsinfo.IndexNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, Params.NodeID), + HardwareInfos: metricsinfo.HardwareMetrics{ + CPUCoreCount: metricsinfo.GetCPUCoreCount(false), + CPUCoreUsage: metricsinfo.GetCPUUsage(), + Memory: metricsinfo.GetMemoryCount(), + MemoryUsage: metricsinfo.GetUsedMemoryCount(), + Disk: metricsinfo.GetDiskCount(), + DiskUsage: metricsinfo.GetDiskUsage(), + }, + SystemInfo: metricsinfo.DeployMetrics{}, + CreatedTime: Params.CreatedTime.String(), + UpdatedTime: Params.UpdatedTime.String(), + Type: typeutil.IndexNodeRole, + }, + SystemConfigurations: metricsinfo.IndexNodeConfiguration{ + MinioBucketName: Params.MinioBucketName, + + SimdType: Params.SimdType, + }, + } + + metricsinfo.FillDeployMetricsWithEnv(&nodeInfos.SystemInfo) + + resp, _ := metricsinfo.MarshalComponentInfos(nodeInfos) + return &milvuspb.GetMetricsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Response: "", - ComponentName: "IndexNode", + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, Params.NodeID), }, nil } diff --git a/internal/indexnode/indexnode_mock_test.go b/internal/indexnode/indexnode_mock_test.go index 3ab4a8e0a5..9cb312d683 100644 --- a/internal/indexnode/indexnode_mock_test.go +++ b/internal/indexnode/indexnode_mock_test.go @@ -21,6 +21,8 @@ import ( "strconv" "testing" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -73,12 +75,17 @@ func TestIndexNodeMock(t *testing.T) { }) t.Run("GetMetrics", func(t *testing.T) { - req := &milvuspb.GetMetricsRequest{ - Request: "", - } + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + assert.Nil(t, err) resp, err := inm.GetMetrics(ctx, req) assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + req2, err := metricsinfo.ConstructRequestByMetricType("IndexNode") + assert.Nil(t, err) + resp2, err := inm.GetMetrics(ctx, req2) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp2.Status.ErrorCode) }) err = inm.Stop() diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 73d1b162ff..f1a304ab91 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -102,12 +102,13 @@ func (bt *BaseTask) Notify(err error) { // IndexBuildTask is used to record the information of the index tasks. type IndexBuildTask struct { BaseTask - index Index - kv kv.BaseKV - etcdKV *etcdkv.EtcdKV - savePaths []string - req *indexpb.CreateIndexRequest - nodeID UniqueID + index Index + kv kv.BaseKV + etcdKV *etcdkv.EtcdKV + savePaths []string + req *indexpb.CreateIndexRequest + nodeID UniqueID + serializedSize uint64 } // Ctx is the context of index tasks. @@ -182,6 +183,7 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error { } indexMeta.IndexFilePaths = it.savePaths indexMeta.State = commonpb.IndexState_Finished + indexMeta.SerializeSize = it.serializedSize // Under normal circumstances, it.err and it.internalErr will not be non-nil at the same time, but for the sake of insurance, the else judgment is added. if it.err != nil { log.Error("IndexNode CreateIndex failed and can not be retried", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", it.err)) @@ -407,6 +409,11 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { return err } tr.Record("serialize index codec done") + it.serializedSize = 0 + for i := range serializedIndexBlobs { + it.serializedSize += uint64(len(serializedIndexBlobs[i].Value)) + } + log.Debug("serialize index codec done", zap.Uint64("serialized index size", it.serializedSize)) getSavePathByKey := func(key string) string { diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 14237fb6d0..3496901c04 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -7,6 +7,7 @@ option go_package = "github.com/milvus-io/milvus/internal/proto/indexpb"; import "common.proto"; import "internal.proto"; import "milvus.proto"; +import "schema.proto"; service IndexCoord { rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {} @@ -77,6 +78,8 @@ message BuildIndexRequest { repeated string data_paths = 5; repeated common.KeyValuePair type_params = 6; repeated common.KeyValuePair index_params = 7; + int64 num_rows = 8; + schema.FieldSchema field_schema = 9; } message BuildIndexResponse { @@ -109,6 +112,7 @@ message IndexMeta { int64 nodeID = 7; int64 version = 8; bool recycled = 9; + uint64 serialize_size = 10; } message DropIndexRequest { diff --git a/internal/proto/indexpb/index_coord.pb.go b/internal/proto/indexpb/index_coord.pb.go index b93304240f..0257a4a239 100644 --- a/internal/proto/indexpb/index_coord.pb.go +++ b/internal/proto/indexpb/index_coord.pb.go @@ -10,6 +10,7 @@ import ( commonpb "github.com/milvus-io/milvus/internal/proto/commonpb" internalpb "github.com/milvus-io/milvus/internal/proto/internalpb" milvuspb "github.com/milvus-io/milvus/internal/proto/milvuspb" + schemapb "github.com/milvus-io/milvus/internal/proto/schemapb" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -388,6 +389,8 @@ type BuildIndexRequest struct { DataPaths []string `protobuf:"bytes,5,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"` TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,6,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"` IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,7,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` + NumRows int64 `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"` + FieldSchema *schemapb.FieldSchema `protobuf:"bytes,9,opt,name=field_schema,json=fieldSchema,proto3" json:"field_schema,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -460,6 +463,20 @@ func (m *BuildIndexRequest) GetIndexParams() []*commonpb.KeyValuePair { return nil } +func (m *BuildIndexRequest) GetNumRows() int64 { + if m != nil { + return m.NumRows + } + return 0 +} + +func (m *BuildIndexRequest) GetFieldSchema() *schemapb.FieldSchema { + if m != nil { + return m.FieldSchema + } + return nil +} + type BuildIndexResponse struct { Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` IndexBuildID int64 `protobuf:"varint,2,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"` @@ -658,6 +675,7 @@ type IndexMeta struct { NodeID int64 `protobuf:"varint,7,opt,name=nodeID,proto3" json:"nodeID,omitempty"` Version int64 `protobuf:"varint,8,opt,name=version,proto3" json:"version,omitempty"` Recycled bool `protobuf:"varint,9,opt,name=recycled,proto3" json:"recycled,omitempty"` + SerializeSize uint64 `protobuf:"varint,10,opt,name=serialize_size,json=serializeSize,proto3" json:"serialize_size,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -751,6 +769,13 @@ func (m *IndexMeta) GetRecycled() bool { return false } +func (m *IndexMeta) GetSerializeSize() uint64 { + if m != nil { + return m.SerializeSize + } + return 0 +} + type DropIndexRequest struct { IndexID int64 `protobuf:"varint,1,opt,name=indexID,proto3" json:"indexID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -809,68 +834,74 @@ func init() { func init() { proto.RegisterFile("index_coord.proto", fileDescriptor_f9e019eb3fda53c2) } var fileDescriptor_f9e019eb3fda53c2 = []byte{ - // 975 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0x5f, 0x6f, 0x1b, 0x45, - 0x10, 0xcf, 0xf9, 0x12, 0x3b, 0x1e, 0x87, 0xa8, 0x59, 0x4a, 0x75, 0xb8, 0x54, 0x75, 0x8e, 0x52, - 0x0c, 0x6a, 0x9d, 0xca, 0xa5, 0xf0, 0x84, 0x04, 0x89, 0x45, 0x64, 0xa1, 0x54, 0xd1, 0x36, 0xe2, - 0x01, 0x09, 0x59, 0x1b, 0xdf, 0x24, 0x59, 0xf5, 0xfe, 0xe5, 0x76, 0x5d, 0x91, 0x77, 0xde, 0x79, - 0x2b, 0xe2, 0x93, 0xf0, 0x39, 0xfa, 0xcc, 0x97, 0x41, 0xbb, 0xb7, 0x77, 0xb9, 0x3b, 0x9f, 0x53, - 0x87, 0x50, 0x78, 0xe1, 0xed, 0x66, 0xf6, 0x37, 0x33, 0x3b, 0xbf, 0x9d, 0xfd, 0xdd, 0xc2, 0x16, - 0x0f, 0x3d, 0xfc, 0x79, 0x32, 0x8d, 0xa2, 0xc4, 0x1b, 0xc4, 0x49, 0x24, 0x23, 0x42, 0x02, 0xee, - 0xbf, 0x9a, 0x89, 0xd4, 0x1a, 0xe8, 0xf5, 0xee, 0xc6, 0x34, 0x0a, 0x82, 0x28, 0x4c, 0x7d, 0xdd, - 0x4d, 0x1e, 0x4a, 0x4c, 0x42, 0xe6, 0x1b, 0x7b, 0xa3, 0x18, 0xe1, 0xfe, 0x66, 0xc1, 0xfb, 0x14, - 0x4f, 0xb9, 0x90, 0x98, 0x3c, 0x8f, 0x3c, 0xa4, 0x78, 0x3e, 0x43, 0x21, 0xc9, 0x13, 0x58, 0x3d, - 0x66, 0x02, 0x1d, 0xab, 0x67, 0xf5, 0x3b, 0xc3, 0x8f, 0x06, 0xa5, 0x32, 0x26, 0xff, 0x81, 0x38, - 0xdd, 0x65, 0x02, 0xa9, 0x46, 0x92, 0x2f, 0xa1, 0xc5, 0x3c, 0x2f, 0x41, 0x21, 0x9c, 0xc6, 0x15, - 0x41, 0xdf, 0xa6, 0x18, 0x9a, 0x81, 0xc9, 0x1d, 0x68, 0x86, 0x91, 0x87, 0xe3, 0x91, 0x63, 0xf7, - 0xac, 0xbe, 0x4d, 0x8d, 0xe5, 0xfe, 0x6a, 0xc1, 0xed, 0xf2, 0xce, 0x44, 0x1c, 0x85, 0x02, 0xc9, - 0x53, 0x68, 0x0a, 0xc9, 0xe4, 0x4c, 0x98, 0xcd, 0xdd, 0xad, 0xad, 0xf3, 0x42, 0x43, 0xa8, 0x81, - 0x92, 0x5d, 0xe8, 0xf0, 0x90, 0xcb, 0x49, 0xcc, 0x12, 0x16, 0x64, 0x3b, 0xdc, 0x1e, 0x54, 0xd8, - 0x33, 0x44, 0x8d, 0x43, 0x2e, 0x0f, 0x35, 0x90, 0x02, 0xcf, 0xbf, 0xdd, 0xaf, 0xe1, 0x83, 0x7d, - 0x94, 0x63, 0xc5, 0xb1, 0xca, 0x8e, 0x22, 0x23, 0xeb, 0x01, 0xbc, 0xa7, 0x99, 0xdf, 0x9d, 0x71, - 0xdf, 0x1b, 0x8f, 0xd4, 0xc6, 0xec, 0xbe, 0x4d, 0xcb, 0x4e, 0xf7, 0x0f, 0x0b, 0xda, 0x3a, 0x78, - 0x1c, 0x9e, 0x44, 0xe4, 0x19, 0xac, 0xa9, 0xad, 0xa5, 0x0c, 0x6f, 0x0e, 0xef, 0xd7, 0x36, 0x71, - 0x59, 0x8b, 0xa6, 0x68, 0xe2, 0xc2, 0x46, 0x31, 0xab, 0x6e, 0xc4, 0xa6, 0x25, 0x1f, 0x71, 0xa0, - 0xa5, 0xed, 0x9c, 0xd2, 0xcc, 0x24, 0xf7, 0x00, 0xd2, 0x11, 0x0a, 0x59, 0x80, 0xce, 0x6a, 0xcf, - 0xea, 0xb7, 0x69, 0x5b, 0x7b, 0x9e, 0xb3, 0x00, 0xd5, 0x51, 0x24, 0xc8, 0x44, 0x14, 0x3a, 0x6b, - 0x7a, 0xc9, 0x58, 0xee, 0x2f, 0x16, 0xdc, 0xa9, 0x76, 0x7e, 0x93, 0xc3, 0x78, 0x96, 0x06, 0xa1, - 0x3a, 0x07, 0xbb, 0xdf, 0x19, 0xde, 0x1b, 0xcc, 0x4f, 0xf1, 0x20, 0xa7, 0x8a, 0x1a, 0xb0, 0xfb, - 0xa6, 0x01, 0x64, 0x2f, 0x41, 0x26, 0x51, 0xaf, 0x65, 0xec, 0x57, 0x29, 0xb1, 0x6a, 0x28, 0x29, - 0x37, 0xde, 0xa8, 0x36, 0xbe, 0x98, 0x31, 0x07, 0x5a, 0xaf, 0x30, 0x11, 0x3c, 0x0a, 0x35, 0x5d, - 0x36, 0xcd, 0x4c, 0x72, 0x17, 0xda, 0x01, 0x4a, 0x36, 0x89, 0x99, 0x3c, 0x33, 0x7c, 0xad, 0x2b, - 0xc7, 0x21, 0x93, 0x67, 0xaa, 0x9e, 0xc7, 0xcc, 0xa2, 0x70, 0x9a, 0x3d, 0x5b, 0xd5, 0x53, 0x1e, - 0xb5, 0xaa, 0xa7, 0x51, 0x5e, 0xc4, 0x98, 0x4d, 0x63, 0x4b, 0xb3, 0xb0, 0x5d, 0x4b, 0xdd, 0xf7, - 0x78, 0xf1, 0x03, 0xf3, 0x67, 0x78, 0xc8, 0x78, 0x42, 0x41, 0x45, 0xa5, 0xd3, 0x48, 0x46, 0xa6, - 0xed, 0x2c, 0xc9, 0xfa, 0xb2, 0x49, 0x3a, 0x3a, 0xcc, 0xcc, 0xf4, 0xef, 0x0d, 0xd8, 0x4a, 0x49, - 0xfa, 0xd7, 0x28, 0x2d, 0x73, 0xb3, 0xf6, 0x16, 0x6e, 0x9a, 0xff, 0x04, 0x37, 0xad, 0xbf, 0xc5, - 0x4d, 0x00, 0xa4, 0x48, 0xcd, 0x4d, 0x26, 0x7e, 0x89, 0x6b, 0xeb, 0x7e, 0x03, 0x4e, 0x76, 0xc9, - 0xbe, 0xe3, 0x3e, 0x6a, 0x36, 0xae, 0xa7, 0x30, 0xaf, 0x2d, 0xd8, 0x2a, 0xc5, 0x6b, 0xa5, 0x79, - 0x57, 0x1b, 0x26, 0x7d, 0xb8, 0x95, 0xb2, 0x7c, 0xc2, 0x7d, 0x34, 0xc7, 0x69, 0xeb, 0xe3, 0xdc, - 0xe4, 0xa5, 0x2e, 0xd4, 0xc6, 0x3e, 0xac, 0xe9, 0xed, 0x26, 0x8c, 0x8e, 0x00, 0x0a, 0x65, 0x53, - 0x1d, 0xf9, 0x64, 0xa1, 0x8e, 0x14, 0x09, 0xa1, 0xed, 0x93, 0x7c, 0x63, 0x7f, 0x36, 0x8c, 0x26, - 0x1f, 0xa0, 0x64, 0x4b, 0x8d, 0x7d, 0xae, 0xdb, 0x8d, 0x6b, 0xe9, 0xf6, 0x7d, 0xe8, 0x9c, 0x30, - 0xee, 0x4f, 0x8c, 0xbe, 0xda, 0xfa, 0xba, 0x80, 0x72, 0x51, 0xed, 0x21, 0x5f, 0x81, 0x9d, 0xe0, - 0xb9, 0x16, 0x99, 0x05, 0x8d, 0xcc, 0x5d, 0x53, 0xaa, 0x22, 0x6a, 0x4f, 0x61, 0xad, 0xee, 0x14, - 0xc8, 0x36, 0x6c, 0x04, 0x2c, 0x79, 0x39, 0xf1, 0xd0, 0x47, 0x89, 0x9e, 0xd3, 0xec, 0x59, 0xfd, - 0x75, 0xda, 0x51, 0xbe, 0x51, 0xea, 0x2a, 0xfc, 0x8c, 0x5b, 0xc5, 0x9f, 0x71, 0x51, 0x06, 0xd7, - 0xcb, 0x32, 0xd8, 0x85, 0xf5, 0x04, 0xa7, 0x17, 0x53, 0x1f, 0x3d, 0xa7, 0xad, 0x13, 0xe6, 0xb6, - 0xfb, 0x08, 0x6e, 0x8d, 0x92, 0x28, 0x2e, 0x49, 0x4b, 0x41, 0x17, 0xac, 0x92, 0x2e, 0x0c, 0xdf, - 0x34, 0x01, 0x34, 0x74, 0x4f, 0xbd, 0x6f, 0x48, 0x0c, 0x64, 0x1f, 0xe5, 0x5e, 0x14, 0xc4, 0x51, - 0x88, 0xa1, 0x4c, 0xff, 0x3b, 0xe4, 0xc9, 0x82, 0x5f, 0xf6, 0x3c, 0xd4, 0x14, 0xec, 0x3e, 0x5c, - 0x10, 0x51, 0x81, 0xbb, 0x2b, 0x24, 0xd0, 0x15, 0x8f, 0x78, 0x80, 0x47, 0x7c, 0xfa, 0x72, 0xef, - 0x8c, 0x85, 0x21, 0xfa, 0x57, 0x55, 0xac, 0x40, 0xb3, 0x8a, 0x1f, 0x97, 0x23, 0x8c, 0xf1, 0x42, - 0x26, 0x3c, 0x3c, 0xcd, 0x86, 0xde, 0x5d, 0x21, 0xe7, 0x70, 0x7b, 0x1f, 0x75, 0x75, 0x2e, 0x24, - 0x9f, 0x8a, 0xac, 0xe0, 0x70, 0x71, 0xc1, 0x39, 0xf0, 0x35, 0x4b, 0xfe, 0x04, 0x70, 0x39, 0x45, - 0x64, 0xb9, 0x29, 0x9b, 0x27, 0xb0, 0x0a, 0xcb, 0xd3, 0x73, 0xd8, 0x2c, 0x3f, 0x13, 0xc8, 0x67, - 0x75, 0xb1, 0xb5, 0x8f, 0xa8, 0xee, 0xe7, 0xcb, 0x40, 0xf3, 0x52, 0x09, 0x6c, 0xcd, 0x09, 0x0a, - 0x79, 0x74, 0x55, 0x8a, 0xaa, 0xa6, 0x76, 0x1f, 0x2f, 0x89, 0xce, 0x6b, 0x1e, 0x42, 0x3b, 0x1f, - 0x67, 0xf2, 0xa0, 0x2e, 0xba, 0x3a, 0xed, 0xdd, 0xab, 0xa4, 0xcc, 0x5d, 0x21, 0x13, 0x80, 0x7d, - 0x94, 0x07, 0x28, 0x13, 0x3e, 0x15, 0xe4, 0x61, 0xed, 0x21, 0x5e, 0x02, 0xb2, 0xa4, 0x9f, 0xbe, - 0x15, 0x97, 0x6d, 0x79, 0xf8, 0x7a, 0xd5, 0xe8, 0x9b, 0x7a, 0x41, 0xff, 0x7f, 0xa5, 0xde, 0xc1, - 0x95, 0x3a, 0x82, 0x4e, 0xe1, 0x4d, 0x4a, 0x6a, 0x2f, 0xcb, 0xfc, 0xa3, 0xf5, 0xbf, 0x1e, 0x8c, - 0xdd, 0x2f, 0x7e, 0x1c, 0x9e, 0x72, 0x79, 0x36, 0x3b, 0x56, 0xa5, 0x77, 0x52, 0xe4, 0x63, 0x1e, - 0x99, 0xaf, 0x9d, 0x8c, 0xa1, 0x1d, 0x9d, 0x69, 0x47, 0xb7, 0x11, 0x1f, 0x1f, 0x37, 0xb5, 0xf9, - 0xf4, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x75, 0x9d, 0x20, 0xf1, 0x89, 0x0e, 0x00, 0x00, + // 1057 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0xcd, 0x6e, 0xdb, 0x46, + 0x10, 0x36, 0x4d, 0x5b, 0x3f, 0x23, 0xc5, 0x88, 0xb7, 0x69, 0xc0, 0x28, 0x0d, 0x22, 0xb3, 0x49, + 0xaa, 0x16, 0x89, 0x1c, 0x28, 0x4d, 0x7b, 0x2a, 0xd0, 0xda, 0x42, 0x0c, 0xa3, 0x70, 0x60, 0xac, + 0x8d, 0x1e, 0x0a, 0x14, 0xc2, 0x5a, 0x1c, 0xd9, 0x8b, 0xf0, 0x47, 0xe6, 0xae, 0x92, 0xda, 0xc7, + 0xa2, 0xf7, 0xde, 0xd2, 0x47, 0xe9, 0x73, 0xe4, 0x71, 0x7a, 0x2b, 0x76, 0xb9, 0xa4, 0x49, 0x89, + 0x72, 0xe4, 0xba, 0x69, 0x2f, 0xbd, 0x71, 0x86, 0xdf, 0xec, 0xec, 0x7c, 0x3b, 0xf3, 0xed, 0xc2, + 0x3a, 0x0f, 0x3d, 0xfc, 0x79, 0x30, 0x8c, 0xa2, 0xd8, 0xeb, 0x8e, 0xe3, 0x48, 0x46, 0x84, 0x04, + 0xdc, 0x7f, 0x3d, 0x11, 0x89, 0xd5, 0xd5, 0xff, 0x5b, 0xcd, 0x61, 0x14, 0x04, 0x51, 0x98, 0xf8, + 0x5a, 0x6b, 0x3c, 0x94, 0x18, 0x87, 0xcc, 0x37, 0x76, 0x33, 0x1f, 0xd1, 0x6a, 0x8a, 0xe1, 0x09, + 0x06, 0x2c, 0xb1, 0xdc, 0xdf, 0x2d, 0xf8, 0x88, 0xe2, 0x31, 0x17, 0x12, 0xe3, 0x97, 0x91, 0x87, + 0x14, 0x4f, 0x27, 0x28, 0x24, 0x79, 0x0a, 0x2b, 0x47, 0x4c, 0xa0, 0x63, 0xb5, 0xad, 0x4e, 0xa3, + 0xf7, 0x49, 0xb7, 0x90, 0xd4, 0x64, 0xdb, 0x13, 0xc7, 0x5b, 0x4c, 0x20, 0xd5, 0x48, 0xf2, 0x15, + 0x54, 0x99, 0xe7, 0xc5, 0x28, 0x84, 0xb3, 0x7c, 0x49, 0xd0, 0x77, 0x09, 0x86, 0xa6, 0x60, 0x72, + 0x1b, 0x2a, 0x61, 0xe4, 0xe1, 0x6e, 0xdf, 0xb1, 0xdb, 0x56, 0xc7, 0xa6, 0xc6, 0x72, 0x7f, 0xb3, + 0xe0, 0x56, 0x71, 0x67, 0x62, 0x1c, 0x85, 0x02, 0xc9, 0x33, 0xa8, 0x08, 0xc9, 0xe4, 0x44, 0x98, + 0xcd, 0xdd, 0x2d, 0xcd, 0x73, 0xa0, 0x21, 0xd4, 0x40, 0xc9, 0x16, 0x34, 0x78, 0xc8, 0xe5, 0x60, + 0xcc, 0x62, 0x16, 0xa4, 0x3b, 0xdc, 0xe8, 0x4e, 0x71, 0x69, 0x68, 0xdb, 0x0d, 0xb9, 0xdc, 0xd7, + 0x40, 0x0a, 0x3c, 0xfb, 0x76, 0xbf, 0x81, 0x8f, 0x77, 0x50, 0xee, 0x2a, 0xc6, 0xd5, 0xea, 0x28, + 0x52, 0xb2, 0x1e, 0xc0, 0x0d, 0x7d, 0x0e, 0x5b, 0x13, 0xee, 0x7b, 0xbb, 0x7d, 0xb5, 0x31, 0xbb, + 0x63, 0xd3, 0xa2, 0xd3, 0xfd, 0xc3, 0x82, 0xba, 0x0e, 0xde, 0x0d, 0x47, 0x11, 0x79, 0x0e, 0xab, + 0x6a, 0x6b, 0x09, 0xc3, 0x6b, 0xbd, 0xfb, 0xa5, 0x45, 0x5c, 0xe4, 0xa2, 0x09, 0x9a, 0xb8, 0xd0, + 0xcc, 0xaf, 0xaa, 0x0b, 0xb1, 0x69, 0xc1, 0x47, 0x1c, 0xa8, 0x6a, 0x3b, 0xa3, 0x34, 0x35, 0xc9, + 0x3d, 0x80, 0xa4, 0xa1, 0x42, 0x16, 0xa0, 0xb3, 0xd2, 0xb6, 0x3a, 0x75, 0x5a, 0xd7, 0x9e, 0x97, + 0x2c, 0x40, 0x75, 0x14, 0x31, 0x32, 0x11, 0x85, 0xce, 0xaa, 0xfe, 0x65, 0x2c, 0xf7, 0x57, 0x0b, + 0x6e, 0x4f, 0x57, 0x7e, 0x9d, 0xc3, 0x78, 0x9e, 0x04, 0xa1, 0x3a, 0x07, 0xbb, 0xd3, 0xe8, 0xdd, + 0xeb, 0xce, 0xf6, 0x74, 0x37, 0xa3, 0x8a, 0x1a, 0xb0, 0xfb, 0x6e, 0x19, 0xc8, 0x76, 0x8c, 0x4c, + 0xa2, 0xfe, 0x97, 0xb2, 0x3f, 0x4d, 0x89, 0x55, 0x42, 0x49, 0xb1, 0xf0, 0xe5, 0xe9, 0xc2, 0xe7, + 0x33, 0xe6, 0x40, 0xf5, 0x35, 0xc6, 0x82, 0x47, 0xa1, 0xa6, 0xcb, 0xa6, 0xa9, 0x49, 0xee, 0x42, + 0x3d, 0x40, 0xc9, 0x06, 0x63, 0x26, 0x4f, 0x0c, 0x5f, 0x35, 0xe5, 0xd8, 0x67, 0xf2, 0x44, 0xe5, + 0xf3, 0x98, 0xf9, 0x29, 0x9c, 0x4a, 0xdb, 0x56, 0xf9, 0x94, 0x47, 0xfd, 0xd5, 0xdd, 0x28, 0xcf, + 0xc6, 0x98, 0x76, 0x63, 0x55, 0xb3, 0xb0, 0x51, 0x4a, 0xdd, 0xf7, 0x78, 0xf6, 0x03, 0xf3, 0x27, + 0xb8, 0xcf, 0x78, 0x4c, 0x41, 0x45, 0x25, 0xdd, 0x48, 0xfa, 0xa6, 0xec, 0x74, 0x91, 0xda, 0xa2, + 0x8b, 0x34, 0x74, 0x98, 0xe9, 0xe9, 0x3f, 0x97, 0x61, 0x3d, 0x21, 0xe9, 0x5f, 0xa3, 0xb4, 0xc8, + 0xcd, 0xea, 0x7b, 0xb8, 0xa9, 0xfc, 0x13, 0xdc, 0x54, 0xff, 0x0e, 0x37, 0xe4, 0x0e, 0xd4, 0xc2, + 0x49, 0x30, 0x88, 0xa3, 0x37, 0x8a, 0x5d, 0x5d, 0x43, 0x38, 0x09, 0x68, 0xf4, 0x46, 0x90, 0x6d, + 0x68, 0x8e, 0x38, 0xfa, 0xde, 0x20, 0x11, 0x53, 0xa7, 0xae, 0x9b, 0xbf, 0x5d, 0x4c, 0x60, 0x84, + 0xf6, 0x85, 0x02, 0x1e, 0xe8, 0x6f, 0xda, 0x18, 0x5d, 0x18, 0x6e, 0x00, 0x24, 0x4f, 0xfd, 0x75, + 0x26, 0x6a, 0x01, 0x59, 0x70, 0xbf, 0x05, 0x27, 0x1d, 0xe2, 0x17, 0xdc, 0x47, 0xcd, 0xf6, 0xd5, + 0x14, 0xec, 0xad, 0x05, 0xeb, 0x85, 0x78, 0xad, 0x64, 0x1f, 0x6a, 0xc3, 0xa4, 0x03, 0x37, 0x93, + 0x53, 0x1c, 0x71, 0x1f, 0x4d, 0xbb, 0xd8, 0xba, 0x5d, 0xd6, 0x78, 0xa1, 0x0a, 0xb5, 0xb1, 0x3b, + 0x25, 0xb5, 0x5d, 0x87, 0xd1, 0x3e, 0x40, 0x2e, 0x6d, 0xa2, 0x53, 0x0f, 0xe7, 0xea, 0x54, 0x9e, + 0x10, 0x5a, 0x1f, 0x65, 0x1b, 0xfb, 0xc5, 0x36, 0x9a, 0xbf, 0x87, 0x92, 0x2d, 0x34, 0x56, 0xd9, + 0xbd, 0xb0, 0x7c, 0xa5, 0x7b, 0xe1, 0x3e, 0x34, 0x46, 0x8c, 0xfb, 0x03, 0xa3, 0xdf, 0xb6, 0x1e, + 0x47, 0x50, 0x2e, 0xaa, 0x3d, 0xe4, 0x6b, 0xb0, 0x63, 0x3c, 0xd5, 0x22, 0x36, 0xa7, 0x90, 0x19, + 0x19, 0xa0, 0x2a, 0xa2, 0xf4, 0x14, 0x56, 0xcb, 0x4e, 0x81, 0x6c, 0x40, 0x33, 0x60, 0xf1, 0xab, + 0x81, 0x87, 0x3e, 0x4a, 0xf4, 0x9c, 0x4a, 0xdb, 0xea, 0xd4, 0x68, 0x43, 0xf9, 0xfa, 0x89, 0x2b, + 0x77, 0xd9, 0x57, 0xf3, 0x97, 0x7d, 0x5e, 0x66, 0x6b, 0x45, 0x99, 0x6d, 0x41, 0x2d, 0xc6, 0xe1, + 0xd9, 0xd0, 0x47, 0x4f, 0x4f, 0x59, 0x8d, 0x66, 0x36, 0x79, 0x08, 0x6b, 0x02, 0x63, 0xce, 0x7c, + 0x7e, 0x8e, 0x03, 0xc1, 0xcf, 0xd1, 0x81, 0xb6, 0xd5, 0x59, 0xa1, 0x37, 0x32, 0xef, 0x01, 0x3f, + 0x47, 0xf7, 0x31, 0xdc, 0xec, 0xc7, 0xd1, 0xb8, 0xa0, 0x70, 0x39, 0x79, 0xb2, 0x0a, 0xf2, 0xd4, + 0x7b, 0x57, 0x01, 0xd0, 0xd0, 0x6d, 0xf5, 0xe8, 0x22, 0x63, 0x20, 0x3b, 0x28, 0xb7, 0xa3, 0x60, + 0x1c, 0x85, 0x18, 0xca, 0xe4, 0xfa, 0x23, 0x4f, 0xe7, 0xbc, 0x1c, 0x66, 0xa1, 0x26, 0x61, 0xeb, + 0xd1, 0x9c, 0x88, 0x29, 0xb8, 0xbb, 0x44, 0x02, 0x9d, 0xf1, 0x90, 0x07, 0x78, 0xc8, 0x87, 0xaf, + 0xb6, 0x4f, 0x58, 0x18, 0xa2, 0x7f, 0x59, 0xc6, 0x29, 0x68, 0x9a, 0xf1, 0xd3, 0x62, 0x84, 0x31, + 0x0e, 0x64, 0xcc, 0xc3, 0xe3, 0x74, 0x36, 0xdc, 0x25, 0x72, 0x0a, 0xb7, 0x76, 0x50, 0x67, 0xe7, + 0x42, 0xf2, 0xa1, 0x48, 0x13, 0xf6, 0xe6, 0x27, 0x9c, 0x01, 0x5f, 0x31, 0xe5, 0x4f, 0x00, 0x17, + 0xcd, 0x46, 0x16, 0x6b, 0xc6, 0x59, 0x02, 0xa7, 0x61, 0xd9, 0xf2, 0x1c, 0xd6, 0x8a, 0xaf, 0x15, + 0xf2, 0x79, 0x59, 0x6c, 0xe9, 0x5b, 0xae, 0xf5, 0xc5, 0x22, 0xd0, 0x2c, 0x55, 0x0c, 0xeb, 0x33, + 0xba, 0x43, 0x1e, 0x5f, 0xb6, 0xc4, 0xb4, 0xf4, 0xb6, 0x9e, 0x2c, 0x88, 0xce, 0x72, 0xee, 0x43, + 0x3d, 0x6b, 0x67, 0xf2, 0xa0, 0x2c, 0x7a, 0xba, 0xdb, 0x5b, 0x97, 0x29, 0x9e, 0xbb, 0x44, 0x06, + 0x00, 0x3b, 0x28, 0xf7, 0x50, 0xc6, 0x7c, 0x28, 0xc8, 0xa3, 0xd2, 0x43, 0xbc, 0x00, 0xa4, 0x8b, + 0x7e, 0xf6, 0x5e, 0x5c, 0xba, 0xe5, 0xde, 0xdb, 0x15, 0x23, 0x83, 0xea, 0x21, 0xff, 0xff, 0x48, + 0x7d, 0x80, 0x91, 0x3a, 0x84, 0x46, 0xee, 0x69, 0x4c, 0x4a, 0x87, 0x65, 0xf6, 0xed, 0xfc, 0x5f, + 0x37, 0xc6, 0xd6, 0x97, 0x3f, 0xf6, 0x8e, 0xb9, 0x3c, 0x99, 0x1c, 0xa9, 0xd4, 0x9b, 0x09, 0xf2, + 0x09, 0x8f, 0xcc, 0xd7, 0x66, 0xca, 0xd0, 0xa6, 0x5e, 0x69, 0x53, 0x97, 0x31, 0x3e, 0x3a, 0xaa, + 0x68, 0xf3, 0xd9, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x1b, 0x2b, 0xc8, 0x1a, 0x1e, 0x0f, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 3fdaf262c4..ee7e498126 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -118,7 +118,7 @@ type Core struct { CallGetFlushedSegmentsService func(ctx context.Context, collID, partID typeutil.UniqueID) ([]typeutil.UniqueID, error) //call index builder's client to build index, return build id - CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) + CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error) CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error NewProxyClient func(sess *sessionutil.Session) (types.Proxy, error) @@ -727,7 +727,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error { } }() - c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) { + c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (retID typeutil.UniqueID, retErr error) { defer func() { if err := recover(); err != nil { retErr = fmt.Errorf("build index panic, msg = %v", err) @@ -740,6 +740,8 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error { IndexParams: idxInfo.IndexParams, IndexID: idxInfo.IndexID, IndexName: idxInfo.IndexName, + NumRows: numRows, + FieldSchema: field, }) if err != nil { return retID, err @@ -864,7 +866,7 @@ func (c *Core) BuildIndex(ctx context.Context, segID typeutil.UniqueID, field *s if err != nil { return 0, err } - bldID, err = c.CallBuildIndexService(ctx, binlogs, field, idxInfo) + bldID, err = c.CallBuildIndexService(ctx, binlogs, field, idxInfo, rows) if err != nil { return 0, err } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index e85d76a893..ccc491f1f6 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -2418,7 +2418,7 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { + c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error) { return 0, nil } err = c.checkInit() @@ -2595,7 +2595,7 @@ func TestCheckFlushedSegments(t *testing.T) { core.MetaTable.indexID2Meta[indexID] = etcdpb.IndexInfo{ IndexID: indexID, } - core.CallBuildIndexService = func(_ context.Context, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo) (int64, error) { + core.CallBuildIndexService = func(_ context.Context, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo, numRows int64) (int64, error) { assert.Equal(t, fieldID, field.FieldID) assert.Equal(t, indexID, idx.IndexID) return -1, errors.New("build index build") @@ -2604,7 +2604,7 @@ func TestCheckFlushedSegments(t *testing.T) { core.checkFlushedSegments(ctx) var indexBuildID int64 = 10001 - core.CallBuildIndexService = func(_ context.Context, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo) (int64, error) { + core.CallBuildIndexService = func(_ context.Context, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo, numRows int64) (int64, error) { return indexBuildID, nil } core.checkFlushedSegments(core.ctx)