From f5198bcaeeafea4686c72682568c6243fafb7adc Mon Sep 17 00:00:00 2001 From: xige-16 Date: Thu, 24 Dec 2020 14:51:55 +0800 Subject: [PATCH] Add LoadIndexService in query node Signed-off-by: xige-16 --- build/docker/deploy/.env | 2 +- configs/advanced/channel.yaml | 2 + configs/advanced/query_node.yaml | 4 + internal/core/src/common/LoadIndex.h | 23 ++ internal/core/src/segcore/CMakeLists.txt | 3 +- internal/core/src/segcore/load_index_c.cpp | 139 +++++++++ internal/core/src/segcore/load_index_c.h | 45 +++ internal/core/src/segcore/segment_c.cpp | 17 ++ internal/core/src/segcore/segment_c.h | 3 + internal/core/unittest/test_c_api.cpp | 62 +++- internal/master/grpc_service.go | 35 +-- internal/master/index_task.go | 95 ------ internal/master/master.go | 2 +- internal/master/meta_table.go | 45 +-- internal/master/meta_table_test.go | 2 +- internal/master/segment_manager.go | 14 - internal/msgstream/unmarshal.go | 2 + internal/proto/internal_msg.proto | 6 +- internal/proto/internalpb/internal_msg.pb.go | 254 ++++++++-------- internal/querynode/client/client.go | 3 +- internal/querynode/collection_replica.go | 11 +- internal/querynode/load_index.go | 41 --- internal/querynode/load_index_info.go | 98 ++++++ internal/querynode/load_index_service.go | 286 ++++++++++++++++++ internal/querynode/load_index_service_test.go | 148 +++++++++ internal/querynode/param_table.go | 77 +++++ internal/querynode/param_table_test.go | 41 +++ internal/querynode/query_node.go | 14 +- internal/querynode/query_node_test.go | 5 +- internal/querynode/reduce_test.go | 2 +- internal/querynode/segment.go | 26 +- internal/querynode/segment_test.go | 20 +- internal/querynode/stats_service.go | 41 ++- internal/querynode/stats_service_test.go | 6 +- 34 files changed, 1172 insertions(+), 402 deletions(-) create mode 100644 internal/core/src/common/LoadIndex.h create mode 100644 internal/core/src/segcore/load_index_c.cpp create mode 100644 internal/core/src/segcore/load_index_c.h delete mode 100644 internal/master/index_task.go delete mode 100644 internal/querynode/load_index.go create mode 100644 internal/querynode/load_index_info.go create mode 100644 internal/querynode/load_index_service.go create mode 100644 internal/querynode/load_index_service_test.go diff --git a/build/docker/deploy/.env b/build/docker/deploy/.env index 01c909d01f..4a3735af88 100644 --- a/build/docker/deploy/.env +++ b/build/docker/deploy/.env @@ -5,4 +5,4 @@ TARGET_TAG=latest PULSAR_ADDRESS=pulsar://pulsar:6650 ETCD_ADDRESS=etcd:2379 MASTER_ADDRESS=master:53100 -MINIO_ADDRESS=minio:9000 \ No newline at end of file +MINIO_ADDRESS=minio:9000 diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index 8116601e97..135c2eb3e8 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -22,6 +22,8 @@ msgChannel: writeNodeTimeTick: "writeNodeTimeTick" # old name: statsChannels: "statistic" queryNodeStats: "query-node-stats" + # cmd for loadIndex, flush, etc... + cmd: "cmd" # sub name generation rule: ${subNamePrefix}-${NodeID} subNamePrefix: diff --git a/configs/advanced/query_node.yaml b/configs/advanced/query_node.yaml index 862da2cbdd..ec5e6603b7 100644 --- a/configs/advanced/query_node.yaml +++ b/configs/advanced/query_node.yaml @@ -42,3 +42,7 @@ queryNode: stats: recvBufSize: 64 + + loadIndex: + recvBufSize: 512 + pulsarBufSize: 512 diff --git a/internal/core/src/common/LoadIndex.h b/internal/core/src/common/LoadIndex.h new file mode 100644 index 0000000000..377b533082 --- /dev/null +++ b/internal/core/src/common/LoadIndex.h @@ -0,0 +1,23 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once +#include +#include + +#include "../index/knowhere/knowhere/index/vector_index/VecIndex.h" + +struct LoadIndexInfo { + std::string field_name; + int64_t field_id; + std::map index_params; + milvus::knowhere::VecIndexPtr index; +}; diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index 2dfcf78e79..02eb732ad7 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -11,7 +11,8 @@ set(SEGCORE_FILES InsertRecord.cpp Reduce.cpp plan_c.cpp - reduce_c.cpp) + reduce_c.cpp + load_index_c.cpp) add_library(milvus_segcore SHARED ${SEGCORE_FILES} ) diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp new file mode 100644 index 0000000000..01c9789e5d --- /dev/null +++ b/internal/core/src/segcore/load_index_c.cpp @@ -0,0 +1,139 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include "index/knowhere/knowhere/common/BinarySet.h" +#include "index/knowhere/knowhere/index/vector_index/VecIndexFactory.h" +#include "segcore/load_index_c.h" +#include "common/LoadIndex.h" +#include "utils/EasyAssert.h" + +CStatus +NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info) { + try { + auto load_index_info = std::make_unique(); + *c_load_index_info = load_index_info.release(); + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } +} + +CStatus +AppendIndexParam(CLoadIndexInfo c_load_index_info, const char* c_index_key, const char* c_index_value) { + try { + auto load_index_info = (LoadIndexInfo*)c_load_index_info; + std::string index_key(c_index_key); + std::string index_value(c_index_value); + load_index_info->index_params[index_key] = index_value; + + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } +} + +CStatus +AppendFieldInfo(CLoadIndexInfo c_load_index_info, const char* c_field_name, int64_t field_id) { + try { + auto load_index_info = (LoadIndexInfo*)c_load_index_info; + std::string field_name(c_field_name); + load_index_info->field_name = field_name; + load_index_info->field_id = field_id; + + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } +} + +CStatus +AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { + try { + auto load_index_info = (LoadIndexInfo*)c_load_index_info; + auto binary_set = (milvus::knowhere::BinarySet*)c_binary_set; + auto& index_params = load_index_info->index_params; + bool find_index_type = index_params.count("index_type") > 0 ? true : false; + bool find_index_mode = index_params.count("index_mode") > 0 ? true : false; + Assert(find_index_mode == true); + Assert(find_index_type == true); + auto mode = index_params["index_mode"] == "CPU" ? milvus::knowhere::IndexMode::MODE_CPU + : milvus::knowhere::IndexMode::MODE_GPU; + load_index_info->index = + milvus::knowhere::VecIndexFactory::GetInstance().CreateVecIndex(index_params["index_type"], mode); + load_index_info->index->Load(*binary_set); + + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } +} + +CStatus +NewBinarySet(CBinarySet* c_binary_set) { + try { + auto binary_set = std::make_unique(); + *c_binary_set = binary_set.release(); + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } +} + +CStatus +AppendBinaryIndex(CBinarySet c_binary_set, void* index_binary, int64_t index_size, const char* c_index_key) { + try { + auto binary_set = (milvus::knowhere::BinarySet*)c_binary_set; + std::string index_key(c_index_key); + uint8_t* index = (uint8_t*)index_binary; + std::shared_ptr data(index); + binary_set->Append(index_key, data, index_size); + + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } +} diff --git a/internal/core/src/segcore/load_index_c.h b/internal/core/src/segcore/load_index_c.h new file mode 100644 index 0000000000..88985a1e81 --- /dev/null +++ b/internal/core/src/segcore/load_index_c.h @@ -0,0 +1,45 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include + +#include "segcore/collection_c.h" + +typedef void* CLoadIndexInfo; +typedef void* CBinarySet; + +CStatus +NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info); + +CStatus +AppendIndexParam(CLoadIndexInfo c_load_index_info, const char* index_key, const char* index_value); + +CStatus +AppendFieldInfo(CLoadIndexInfo c_load_index_info, const char* field_name, int64_t field_id); + +CStatus +AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set); + +CStatus +NewBinarySet(CBinarySet* c_binary_set); + +CStatus +AppendBinaryIndex(CBinarySet c_binary_set, void* index_binary, int64_t index_size, const char* c_index_key); + +#ifdef __cplusplus +} +#endif diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index e151f3b68b..a9e2d5d521 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -19,6 +19,7 @@ #include #include #include +#include "common/LoadIndex.h" CSegmentBase NewSegment(CCollection collection, uint64_t segment_id) { @@ -173,6 +174,22 @@ FillTargetEntry(CSegmentBase c_segment, CPlan c_plan, CQueryResult c_result) { return status; } +CStatus +UpdateSegmentIndex(CSegmentBase c_segment, CLoadIndexInfo c_load_index_info) { + auto load_index_info = (LoadIndexInfo*)c_load_index_info; + try { + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } +} + ////////////////////////////////////////////////////////////////// int diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index 0dc3f7cdcd..6bec566dd3 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -18,6 +18,7 @@ extern "C" { #include #include "segcore/plan_c.h" +#include "segcore/load_index_c.h" typedef void* CSegmentBase; typedef void* CQueryResult; @@ -64,6 +65,8 @@ Search(CSegmentBase c_segment, CStatus FillTargetEntry(CSegmentBase c_segment, CPlan c_plan, CQueryResult result); +CStatus +UpdateSegmentIndex(CSegmentBase c_segment, CLoadIndexInfo c_load_index_info); ////////////////////////////////////////////////////////////////// int diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 63f510aa61..2aaff331d8 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -13,11 +13,17 @@ #include #include #include +#include #include "pb/service_msg.pb.h" #include "segcore/reduce_c.h" -#include +#include +#include +#include +#include +#include + namespace chrono = std::chrono; TEST(CApiTest, CollectionTest) { @@ -338,7 +344,7 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) { namespace { auto generate_data(int N) { - std::vector raw_data; + std::vector raw_data; std::vector timestamps; std::vector uids; std::default_random_engine er(42); @@ -352,7 +358,7 @@ generate_data(int N) { for (auto& x : vec) { x = distribution(er); } - raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec)); + raw_data.insert(raw_data.end(), std::begin(vec), std::end(vec)); int age = ei() % 100; raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age)); } @@ -678,3 +684,53 @@ TEST(CApiTest, Reduce) { DeleteCollection(collection); DeleteSegment(segment); } + +TEST(CApiTest, LoadIndex_Search) { + // generator index + constexpr auto DIM = 16; + constexpr auto K = 10; + + auto N = 1024 * 1024 * 10; + auto num_query = 100; + auto [raw_data, timestamps, uids] = generate_data(N); + auto indexing = std::make_shared(); + auto conf = milvus::knowhere::Config{{milvus::knowhere::meta::DIM, DIM}, + {milvus::knowhere::meta::TOPK, K}, + {milvus::knowhere::IndexParams::nlist, 100}, + {milvus::knowhere::IndexParams::nprobe, 4}, + {milvus::knowhere::IndexParams::m, 4}, + {milvus::knowhere::IndexParams::nbits, 8}, + {milvus::knowhere::Metric::TYPE, milvus::knowhere::Metric::L2}, + {milvus::knowhere::meta::DEVICEID, 0}}; + + auto database = milvus::knowhere::GenDataset(N, DIM, raw_data.data()); + indexing->Train(database, conf); + indexing->AddWithoutIds(database, conf); + + EXPECT_EQ(indexing->Count(), N); + EXPECT_EQ(indexing->Dim(), DIM); + + // serializ index to binarySet + auto binary_set = indexing->Serialize(conf); + + // fill loadIndexInfo + LoadIndexInfo load_index_info; + auto& index_params = load_index_info.index_params; + index_params["index_type"] = "IVF_PQ"; + index_params["index_mode"] = "CPU"; + auto mode = milvus::knowhere::IndexMode::MODE_CPU; + load_index_info.index = + milvus::knowhere::VecIndexFactory::GetInstance().CreateVecIndex(index_params["index_type"], mode); + load_index_info.index->Load(binary_set); + + // search + auto query_dataset = milvus::knowhere::GenDataset(num_query, DIM, raw_data.data() + DIM * 4200); + + auto result = indexing->Query(query_dataset, conf, nullptr); + + auto ids = result->Get(milvus::knowhere::meta::IDS); + auto dis = result->Get(milvus::knowhere::meta::DISTANCE); + for (int i = 0; i < std::min(num_query * K, 100); ++i) { + std::cout << ids[i] << "->" << dis[i] << std::endl; + } +} \ No newline at end of file diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index 8216ec9995..e1a61285af 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -443,39 +443,8 @@ func (s *Master) AssignSegmentID(ctx context.Context, request *internalpb.Assign }, nil } -func (s *Master) CreateIndex(ctx context.Context, req *internalpb.CreateIndexRequest) (*commonpb.Status, error) { - task := &createIndexTask{ - baseTask: baseTask{ - sch: s.scheduler, - mt: s.metaTable, - cv: make(chan error), - }, - req: req, - indexBuildScheduler: s.indexBuildSch, - indexLoadScheduler: s.indexLoadSch, - segManager: s.segmentManager, - } - - err := s.scheduler.Enqueue(task) - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "Enqueue failed: " + err.Error(), - }, nil - } - - err = task.WaitToFinish(ctx) - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "Create Index error: " + err.Error(), - }, nil - } - - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - Reason: "", - }, nil +func (s *Master) CreateIndex(context.Context, *internalpb.CreateIndexRequest) (*commonpb.Status, error) { + return nil, nil } func (s *Master) DescribeIndex(context.Context, *internalpb.DescribeIndexRequest) (*servicepb.DescribeIndexResponse, error) { diff --git a/internal/master/index_task.go b/internal/master/index_task.go deleted file mode 100644 index bb39fb986d..0000000000 --- a/internal/master/index_task.go +++ /dev/null @@ -1,95 +0,0 @@ -package master - -import ( - "fmt" - - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" -) - -type createIndexTask struct { - baseTask - req *internalpb.CreateIndexRequest - indexBuildScheduler *IndexBuildScheduler - indexLoadScheduler *IndexLoadScheduler - segManager *SegmentManager -} - -func (task *createIndexTask) Type() internalpb.MsgType { - return internalpb.MsgType_kCreateIndex -} - -func (task *createIndexTask) Ts() (Timestamp, error) { - return task.req.Timestamp, nil -} - -func (task *createIndexTask) Execute() error { - // modify schema - if err := task.mt.UpdateFieldIndexParams(task.req.CollectionName, task.req.FieldName, task.req.ExtraParams); err != nil { - return err - } - // check if closed segment has the same index build history - collMeta, err := task.mt.GetCollectionByName(task.req.CollectionName) - if err != nil { - return err - } - var fieldID int64 = -1 - for _, fieldSchema := range collMeta.Schema.Fields { - if fieldSchema.Name == task.req.FieldName { - fieldID = fieldSchema.FieldID - break - } - } - if fieldID == -1 { - return fmt.Errorf("can not find field name %s", task.req.FieldName) - } - - for _, segID := range collMeta.SegmentIDs { - segMeta, err := task.mt.GetSegmentByID(segID) - if err != nil { - return err - } - if segMeta.CloseTime == 0 { - continue - } - hasIndexMeta, err := task.mt.HasFieldIndexMeta(segID, fieldID, task.req.ExtraParams) - if err != nil { - return err - } - - if hasIndexMeta { - // load index - indexMeta, err := task.mt.GetFieldIndexMeta(segID, fieldID, task.req.ExtraParams) - if err != nil { - return err - } - err = task.indexLoadScheduler.Enqueue(&IndexLoadInfo{ - segmentID: segID, - fieldID: fieldID, - fieldName: task.req.FieldName, - indexFilePaths: indexMeta.IndexFilePaths, - }) - if err != nil { - return err - } - } else { - // create index - for _, kv := range segMeta.BinlogFilePaths { - if kv.FieldID != fieldID { - continue - } - err := task.indexBuildScheduler.Enqueue(&IndexBuildInfo{ - segmentID: segID, - fieldID: fieldID, - binlogFilePath: kv.BinlogFiles, - }) - if err != nil { - return err - } - break - } - } - } - - // close unfilled segment - return task.segManager.ForceClose(collMeta.ID) -} diff --git a/internal/master/master.go b/internal/master/master.go index f5eb1da71e..3313eaf5e5 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -150,7 +150,7 @@ func CreateServer(ctx context.Context) (*Master, error) { // stats msg stream statsMs := ms.NewPulsarMsgStream(ctx, 1024) statsMs.SetPulsarClient(pulsarAddr) - statsMs.CreatePulsarConsumers([]string{Params.QueryNodeStatsChannelName}, "SegmentStats", ms.NewUnmarshalDispatcher(), 1024) + statsMs.CreatePulsarConsumers([]string{Params.QueryNodeStatsChannelName}, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024) statsMs.Start() m := &Master{ diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 216583a005..601f412a4c 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -524,7 +524,7 @@ func (mt *metaTable) saveFieldIndexMetaToEtcd(meta *pb.FieldIndexMeta) error { return mt.client.Save(key, marshaledMeta) } -func (mt *metaTable) DeleteFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexParams []*commonpb.KeyValuePair) error { +func (mt *metaTable) DeleteFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexType string, indexParams []*commonpb.KeyValuePair) error { mt.indexLock.Lock() defer mt.indexLock.Unlock() @@ -568,22 +568,6 @@ func (mt *metaTable) HasFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexPa return false, nil } -func (mt *metaTable) GetFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexParams []*commonpb.KeyValuePair) (*pb.FieldIndexMeta, error) { - mt.indexLock.RLock() - defer mt.indexLock.RUnlock() - - if _, ok := mt.segID2IndexMetas[segID]; !ok { - return nil, fmt.Errorf("can not find segment %d", segID) - } - - for _, v := range mt.segID2IndexMetas[segID] { - if v.FieldID == fieldID && typeutil.CompareIndexParams(v.IndexParams, indexParams) { - return &v, nil - } - } - return nil, fmt.Errorf("can not find field %d", fieldID) -} - func (mt *metaTable) UpdateFieldIndexMeta(meta *pb.FieldIndexMeta) error { mt.indexLock.Lock() defer mt.indexLock.Unlock() @@ -651,30 +635,3 @@ func (mt *metaTable) GetFieldIndexParams(collID UniqueID, fieldID UniqueID) ([]* } return nil, fmt.Errorf("can not find field %d in collection %d", fieldID, collID) } - -func (mt *metaTable) UpdateFieldIndexParams(collName string, fieldName string, indexParams []*commonpb.KeyValuePair) error { - mt.ddLock.Lock() - defer mt.ddLock.Unlock() - - vid, ok := mt.collName2ID[collName] - if !ok { - return errors.Errorf("can't find collection: " + collName) - } - meta, ok := mt.collID2Meta[vid] - if !ok { - return errors.Errorf("can't find collection: " + collName) - } - - for _, fieldSchema := range meta.Schema.Fields { - if fieldSchema.Name == fieldName { - fieldSchema.IndexParams = indexParams - if err := mt.saveCollectionMeta(&meta); err != nil { - _ = mt.reloadFromKV() - return err - } - return nil - } - } - - return fmt.Errorf("can not find field with id %s", fieldName) -} diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 00940dd75e..01a953c3f5 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -497,7 +497,7 @@ func TestMetaTable_IndexMeta(t *testing.T) { }) assert.Nil(t, err) assert.EqualValues(t, indexbuilderpb.IndexStatus_FINISHED, meta.segID2IndexMetas[1][0].Status) - err = meta.DeleteFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}}) + err = meta.DeleteFieldIndexMeta(1, 100, "type1", []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}}) assert.Nil(t, err) res, err = meta.HasFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}}) assert.Nil(t, err) diff --git a/internal/master/segment_manager.go b/internal/master/segment_manager.go index 2e7ae0f24f..4e80cf6f4d 100644 --- a/internal/master/segment_manager.go +++ b/internal/master/segment_manager.go @@ -358,20 +358,6 @@ func (manager *SegmentManager) initChannelRanges() error { } return nil } - -// ForceClose set segments of collection with collID closable, segment will be closed after the assignments of it has expired -func (manager *SegmentManager) ForceClose(collID UniqueID) error { - status, ok := manager.collStatus[collID] - if !ok { - return nil - } - - for _, segStatus := range status.segments { - segStatus.closable = true - } - return nil -} - func NewSegmentManager(ctx context.Context, meta *metaTable, globalIDAllocator func() (UniqueID, error), diff --git a/internal/msgstream/unmarshal.go b/internal/msgstream/unmarshal.go index 967b5d652a..3c516e84ec 100644 --- a/internal/msgstream/unmarshal.go +++ b/internal/msgstream/unmarshal.go @@ -34,6 +34,7 @@ func (dispatcher *UnmarshalDispatcher) addDefaultMsgTemplates() { dropCollectionMsg := DropCollectionMsg{} createPartitionMsg := CreatePartitionMsg{} dropPartitionMsg := DropPartitionMsg{} + loadIndexMsg := LoadIndexMsg{} queryNodeSegStatsMsg := QueryNodeStatsMsg{} dispatcher.tempMap = make(map[internalPb.MsgType]UnmarshalFunc) @@ -47,6 +48,7 @@ func (dispatcher *UnmarshalDispatcher) addDefaultMsgTemplates() { dispatcher.tempMap[internalPb.MsgType_kDropCollection] = dropCollectionMsg.Unmarshal dispatcher.tempMap[internalPb.MsgType_kCreatePartition] = createPartitionMsg.Unmarshal dispatcher.tempMap[internalPb.MsgType_kDropPartition] = dropPartitionMsg.Unmarshal + dispatcher.tempMap[internalPb.MsgType_kLoadIndex] = loadIndexMsg.Unmarshal } diff --git a/internal/proto/internal_msg.proto b/internal/proto/internal_msg.proto index b5da1f0459..b9961e309a 100644 --- a/internal/proto/internal_msg.proto +++ b/internal/proto/internal_msg.proto @@ -291,8 +291,10 @@ message Key2SegMsg { message LoadIndex { MsgType msg_type = 1; int64 segmentID = 2; - int64 fieldID = 3; - repeated string index_paths = 4; + string fieldName = 3; + int64 fieldID = 4; + repeated string index_paths = 5; + repeated common.KeyValuePair index_params = 6; } message IndexStats { diff --git a/internal/proto/internalpb/internal_msg.pb.go b/internal/proto/internalpb/internal_msg.pb.go index 50c09702ff..4432e64492 100644 --- a/internal/proto/internalpb/internal_msg.pb.go +++ b/internal/proto/internalpb/internal_msg.pb.go @@ -2043,13 +2043,15 @@ func (m *Key2SegMsg) GetKey2Seg() []*Key2Seg { } type LoadIndex struct { - MsgType MsgType `protobuf:"varint,1,opt,name=msg_type,json=msgType,proto3,enum=milvus.proto.internal.MsgType" json:"msg_type,omitempty"` - SegmentID int64 `protobuf:"varint,2,opt,name=segmentID,proto3" json:"segmentID,omitempty"` - FieldID int64 `protobuf:"varint,3,opt,name=fieldID,proto3" json:"fieldID,omitempty"` - IndexPaths []string `protobuf:"bytes,4,rep,name=index_paths,json=indexPaths,proto3" json:"index_paths,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + MsgType MsgType `protobuf:"varint,1,opt,name=msg_type,json=msgType,proto3,enum=milvus.proto.internal.MsgType" json:"msg_type,omitempty"` + SegmentID int64 `protobuf:"varint,2,opt,name=segmentID,proto3" json:"segmentID,omitempty"` + FieldName string `protobuf:"bytes,3,opt,name=fieldName,proto3" json:"fieldName,omitempty"` + FieldID int64 `protobuf:"varint,4,opt,name=fieldID,proto3" json:"fieldID,omitempty"` + IndexPaths []string `protobuf:"bytes,5,rep,name=index_paths,json=indexPaths,proto3" json:"index_paths,omitempty"` + IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,6,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *LoadIndex) Reset() { *m = LoadIndex{} } @@ -2091,6 +2093,13 @@ func (m *LoadIndex) GetSegmentID() int64 { return 0 } +func (m *LoadIndex) GetFieldName() string { + if m != nil { + return m.FieldName + } + return "" +} + func (m *LoadIndex) GetFieldID() int64 { if m != nil { return m.FieldID @@ -2105,6 +2114,13 @@ func (m *LoadIndex) GetIndexPaths() []string { return nil } +func (m *LoadIndex) GetIndexParams() []*commonpb.KeyValuePair { + if m != nil { + return m.IndexParams + } + return nil +} + type IndexStats struct { IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,1,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` NumRelatedSegments int64 `protobuf:"varint,2,opt,name=num_related_segments,json=numRelatedSegments,proto3" json:"num_related_segments,omitempty"` @@ -2637,120 +2653,120 @@ func init() { func init() { proto.RegisterFile("internal_msg.proto", fileDescriptor_7eb37f6b80b23116) } var fileDescriptor_7eb37f6b80b23116 = []byte{ - // 1831 bytes of a gzipped FileDescriptorProto + // 1837 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x59, 0xcd, 0x6f, 0x23, 0x49, 0x15, 0xdf, 0xee, 0xf6, 0x47, 0xfc, 0xec, 0x38, 0x3d, 0x95, 0x64, 0xc6, 0xb3, 0xb3, 0xec, 0x64, 0x7a, 0x10, 0x1b, 0x16, 0x91, 0x40, 0x86, 0x03, 0x7b, 0x83, 0xc4, 0x5a, 0xd6, 0x0c, 0x19, 0x85, 0x4e, 0xb4, 0x48, 0x68, 0xa5, 0x56, 0xc7, 0x7e, 0xb1, 0x4b, 0xfd, 0xe5, 0x54, 0xb5, 0x27, 0xf1, - 0x1c, 0x90, 0x90, 0xe6, 0x8c, 0xf8, 0x10, 0x07, 0x4e, 0x70, 0x87, 0x1d, 0xb1, 0x20, 0xfe, 0x07, - 0x3e, 0x2f, 0xfc, 0x17, 0x70, 0x00, 0x89, 0x5d, 0x0e, 0xdc, 0x50, 0x55, 0xf5, 0x87, 0x3b, 0xb1, - 0x9d, 0x88, 0x64, 0x96, 0x59, 0xed, 0xdc, 0xaa, 0x5e, 0x97, 0xab, 0xde, 0xef, 0xf7, 0x5e, 0xbd, - 0x7a, 0xef, 0x19, 0x08, 0x0d, 0x63, 0x64, 0xa1, 0xeb, 0x3b, 0x01, 0xef, 0x6f, 0x0c, 0x59, 0x14, - 0x47, 0x64, 0x35, 0xa0, 0xfe, 0xe3, 0x11, 0x57, 0xb3, 0x8d, 0x74, 0xc1, 0xab, 0x8d, 0x6e, 0x14, - 0x04, 0x51, 0xa8, 0xc4, 0xaf, 0xde, 0xe0, 0xc8, 0x1e, 0xd3, 0x2e, 0xe6, 0xbf, 0xb3, 0x42, 0xa8, - 0x75, 0xda, 0x36, 0x1e, 0x8f, 0x90, 0xc7, 0xe4, 0x26, 0x54, 0x86, 0x88, 0xac, 0xd3, 0x6e, 0x69, - 0x6b, 0xda, 0xba, 0x61, 0x27, 0x33, 0xf2, 0x00, 0x4a, 0x2c, 0xf2, 0xb1, 0xa5, 0xaf, 0x69, 0xeb, - 0xcd, 0xad, 0xbb, 0x1b, 0x53, 0xcf, 0xda, 0xd8, 0x43, 0x64, 0x76, 0xe4, 0xa3, 0x2d, 0x17, 0x93, - 0x15, 0x28, 0x77, 0xa3, 0x51, 0x18, 0xb7, 0x8c, 0x35, 0x6d, 0x7d, 0xd1, 0x56, 0x13, 0xab, 0x0f, - 0x20, 0xce, 0xe3, 0xc3, 0x28, 0xe4, 0x48, 0x1e, 0x40, 0x85, 0xc7, 0x6e, 0x3c, 0xe2, 0xf2, 0xc0, - 0xfa, 0xd6, 0x9d, 0xe2, 0xd6, 0x89, 0xf2, 0xfb, 0x72, 0x89, 0x9d, 0x2c, 0x25, 0x4d, 0xd0, 0x3b, - 0x6d, 0xa9, 0x8b, 0x61, 0xeb, 0x9d, 0xf6, 0x8c, 0x83, 0x22, 0x80, 0x03, 0x1e, 0x7d, 0x8c, 0xc8, - 0x1e, 0x43, 0x5d, 0x1e, 0x78, 0x15, 0x68, 0xaf, 0x41, 0x2d, 0xa6, 0x01, 0xf2, 0xd8, 0x0d, 0x86, - 0x52, 0xa7, 0x92, 0x9d, 0x0b, 0x66, 0x9c, 0xfb, 0x54, 0x83, 0xc6, 0x3e, 0xf6, 0x73, 0x2b, 0x66, - 0xcb, 0xb4, 0x89, 0x65, 0x62, 0xeb, 0xee, 0xc0, 0x0d, 0x43, 0xf4, 0x13, 0xf2, 0xca, 0x76, 0x2e, - 0x20, 0x77, 0xa0, 0xd6, 0x8d, 0x7c, 0xdf, 0x09, 0xdd, 0x00, 0xe5, 0xf6, 0x35, 0x7b, 0x41, 0x08, - 0x1e, 0xb9, 0x01, 0x92, 0xfb, 0xb0, 0x38, 0x74, 0x59, 0x4c, 0x63, 0x1a, 0x85, 0x4e, 0xec, 0xf6, - 0x5b, 0x25, 0xb9, 0xa0, 0x91, 0x09, 0x0f, 0xdc, 0xbe, 0xf5, 0x4c, 0x03, 0xf2, 0x75, 0xce, 0x69, - 0x3f, 0x2c, 0x28, 0x73, 0xad, 0xc4, 0x3f, 0x84, 0xa5, 0x21, 0x32, 0x27, 0x51, 0xdb, 0x61, 0x78, - 0xdc, 0x32, 0xd6, 0x8c, 0xf5, 0xfa, 0xd6, 0xfd, 0x19, 0xbf, 0x9f, 0x54, 0xc5, 0x5e, 0x1c, 0x22, - 0xdb, 0x51, 0x3f, 0xb5, 0xf1, 0xd8, 0xfa, 0x50, 0x83, 0x25, 0xf9, 0x5d, 0x69, 0x1d, 0x60, 0x28, - 0xa9, 0xe3, 0x42, 0x94, 0x28, 0xab, 0x26, 0x17, 0x50, 0x37, 0xd5, 0x2a, 0x45, 0x42, 0x4b, 0x17, - 0x11, 0x5a, 0x3e, 0x4f, 0x28, 0xb9, 0x0b, 0x75, 0x3c, 0x1d, 0x52, 0x86, 0x8e, 0xf0, 0x80, 0x56, - 0x45, 0x7a, 0x03, 0x28, 0xd1, 0x01, 0x0d, 0x26, 0x3d, 0xac, 0x7a, 0x69, 0x0f, 0xb3, 0x38, 0x2c, - 0x17, 0xac, 0x94, 0x78, 0xeb, 0x7b, 0x70, 0x73, 0x92, 0x59, 0x37, 0xa3, 0xa4, 0xa5, 0x49, 0x82, - 0x3f, 0x37, 0x8f, 0xe0, 0x9c, 0x40, 0x7b, 0x25, 0xe7, 0x38, 0x97, 0x5a, 0xff, 0xd1, 0xe0, 0xd6, - 0x0e, 0x43, 0x37, 0xc6, 0x9d, 0xc8, 0xf7, 0xb1, 0x2b, 0x20, 0xa6, 0x0e, 0xf2, 0x16, 0x2c, 0x04, - 0xbc, 0xef, 0xc4, 0xe3, 0x21, 0x4a, 0xd6, 0x9b, 0x5b, 0xaf, 0xcf, 0x38, 0x6b, 0x97, 0xf7, 0x0f, - 0xc6, 0x43, 0xb4, 0xab, 0x81, 0x1a, 0x10, 0x0b, 0x1a, 0xdd, 0x6c, 0xbf, 0x2c, 0x24, 0x14, 0x64, - 0xc2, 0x3a, 0x0c, 0x8f, 0x3b, 0x6d, 0x69, 0x1d, 0xc3, 0x56, 0x93, 0xe2, 0x3d, 0x2b, 0x9d, 0xbd, - 0x67, 0x2d, 0xa8, 0x0e, 0x59, 0x74, 0x3a, 0xee, 0xb4, 0xa5, 0x61, 0x0c, 0x3b, 0x9d, 0x92, 0x2f, - 0x43, 0x85, 0x77, 0x07, 0x18, 0xb8, 0xd2, 0x1c, 0xf5, 0xad, 0xdb, 0x53, 0x29, 0xdf, 0xf6, 0xa3, - 0x43, 0x3b, 0x59, 0x68, 0xfd, 0x44, 0x87, 0xd5, 0x36, 0x8b, 0x86, 0x9f, 0x70, 0xe4, 0xbb, 0xb0, - 0x94, 0xef, 0xae, 0xbc, 0x5a, 0x51, 0xf0, 0xd9, 0xa2, 0xce, 0xc9, 0x0b, 0xb3, 0x91, 0xc3, 0x15, - 0x1e, 0x6f, 0x37, 0xbb, 0x85, 0xb9, 0xf5, 0x0f, 0x0d, 0x56, 0xde, 0x71, 0xf9, 0xb5, 0x92, 0x92, - 0x01, 0xd6, 0x67, 0x02, 0x36, 0xe6, 0x00, 0x2e, 0x5d, 0x08, 0xb8, 0x7c, 0x05, 0xc0, 0x1f, 0x6a, - 0x70, 0xbb, 0x8d, 0xbc, 0xcb, 0xe8, 0x21, 0x7e, 0x7a, 0x50, 0xff, 0x42, 0x83, 0xd5, 0xfd, 0x41, - 0x74, 0xf2, 0xe2, 0x22, 0xb6, 0x7e, 0xab, 0xc3, 0x4d, 0x15, 0x9b, 0xf6, 0xd2, 0xe8, 0xfb, 0x31, - 0x5d, 0xd0, 0x35, 0xa8, 0x67, 0x01, 0x3f, 0xbb, 0xa6, 0x93, 0xa2, 0x1c, 0x69, 0x69, 0x26, 0xd2, - 0xf2, 0x1c, 0xa4, 0x95, 0xa2, 0x6d, 0xbf, 0x09, 0xcd, 0xfc, 0xd5, 0x91, 0xa6, 0x55, 0xef, 0xc6, - 0xfd, 0xe9, 0xa6, 0xcd, 0xe8, 0x90, 0x96, 0xcd, 0x1f, 0x2c, 0x69, 0xd8, 0x0f, 0x74, 0x58, 0x11, - 0x51, 0xed, 0x25, 0x67, 0x97, 0xe7, 0xec, 0xef, 0x1a, 0x2c, 0xbf, 0xe3, 0xf2, 0xeb, 0xa4, 0xec, - 0x7a, 0x2f, 0xff, 0x79, 0xb0, 0xe5, 0xff, 0x19, 0xec, 0x3f, 0x35, 0x68, 0xa5, 0xf1, 0xee, 0xd3, - 0x81, 0x58, 0x3c, 0x69, 0x22, 0xd6, 0xbd, 0xb8, 0x68, 0xaf, 0x39, 0xb8, 0xff, 0x4b, 0x87, 0xc5, - 0x4e, 0xc8, 0x91, 0xc5, 0xcf, 0x0d, 0xe9, 0x1b, 0xe7, 0x35, 0x56, 0xc5, 0xc9, 0x19, 0x5d, 0x2e, - 0x55, 0xa2, 0x08, 0xde, 0x38, 0xf6, 0x45, 0x46, 0x9a, 0xe5, 0x37, 0xb9, 0xa0, 0x98, 0xe5, 0xab, - 0x30, 0x30, 0x91, 0xe5, 0x4f, 0xb0, 0x5a, 0x2d, 0xb2, 0xfa, 0x3a, 0x40, 0x46, 0x3e, 0x6f, 0x2d, - 0xac, 0x19, 0x22, 0x4d, 0xcf, 0x25, 0xa2, 0x02, 0x62, 0xd1, 0x49, 0xa7, 0xcd, 0x5b, 0xb5, 0x35, - 0x43, 0x54, 0x40, 0x6a, 0x46, 0xbe, 0x02, 0x0b, 0x2c, 0x3a, 0x71, 0x7a, 0x6e, 0xec, 0xb6, 0x40, - 0x26, 0xd9, 0x73, 0xb2, 0xc9, 0x2a, 0x8b, 0x4e, 0xda, 0x6e, 0xec, 0x5a, 0x4f, 0x75, 0x58, 0x6c, - 0xa3, 0x8f, 0x31, 0xfe, 0xff, 0x49, 0x2f, 0x30, 0x56, 0x9a, 0xc3, 0x58, 0x79, 0x1e, 0x63, 0x95, - 0x73, 0x8c, 0xdd, 0x83, 0xc6, 0x90, 0xd1, 0xc0, 0x65, 0x63, 0xc7, 0xc3, 0xb1, 0x28, 0x6f, 0x0c, - 0x19, 0xe5, 0x95, 0xec, 0x21, 0x8e, 0xb9, 0xf5, 0x91, 0x06, 0x8b, 0xfb, 0xe8, 0xb2, 0xee, 0xe0, - 0xb9, 0xd1, 0x30, 0xa1, 0xbf, 0x51, 0xd4, 0x7f, 0x7e, 0x0e, 0xfd, 0x79, 0x30, 0x19, 0xf2, 0x91, - 0x1f, 0x3b, 0x39, 0x39, 0x8a, 0x80, 0x25, 0x25, 0xdf, 0xc9, 0x28, 0xda, 0x84, 0xf2, 0xf1, 0x08, - 0xd9, 0xf8, 0xe2, 0x6a, 0x42, 0xad, 0xb3, 0xfe, 0xaa, 0x81, 0xb9, 0x3f, 0xe6, 0x3b, 0x51, 0x78, - 0x44, 0xfb, 0x2f, 0x1c, 0x72, 0x02, 0x25, 0x69, 0xaf, 0xf2, 0x9a, 0xb1, 0x5e, 0xb3, 0xe5, 0x58, - 0xd8, 0xd2, 0xc3, 0xb1, 0x33, 0x64, 0x78, 0x44, 0x4f, 0x51, 0x59, 0xbb, 0x66, 0xd7, 0x3d, 0x1c, - 0xef, 0x25, 0x22, 0xeb, 0x99, 0x0e, 0x8d, 0xd4, 0x96, 0x82, 0x9f, 0xab, 0x00, 0xca, 0x6b, 0x62, - 0xfd, 0xf2, 0x5d, 0x97, 0xe9, 0x95, 0xd2, 0xec, 0x38, 0x7a, 0x0f, 0x1a, 0xd2, 0x1c, 0x4e, 0x18, - 0xf5, 0x30, 0xb3, 0x6e, 0x5d, 0xca, 0x1e, 0x49, 0x51, 0x91, 0xa8, 0xca, 0x65, 0x5c, 0xa4, 0x3a, - 0xdd, 0x45, 0x08, 0x94, 0x06, 0x34, 0x56, 0x71, 0xa5, 0x61, 0xcb, 0xb1, 0xf5, 0x3d, 0xa8, 0x1f, - 0xd0, 0x00, 0x0f, 0x68, 0xd7, 0xdb, 0xe5, 0xfd, 0xab, 0xd0, 0x95, 0x77, 0x67, 0xf4, 0x42, 0x77, - 0x66, 0xee, 0x0b, 0x63, 0x7d, 0x5f, 0x83, 0x85, 0xb7, 0xfd, 0x11, 0x1f, 0x5c, 0xf1, 0xf4, 0x42, - 0x3c, 0xd6, 0xa7, 0xc4, 0xe3, 0x39, 0x3a, 0xfc, 0x4c, 0x83, 0xea, 0x43, 0x1c, 0x6f, 0xed, 0x63, - 0x5f, 0xda, 0x4f, 0xc4, 0xd4, 0xb4, 0x6b, 0x23, 0x27, 0xe4, 0x2e, 0xd4, 0x27, 0xa2, 0x48, 0xb2, - 0x3f, 0xe4, 0x41, 0xe4, 0x82, 0x67, 0xf4, 0x36, 0x2c, 0x50, 0xee, 0x3c, 0x76, 0x7d, 0xda, 0x93, - 0xf6, 0x5f, 0xb0, 0xab, 0x94, 0xbf, 0x2b, 0xa6, 0x22, 0x7e, 0x65, 0x6a, 0x2a, 0x6f, 0x37, 0xec, - 0x09, 0x89, 0xf5, 0x1e, 0x40, 0xa2, 0x9a, 0x20, 0x28, 0xf3, 0x2e, 0x6d, 0xd2, 0xbb, 0xbe, 0x0a, - 0x55, 0x0f, 0xc7, 0x5b, 0x1c, 0xfb, 0x2d, 0x5d, 0x06, 0xff, 0x59, 0xac, 0x25, 0x3b, 0xd9, 0xe9, - 0x72, 0xeb, 0xe7, 0x1a, 0xd4, 0xbe, 0x15, 0xb9, 0xbd, 0x4e, 0xd8, 0xc3, 0xd3, 0xe7, 0x47, 0x7f, - 0x0b, 0xaa, 0x47, 0x14, 0xfd, 0x5e, 0x1e, 0x04, 0x92, 0xa9, 0x20, 0x96, 0x8a, 0xb3, 0x9d, 0xa1, - 0x1b, 0x0f, 0x78, 0xab, 0x24, 0x6f, 0x34, 0x48, 0xd1, 0x9e, 0x90, 0x58, 0x4f, 0x35, 0x00, 0xa9, - 0x9d, 0xb8, 0x67, 0x9c, 0xb4, 0xa1, 0x91, 0xae, 0x67, 0x6e, 0xc0, 0x93, 0x8e, 0xd2, 0xbd, 0xa9, - 0x37, 0xf3, 0x21, 0x8e, 0xdf, 0x75, 0xfd, 0x11, 0xee, 0xb9, 0x94, 0xd9, 0xf5, 0x64, 0x4f, 0xf1, - 0x2b, 0xf2, 0x25, 0x58, 0x09, 0x47, 0x81, 0xc3, 0xd0, 0x77, 0x63, 0xec, 0x39, 0x89, 0xa2, 0x3c, - 0x51, 0x9c, 0x84, 0xa3, 0xc0, 0x56, 0x9f, 0xf6, 0x93, 0x2f, 0xd6, 0x0f, 0x34, 0x80, 0xb7, 0x85, - 0xce, 0x4a, 0x8d, 0xb3, 0xe5, 0x85, 0x36, 0xa5, 0xbc, 0x98, 0x00, 0xad, 0x17, 0x41, 0x6f, 0xa7, - 0xa0, 0x45, 0xcc, 0xe0, 0x49, 0xdb, 0xf1, 0xde, 0x0c, 0xaa, 0x73, 0xf0, 0x09, 0x2f, 0x72, 0x6c, - 0xfd, 0x54, 0x75, 0x6a, 0x85, 0x76, 0x4a, 0xa5, 0x82, 0x05, 0xb4, 0xb3, 0x16, 0xb8, 0x0b, 0xf5, - 0x00, 0x83, 0x88, 0x8d, 0x1d, 0x4e, 0x9f, 0x60, 0xea, 0xc0, 0x4a, 0xb4, 0x4f, 0x9f, 0xa0, 0x70, - 0x51, 0x49, 0x49, 0x74, 0xc2, 0x53, 0x1b, 0x09, 0x1a, 0xa2, 0x13, 0x4e, 0xbe, 0x00, 0x37, 0x18, - 0x76, 0x31, 0x8c, 0xfd, 0xb1, 0x13, 0x44, 0x3d, 0x7a, 0x44, 0x31, 0x75, 0x63, 0x33, 0xfd, 0xb0, - 0x9b, 0xc8, 0xad, 0xbf, 0x69, 0xd0, 0xfc, 0x76, 0x1a, 0xbc, 0x94, 0x66, 0xcf, 0x21, 0xa6, 0x7c, - 0x4d, 0x82, 0x2d, 0xf0, 0x37, 0xa7, 0x6d, 0x9b, 0x91, 0x64, 0x2f, 0x70, 0xec, 0x2b, 0xa5, 0xb6, - 0xa1, 0x2e, 0xcd, 0x91, 0xec, 0x51, 0x9a, 0x6b, 0x83, 0xdc, 0xf2, 0x36, 0x1c, 0x65, 0x63, 0xeb, - 0x97, 0x3a, 0x10, 0x55, 0xee, 0x4b, 0x23, 0xbd, 0x70, 0x39, 0xfa, 0x1b, 0xd3, 0x73, 0xf4, 0xf3, - 0xc9, 0xd7, 0x67, 0x40, 0xc1, 0xca, 0x7b, 0x71, 0x35, 0xbb, 0x26, 0x25, 0xf2, 0x73, 0x1b, 0x1a, - 0x78, 0x1a, 0x33, 0x37, 0xbd, 0x74, 0xd5, 0x4b, 0x5f, 0x3a, 0xf9, 0x33, 0x75, 0xe9, 0xac, 0xf7, - 0x45, 0x99, 0x9f, 0x54, 0x71, 0x2f, 0xf9, 0xba, 0x98, 0xaf, 0xdf, 0xe9, 0xf0, 0x5a, 0x81, 0xaf, - 0x3d, 0x16, 0xf5, 0x19, 0x72, 0xfe, 0x92, 0xb7, 0x79, 0xbc, 0xbd, 0xf9, 0x17, 0x03, 0xaa, 0x09, - 0x60, 0x52, 0x83, 0xb2, 0xf7, 0x28, 0x0a, 0xd1, 0x7c, 0x85, 0xac, 0xc2, 0x0d, 0xef, 0xec, 0xff, - 0x06, 0x66, 0x8f, 0x2c, 0xc3, 0x92, 0x57, 0x6c, 0xa9, 0x9b, 0x48, 0x08, 0x34, 0xbd, 0x42, 0x47, - 0xd9, 0x3c, 0x22, 0xb7, 0x60, 0xd9, 0x3b, 0xdf, 0x74, 0x35, 0xc5, 0x9b, 0x6c, 0x7a, 0xc5, 0xbe, - 0x24, 0x37, 0x07, 0x72, 0x8b, 0x6f, 0x60, 0x9c, 0x25, 0xd8, 0xdc, 0xa4, 0x64, 0x15, 0x4c, 0xef, - 0x4c, 0x7b, 0xd0, 0xfc, 0xbd, 0x46, 0x96, 0xa1, 0xe9, 0x15, 0xfa, 0x5f, 0xe6, 0x1f, 0x34, 0x42, - 0x60, 0xd1, 0x9b, 0x6c, 0xf0, 0x98, 0x7f, 0xd4, 0xc8, 0x2d, 0x20, 0xde, 0xb9, 0x3e, 0x88, 0xf9, - 0x27, 0x8d, 0xac, 0xc0, 0x92, 0x57, 0x68, 0x17, 0x70, 0xf3, 0xcf, 0x1a, 0xb9, 0x01, 0x0d, 0x6f, - 0x22, 0x3c, 0x99, 0xbf, 0xd2, 0xd5, 0x51, 0x93, 0x3e, 0x65, 0xbe, 0xaf, 0x93, 0x3b, 0x70, 0xd3, - 0x9b, 0xea, 0x68, 0xe6, 0x33, 0x9d, 0x34, 0xa0, 0xea, 0xa9, 0xc2, 0xdc, 0xfc, 0xa1, 0x21, 0x67, - 0xaa, 0x62, 0x34, 0x7f, 0x64, 0x90, 0x3a, 0x54, 0x3c, 0x99, 0xbb, 0x99, 0x3f, 0x56, 0x9f, 0x54, - 0xe6, 0x6d, 0x7e, 0x64, 0x48, 0xf5, 0x27, 0xf3, 0x70, 0xf3, 0xdf, 0x06, 0x69, 0x42, 0xcd, 0x4b, - 0x73, 0x4d, 0xf3, 0xd7, 0x35, 0xa9, 0x75, 0xf1, 0xa9, 0x30, 0x3f, 0xa8, 0x91, 0x25, 0x00, 0x2f, - 0x4b, 0x49, 0xcc, 0xdf, 0xd4, 0xde, 0x7c, 0x0b, 0x16, 0xd2, 0xbf, 0xee, 0x08, 0x40, 0x65, 0xd7, - 0xe5, 0x31, 0x32, 0xf3, 0x15, 0x31, 0xb6, 0xd1, 0xed, 0x21, 0x33, 0x35, 0x31, 0xfe, 0x0e, 0xa3, - 0x42, 0xae, 0x0b, 0x9b, 0xef, 0x09, 0xc7, 0x34, 0x8d, 0xed, 0xf6, 0x77, 0xb7, 0xfb, 0x34, 0x1e, - 0x8c, 0x0e, 0x85, 0xd7, 0x6c, 0x3e, 0xa1, 0xbe, 0x4f, 0x9f, 0xc4, 0xd8, 0x1d, 0x6c, 0x2a, 0x8f, - 0xfa, 0x62, 0x8f, 0xf2, 0x98, 0xd1, 0xc3, 0x51, 0x8c, 0xbd, 0xcd, 0xf4, 0xb2, 0x6c, 0x4a, 0x37, - 0xcb, 0xa6, 0xc3, 0xc3, 0xc3, 0x8a, 0x94, 0x3c, 0xf8, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x40, - 0xb8, 0xbd, 0x0b, 0x2b, 0x1f, 0x00, 0x00, + 0x1c, 0x90, 0x10, 0x73, 0x46, 0x7c, 0x88, 0x03, 0x37, 0xee, 0xb0, 0x23, 0x16, 0xc4, 0xff, 0xc0, + 0xe7, 0x85, 0xff, 0x02, 0x0e, 0x20, 0xb1, 0xcb, 0x81, 0x1b, 0xaa, 0xaa, 0xfe, 0x70, 0x27, 0xb6, + 0x13, 0x6d, 0x32, 0xcb, 0xa0, 0x9d, 0x5b, 0xbd, 0xe7, 0xea, 0xaa, 0xf7, 0x7e, 0xef, 0xa3, 0xde, + 0x7b, 0x06, 0x42, 0xc3, 0x18, 0x59, 0xe8, 0xfa, 0x4e, 0xc0, 0xfb, 0x1b, 0x43, 0x16, 0xc5, 0x11, + 0x59, 0x0d, 0xa8, 0xff, 0x78, 0xc4, 0x15, 0xb5, 0x91, 0x6e, 0x78, 0xb5, 0xd1, 0x8d, 0x82, 0x20, + 0x0a, 0x15, 0xfb, 0xd5, 0x1b, 0x1c, 0xd9, 0x63, 0xda, 0xc5, 0xfc, 0x3b, 0x2b, 0x84, 0x5a, 0xa7, + 0x6d, 0xe3, 0xf1, 0x08, 0x79, 0x4c, 0x6e, 0x42, 0x65, 0x88, 0xc8, 0x3a, 0xed, 0x96, 0xb6, 0xa6, + 0xad, 0x1b, 0x76, 0x42, 0x91, 0x07, 0x50, 0x62, 0x91, 0x8f, 0x2d, 0x7d, 0x4d, 0x5b, 0x6f, 0x6e, + 0xdd, 0xdd, 0x98, 0x7a, 0xd7, 0xc6, 0x1e, 0x22, 0xb3, 0x23, 0x1f, 0x6d, 0xb9, 0x99, 0xac, 0x40, + 0xb9, 0x1b, 0x8d, 0xc2, 0xb8, 0x65, 0xac, 0x69, 0xeb, 0x8b, 0xb6, 0x22, 0xac, 0x3e, 0x80, 0xb8, + 0x8f, 0x0f, 0xa3, 0x90, 0x23, 0x79, 0x00, 0x15, 0x1e, 0xbb, 0xf1, 0x88, 0xcb, 0x0b, 0xeb, 0x5b, + 0x77, 0x8a, 0x47, 0x27, 0xc2, 0xef, 0xcb, 0x2d, 0x76, 0xb2, 0x95, 0x34, 0x41, 0xef, 0xb4, 0xa5, + 0x2c, 0x86, 0xad, 0x77, 0xda, 0x33, 0x2e, 0x8a, 0x00, 0x0e, 0x78, 0xf4, 0x09, 0x6a, 0xf6, 0x18, + 0xea, 0xf2, 0xc2, 0xab, 0xa8, 0xf6, 0x1a, 0xd4, 0x62, 0x1a, 0x20, 0x8f, 0xdd, 0x60, 0x28, 0x65, + 0x2a, 0xd9, 0x39, 0x63, 0xc6, 0xbd, 0x4f, 0x35, 0x68, 0xec, 0x63, 0x3f, 0xb7, 0x62, 0xb6, 0x4d, + 0x9b, 0xd8, 0x26, 0x8e, 0xee, 0x0e, 0xdc, 0x30, 0x44, 0x3f, 0x01, 0xaf, 0x6c, 0xe7, 0x0c, 0x72, + 0x07, 0x6a, 0xdd, 0xc8, 0xf7, 0x9d, 0xd0, 0x0d, 0x50, 0x1e, 0x5f, 0xb3, 0x17, 0x04, 0xe3, 0x91, + 0x1b, 0x20, 0xb9, 0x0f, 0x8b, 0x43, 0x97, 0xc5, 0x34, 0xa6, 0x51, 0xe8, 0xc4, 0x6e, 0xbf, 0x55, + 0x92, 0x1b, 0x1a, 0x19, 0xf3, 0xc0, 0xed, 0x5b, 0xcf, 0x34, 0x20, 0x5f, 0xe7, 0x9c, 0xf6, 0xc3, + 0x82, 0x30, 0xd7, 0x0a, 0xfc, 0x43, 0x58, 0x1a, 0x22, 0x73, 0x12, 0xb1, 0x1d, 0x86, 0xc7, 0x2d, + 0x63, 0xcd, 0x58, 0xaf, 0x6f, 0xdd, 0x9f, 0xf1, 0xfd, 0xa4, 0x28, 0xf6, 0xe2, 0x10, 0xd9, 0x8e, + 0xfa, 0xd4, 0xc6, 0x63, 0xeb, 0x43, 0x0d, 0x96, 0xe4, 0xef, 0x4a, 0xea, 0x00, 0x43, 0x09, 0x1d, + 0x17, 0xac, 0x44, 0x58, 0x45, 0x5c, 0x00, 0xdd, 0x54, 0xab, 0x14, 0x01, 0x2d, 0x5d, 0x04, 0x68, + 0xf9, 0x3c, 0xa0, 0xe4, 0x2e, 0xd4, 0xf1, 0x74, 0x48, 0x19, 0x3a, 0xc2, 0x03, 0x5a, 0x15, 0xe9, + 0x0d, 0xa0, 0x58, 0x07, 0x34, 0x98, 0xf4, 0xb0, 0xea, 0xa5, 0x3d, 0xcc, 0xe2, 0xb0, 0x5c, 0xb0, + 0x52, 0xe2, 0xad, 0xef, 0xc1, 0xcd, 0x49, 0x64, 0xdd, 0x0c, 0x92, 0x96, 0x26, 0x01, 0xfe, 0xdc, + 0x3c, 0x80, 0x73, 0x00, 0xed, 0x95, 0x1c, 0xe3, 0x9c, 0x6b, 0xfd, 0x47, 0x83, 0x5b, 0x3b, 0x0c, + 0xdd, 0x18, 0x77, 0x22, 0xdf, 0xc7, 0xae, 0x50, 0x31, 0x75, 0x90, 0xb7, 0x60, 0x21, 0xe0, 0x7d, + 0x27, 0x1e, 0x0f, 0x51, 0xa2, 0xde, 0xdc, 0x7a, 0x7d, 0xc6, 0x5d, 0xbb, 0xbc, 0x7f, 0x30, 0x1e, + 0xa2, 0x5d, 0x0d, 0xd4, 0x82, 0x58, 0xd0, 0xe8, 0x66, 0xe7, 0x65, 0x29, 0xa1, 0xc0, 0x13, 0xd6, + 0x61, 0x78, 0xdc, 0x69, 0x4b, 0xeb, 0x18, 0xb6, 0x22, 0x8a, 0x71, 0x56, 0x3a, 0x1b, 0x67, 0x2d, + 0xa8, 0x0e, 0x59, 0x74, 0x3a, 0xee, 0xb4, 0xa5, 0x61, 0x0c, 0x3b, 0x25, 0xc9, 0x97, 0xa1, 0xc2, + 0xbb, 0x03, 0x0c, 0x5c, 0x69, 0x8e, 0xfa, 0xd6, 0xed, 0xa9, 0x90, 0x6f, 0xfb, 0xd1, 0xa1, 0x9d, + 0x6c, 0xb4, 0x7e, 0xaa, 0xc3, 0x6a, 0x9b, 0x45, 0xc3, 0xff, 0x73, 0xcd, 0x77, 0x61, 0x29, 0x3f, + 0x5d, 0x79, 0xb5, 0x82, 0xe0, 0xb3, 0x45, 0x99, 0x93, 0x17, 0x66, 0x23, 0x57, 0x57, 0x78, 0xbc, + 0xdd, 0xec, 0x16, 0x68, 0xeb, 0x1f, 0x1a, 0xac, 0xbc, 0xe3, 0xf2, 0x6b, 0x05, 0x25, 0x53, 0x58, + 0x9f, 0xa9, 0xb0, 0x31, 0x47, 0xe1, 0xd2, 0x85, 0x0a, 0x97, 0xaf, 0xa0, 0xf0, 0x87, 0x1a, 0xdc, + 0x6e, 0x23, 0xef, 0x32, 0x7a, 0x88, 0x9f, 0x1e, 0xad, 0x7f, 0xa1, 0xc1, 0xea, 0xfe, 0x20, 0x3a, + 0x79, 0x71, 0x35, 0xb6, 0x7e, 0xab, 0xc3, 0x4d, 0x95, 0x9b, 0xf6, 0xd2, 0xec, 0xfb, 0x09, 0x05, + 0xe8, 0x1a, 0xd4, 0xb3, 0x84, 0x9f, 0x85, 0xe9, 0x24, 0x2b, 0xd7, 0xb4, 0x34, 0x53, 0xd3, 0xf2, + 0x1c, 0x4d, 0x2b, 0x45, 0xdb, 0x7e, 0x13, 0x9a, 0xf9, 0xab, 0x23, 0x4d, 0xab, 0xde, 0x8d, 0xfb, + 0xd3, 0x4d, 0x9b, 0xc1, 0x21, 0x2d, 0x9b, 0x3f, 0x58, 0xd2, 0xb0, 0x1f, 0xe8, 0xb0, 0x22, 0xb2, + 0xda, 0x4b, 0xcc, 0x2e, 0x8f, 0xd9, 0xdf, 0x35, 0x58, 0x7e, 0xc7, 0xe5, 0xd7, 0x09, 0xd9, 0xf5, + 0x06, 0xff, 0x79, 0x65, 0xcb, 0x1f, 0x5b, 0xd9, 0x7f, 0x6a, 0xd0, 0x4a, 0xf3, 0xdd, 0xa7, 0x43, + 0x63, 0xf1, 0xa4, 0x89, 0x5c, 0xf7, 0xe2, 0x6a, 0x7b, 0xcd, 0xc9, 0xfd, 0x5f, 0x3a, 0x2c, 0x76, + 0x42, 0x8e, 0x2c, 0x7e, 0x6e, 0x9a, 0xbe, 0x71, 0x5e, 0x62, 0xd5, 0x9c, 0x9c, 0x91, 0xe5, 0x52, + 0x2d, 0x8a, 0xc0, 0x8d, 0x63, 0x5f, 0x54, 0xa4, 0x59, 0x7d, 0x93, 0x33, 0x8a, 0x55, 0xbe, 0x4a, + 0x03, 0x13, 0x55, 0xfe, 0x04, 0xaa, 0xd5, 0x22, 0xaa, 0xaf, 0x03, 0x64, 0xe0, 0xf3, 0xd6, 0xc2, + 0x9a, 0x21, 0xca, 0xf4, 0x9c, 0x23, 0x3a, 0x20, 0x16, 0x9d, 0x74, 0xda, 0xbc, 0x55, 0x5b, 0x33, + 0x44, 0x07, 0xa4, 0x28, 0xf2, 0x15, 0x58, 0x60, 0xd1, 0x89, 0xd3, 0x73, 0x63, 0xb7, 0x05, 0xb2, + 0xc8, 0x9e, 0x53, 0x4d, 0x56, 0x59, 0x74, 0xd2, 0x76, 0x63, 0xd7, 0x7a, 0xaa, 0xc3, 0x62, 0x1b, + 0x7d, 0x8c, 0xf1, 0x7f, 0x0f, 0x7a, 0x01, 0xb1, 0xd2, 0x1c, 0xc4, 0xca, 0xf3, 0x10, 0xab, 0x9c, + 0x43, 0xec, 0x1e, 0x34, 0x86, 0x8c, 0x06, 0x2e, 0x1b, 0x3b, 0x1e, 0x8e, 0x45, 0x7b, 0x63, 0xc8, + 0x2c, 0xaf, 0x78, 0x0f, 0x71, 0xcc, 0xad, 0x8f, 0x34, 0x58, 0xdc, 0x47, 0x97, 0x75, 0x07, 0xcf, + 0x0d, 0x86, 0x09, 0xf9, 0x8d, 0xa2, 0xfc, 0xf3, 0x6b, 0xe8, 0xcf, 0x83, 0xc9, 0x90, 0x8f, 0xfc, + 0xd8, 0xc9, 0xc1, 0x51, 0x00, 0x2c, 0x29, 0xfe, 0x4e, 0x06, 0xd1, 0x26, 0x94, 0x8f, 0x47, 0xc8, + 0xc6, 0x17, 0x77, 0x13, 0x6a, 0x9f, 0xf5, 0x57, 0x0d, 0xcc, 0xfd, 0x31, 0xdf, 0x89, 0xc2, 0x23, + 0xda, 0x7f, 0xe1, 0x34, 0x27, 0x50, 0x92, 0xf6, 0x2a, 0xaf, 0x19, 0xeb, 0x35, 0x5b, 0xae, 0x85, + 0x2d, 0x3d, 0x1c, 0x3b, 0x43, 0x86, 0x47, 0xf4, 0x14, 0x95, 0xb5, 0x6b, 0x76, 0xdd, 0xc3, 0xf1, + 0x5e, 0xc2, 0xb2, 0x9e, 0xe9, 0xd0, 0x48, 0x6d, 0x29, 0xf0, 0xb9, 0x8a, 0x42, 0x79, 0x4f, 0xac, + 0x5f, 0x7e, 0xea, 0x32, 0xbd, 0x53, 0x9a, 0x9d, 0x47, 0xef, 0x41, 0x43, 0x9a, 0xc3, 0x09, 0xa3, + 0x1e, 0x66, 0xd6, 0xad, 0x4b, 0xde, 0x23, 0xc9, 0x2a, 0x02, 0x55, 0xb9, 0x8c, 0x8b, 0x54, 0xa7, + 0xbb, 0x08, 0x81, 0xd2, 0x80, 0xc6, 0x2a, 0xaf, 0x34, 0x6c, 0xb9, 0xb6, 0xbe, 0x07, 0xf5, 0x03, + 0x1a, 0xe0, 0x01, 0xed, 0x7a, 0xbb, 0xbc, 0x7f, 0x15, 0xb8, 0xf2, 0xe9, 0x8c, 0x5e, 0x98, 0xce, + 0xcc, 0x7d, 0x61, 0xac, 0xef, 0x6b, 0xb0, 0xf0, 0xb6, 0x3f, 0xe2, 0x83, 0x2b, 0xde, 0x5e, 0xc8, + 0xc7, 0xfa, 0x94, 0x7c, 0x3c, 0x47, 0x86, 0x9f, 0x6b, 0x50, 0x7d, 0x88, 0xe3, 0xad, 0x7d, 0xec, + 0x4b, 0xfb, 0x89, 0x9c, 0x9a, 0x4e, 0x6d, 0x24, 0x41, 0xee, 0x42, 0x7d, 0x22, 0x8b, 0x24, 0xe7, + 0x43, 0x9e, 0x44, 0x2e, 0x78, 0x46, 0x6f, 0xc3, 0x02, 0xe5, 0xce, 0x63, 0xd7, 0xa7, 0x3d, 0x69, + 0xff, 0x05, 0xbb, 0x4a, 0xf9, 0xbb, 0x82, 0x14, 0xf9, 0x2b, 0x13, 0x53, 0x79, 0xbb, 0x61, 0x4f, + 0x70, 0xac, 0xf7, 0x00, 0x12, 0xd1, 0x04, 0x40, 0x99, 0x77, 0x69, 0x93, 0xde, 0xf5, 0x55, 0xa8, + 0x7a, 0x38, 0xde, 0xe2, 0xd8, 0x6f, 0xe9, 0x32, 0xf9, 0xcf, 0x42, 0x2d, 0x39, 0xc9, 0x4e, 0xb7, + 0x5b, 0x3f, 0xd0, 0xa1, 0xf6, 0xad, 0xc8, 0xed, 0x75, 0xc2, 0x1e, 0x9e, 0x3e, 0x57, 0xf8, 0x8f, + 0x28, 0xfa, 0xbd, 0x47, 0x79, 0xfe, 0xcf, 0x19, 0x22, 0x38, 0x24, 0x91, 0x07, 0x47, 0x42, 0x0a, + 0xd8, 0xa9, 0x90, 0xcc, 0x19, 0xba, 0xf1, 0x20, 0xcd, 0x05, 0x20, 0x59, 0x7b, 0x82, 0x43, 0xda, + 0xd0, 0x48, 0x37, 0x30, 0x37, 0x50, 0x19, 0xa1, 0xbe, 0x75, 0x6f, 0x6a, 0xa0, 0x3e, 0xc4, 0xf1, + 0xbb, 0xae, 0x3f, 0xc2, 0x3d, 0x97, 0x32, 0xbb, 0x9e, 0x1c, 0x22, 0xbe, 0xb2, 0x9e, 0x6a, 0x00, + 0x12, 0x01, 0x11, 0xcb, 0xe7, 0x0f, 0xd5, 0x3e, 0xce, 0xa1, 0xe4, 0x4b, 0xb0, 0x12, 0x8e, 0x02, + 0x87, 0xa1, 0xef, 0xc6, 0xd8, 0x73, 0x12, 0x30, 0x78, 0x02, 0x0e, 0x09, 0x47, 0x81, 0xad, 0x7e, + 0xda, 0x4f, 0x7e, 0xb1, 0x7e, 0xa8, 0x01, 0xbc, 0x2d, 0x34, 0x57, 0x62, 0x9c, 0x6d, 0x61, 0xb4, + 0x29, 0x2d, 0xcc, 0x04, 0x74, 0x7a, 0x11, 0xba, 0xed, 0x14, 0x3a, 0x91, 0x97, 0x78, 0x32, 0xda, + 0xbc, 0x37, 0xc3, 0x9c, 0xb9, 0xf2, 0x09, 0xba, 0x72, 0x6d, 0xfd, 0x4c, 0x4d, 0x83, 0x85, 0x74, + 0x4a, 0xa4, 0x82, 0x95, 0xb5, 0xb3, 0x56, 0xbe, 0x0b, 0xf5, 0x00, 0x83, 0x88, 0x8d, 0x1d, 0x4e, + 0x9f, 0x60, 0x1a, 0x24, 0x8a, 0xb5, 0x4f, 0x9f, 0xa0, 0x08, 0x03, 0x09, 0x49, 0x74, 0xc2, 0xd3, + 0xc7, 0x40, 0xc0, 0x10, 0x9d, 0x70, 0xf2, 0x05, 0xb8, 0xc1, 0xb0, 0x8b, 0x61, 0xec, 0x8f, 0x9d, + 0x20, 0xea, 0xd1, 0x23, 0x8a, 0x69, 0xa8, 0x98, 0xe9, 0x0f, 0xbb, 0x09, 0xdf, 0xfa, 0x9b, 0x06, + 0xcd, 0x6f, 0xa7, 0x09, 0x52, 0x49, 0xf6, 0x1c, 0xf2, 0xd6, 0xd7, 0xa4, 0xb2, 0x05, 0xfc, 0xe6, + 0x8c, 0x86, 0x33, 0x90, 0xec, 0x05, 0x8e, 0x7d, 0x25, 0xd4, 0x36, 0xd4, 0xa5, 0x39, 0x92, 0x33, + 0x4a, 0x73, 0x6d, 0x90, 0x5b, 0xde, 0x86, 0xa3, 0x6c, 0x6d, 0xfd, 0x52, 0x07, 0xa2, 0x46, 0x0a, + 0xd2, 0x48, 0x2f, 0x5c, 0x1f, 0xf0, 0xc6, 0xf4, 0x3e, 0xe0, 0x7c, 0x81, 0xf7, 0x19, 0x50, 0x6a, + 0xe5, 0xf3, 0xbe, 0x42, 0x12, 0x68, 0x43, 0x03, 0x4f, 0x63, 0xe6, 0xa6, 0x41, 0x57, 0xbd, 0x74, + 0xd0, 0xc9, 0xcf, 0x92, 0x48, 0x7e, 0x5f, 0x87, 0x95, 0xb4, 0x53, 0x7c, 0x89, 0xd7, 0xc5, 0x78, + 0xfd, 0x4e, 0x87, 0xd7, 0x0a, 0x78, 0xed, 0xb1, 0xa8, 0xcf, 0x90, 0xf3, 0x97, 0xb8, 0xcd, 0xc3, + 0xed, 0xcd, 0xbf, 0x18, 0x50, 0x4d, 0x14, 0x26, 0x35, 0x28, 0x7b, 0x8f, 0xa2, 0x10, 0xcd, 0x57, + 0xc8, 0x2a, 0xdc, 0xf0, 0xce, 0xfe, 0x37, 0x61, 0xf6, 0xc8, 0x32, 0x2c, 0x79, 0xc5, 0xb1, 0xbd, + 0x89, 0x84, 0x40, 0xd3, 0x2b, 0x4c, 0xad, 0xcd, 0x23, 0x72, 0x0b, 0x96, 0xbd, 0xf3, 0x83, 0x5d, + 0x53, 0xbc, 0xfb, 0xa6, 0x57, 0x9c, 0x7d, 0x72, 0x73, 0x20, 0x8f, 0xf8, 0x06, 0xc6, 0x59, 0x11, + 0xcf, 0x4d, 0x4a, 0x56, 0xc1, 0xf4, 0xce, 0x8c, 0x20, 0xcd, 0xdf, 0x6b, 0x64, 0x19, 0x9a, 0x5e, + 0x61, 0xc6, 0x66, 0xfe, 0x41, 0x23, 0x04, 0x16, 0xbd, 0xc9, 0x21, 0x92, 0xf9, 0x47, 0x8d, 0xdc, + 0x02, 0xe2, 0x9d, 0x9b, 0xb5, 0x98, 0x7f, 0xd2, 0xc8, 0x0a, 0x2c, 0x79, 0x85, 0x91, 0x04, 0x37, + 0xff, 0xac, 0x91, 0x1b, 0xd0, 0xf0, 0x26, 0xd2, 0x93, 0xf9, 0x2b, 0x5d, 0x5d, 0x35, 0xe9, 0x53, + 0xe6, 0xfb, 0x3a, 0xb9, 0x03, 0x37, 0xbd, 0xa9, 0x8e, 0x66, 0x3e, 0xd3, 0x49, 0x03, 0xaa, 0x9e, + 0x6a, 0xfe, 0xcd, 0x1f, 0x19, 0x92, 0x52, 0x5d, 0xa9, 0xf9, 0x63, 0x83, 0xd4, 0xa1, 0xe2, 0xc9, + 0xfa, 0xd0, 0xfc, 0x89, 0xfa, 0x49, 0x55, 0xf7, 0xe6, 0x47, 0x86, 0x14, 0x7f, 0xb2, 0xd6, 0x37, + 0xff, 0x6d, 0x90, 0x26, 0xd4, 0xbc, 0xb4, 0x9e, 0x35, 0x7f, 0x5d, 0x93, 0x52, 0x17, 0x9f, 0x0a, + 0xf3, 0x83, 0x1a, 0x59, 0x02, 0xf0, 0xb2, 0xb2, 0xc7, 0xfc, 0x4d, 0xed, 0xcd, 0xb7, 0x60, 0x21, + 0xfd, 0x7b, 0x90, 0x00, 0x54, 0x76, 0x5d, 0x1e, 0x23, 0x33, 0x5f, 0x11, 0x6b, 0x1b, 0xdd, 0x1e, + 0x32, 0x53, 0x13, 0xeb, 0xef, 0x30, 0x2a, 0xf8, 0xba, 0xb0, 0xf9, 0x9e, 0x70, 0x4c, 0xd3, 0xd8, + 0x6e, 0x7f, 0x77, 0xbb, 0x4f, 0xe3, 0xc1, 0xe8, 0x50, 0x78, 0xcd, 0xe6, 0x13, 0xea, 0xfb, 0xf4, + 0x49, 0x8c, 0xdd, 0xc1, 0xa6, 0xf2, 0xa8, 0x2f, 0xf6, 0x28, 0x8f, 0x19, 0x3d, 0x1c, 0xc5, 0xd8, + 0xdb, 0x4c, 0x83, 0x65, 0x53, 0xba, 0x59, 0x46, 0x0e, 0x0f, 0x0f, 0x2b, 0x92, 0xf3, 0xe0, 0xbf, + 0x01, 0x00, 0x00, 0xff, 0xff, 0xb2, 0xdd, 0xfb, 0x16, 0x8f, 0x1f, 0x00, 0x00, } diff --git a/internal/querynode/client/client.go b/internal/querynode/client/client.go index 19ffdae3c0..5cbfc2a08e 100644 --- a/internal/querynode/client/client.go +++ b/internal/querynode/client/client.go @@ -21,7 +21,8 @@ func NewLoadIndexClient(ctx context.Context, pulsarAddress string, loadIndexChan } } -func (lic *LoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64) error { +func (lic *LoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, indexParam map[string]string) error { + // TODO:: add indexParam to proto baseMsg := msgstream.BaseMsg{ BeginTimestamp: 0, EndTimestamp: 0, diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 688841c31b..430fd55dd7 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -54,7 +54,7 @@ type collectionReplica interface { // segment getSegmentNum() int - getSegmentStatistics() *internalpb.QueryNodeStats + getSegmentStatistics() []*internalpb.SegmentStats addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error removeSegment(segmentID UniqueID) error getSegmentByID(segmentID UniqueID) (*Segment, error) @@ -317,7 +317,7 @@ func (colReplica *collectionReplicaImpl) getSegmentNum() int { return len(colReplica.segments) } -func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeStats { +func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb.SegmentStats { colReplica.mu.RLock() defer colReplica.mu.RUnlock() @@ -339,10 +339,7 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.Quer segment.recentlyModified = false } - return &internalpb.QueryNodeStats{ - MsgType: internalpb.MsgType_kQueryNodeStats, - SegStats: statisticData, - } + return statisticData } func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error { @@ -359,7 +356,7 @@ func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitio colReplica.mu.Lock() defer colReplica.mu.Unlock() - var newSegment = newSegment(collection, segmentID) + var newSegment = newSegment(collection, segmentID, partitionTag, collectionID) colReplica.segments[segmentID] = newSegment *partition.Segments() = append(*partition.Segments(), newSegment) diff --git a/internal/querynode/load_index.go b/internal/querynode/load_index.go deleted file mode 100644 index d49dad234e..0000000000 --- a/internal/querynode/load_index.go +++ /dev/null @@ -1,41 +0,0 @@ -package querynode - -import ( - "context" - - "github.com/minio/minio-go/v7" - "github.com/zilliztech/milvus-distributed/internal/msgstream" -) - -type LoadIndex struct { - ctx context.Context - cancel context.CancelFunc - client *minio.Client - - replica collectionReplica - numCompletedSegmentsToFieldID map[int64]int64 - - msgBuffer chan msgstream.TsMsg - unsolvedMsg []msgstream.TsMsg - loadIndexMsgStream msgstream.MsgStream - - queryNodeID UniqueID -} - -func (li *LoadIndex) loadIndex(indexKey []string) [][]byte { - // TODO:: load dataStore client interface to load builtIndex according index key - - return nil -} - -func (li *LoadIndex) updateSegmentIndex(bytesIndex [][]byte, segmentID UniqueID) error { - // TODO:: dataStore return bytes index, load index to c++ segment - // TODO: how to deserialize bytes to segment index? - - return nil -} - -func (li *LoadIndex) sendQueryNodeStats() error { - // TODO:: update segment index type in replica, and publish queryNode segmentStats - return nil -} diff --git a/internal/querynode/load_index_info.go b/internal/querynode/load_index_info.go new file mode 100644 index 0000000000..362b687b76 --- /dev/null +++ b/internal/querynode/load_index_info.go @@ -0,0 +1,98 @@ +package querynode + +/* +#cgo CFLAGS: -I${SRCDIR}/../core/output/include +#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib + +#include "segcore/load_index_c.h" + +*/ +import "C" +import ( + "errors" + "strconv" + "unsafe" +) + +type LoadIndexInfo struct { + cLoadIndexInfo C.CLoadIndexInfo +} + +func NewLoadIndexInfo() (*LoadIndexInfo, error) { + var cLoadIndexInfo C.CLoadIndexInfo + status := C.NewLoadIndexInfo(&cLoadIndexInfo) + errorCode := status.error_code + + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return nil, errors.New("NewLoadIndexInfo failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) + } + return &LoadIndexInfo{cLoadIndexInfo: cLoadIndexInfo}, nil +} + +func (li *LoadIndexInfo) AppendIndexParam(indexKey string, indexValue string) error { + cIndexKey := C.CString(indexKey) + cIndexValue := C.CString(indexValue) + status := C.AppendIndexParam(li.cLoadIndexInfo, cIndexKey, cIndexValue) + errorCode := status.error_code + + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("AppendIndexParam failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) + } + return nil +} + +func (li *LoadIndexInfo) AppendFieldInfo(fieldName string, fieldID int64) error { + cFieldName := C.CString(fieldName) + cFieldID := C.long(fieldID) + status := C.AppendFieldInfo(li.cLoadIndexInfo, cFieldName, cFieldID) + errorCode := status.error_code + + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("AppendFieldInfo failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) + } + return nil +} + +func (li *LoadIndexInfo) AppendIndex(bytesIndex [][]byte, indexKeys []string) error { + var cBinarySet C.CBinarySet + status := C.NewBinarySet(&cBinarySet) + + errorCode := status.error_code + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("newBinarySet failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) + } + + for i, byteIndex := range bytesIndex { + indexPtr := unsafe.Pointer(&byteIndex[0]) + indexLen := C.long(len(byteIndex)) + indexKey := C.CString(indexKeys[i]) + status = C.AppendBinaryIndex(cBinarySet, indexPtr, indexLen, indexKey) + errorCode = status.error_code + if errorCode != 0 { + break + } + } + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("AppendBinaryIndex failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) + } + + status = C.AppendIndex(li.cLoadIndexInfo, cBinarySet) + errorCode = status.error_code + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("AppendIndex failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) + } + + return nil +} diff --git a/internal/querynode/load_index_service.go b/internal/querynode/load_index_service.go new file mode 100644 index 0000000000..a2eaac7bfe --- /dev/null +++ b/internal/querynode/load_index_service.go @@ -0,0 +1,286 @@ +package querynode + +import ( + "context" + "errors" + "fmt" + "log" + "path/filepath" + "sort" + "strconv" + "strings" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + + minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +type loadIndexService struct { + ctx context.Context + cancel context.CancelFunc + client *minioKV.MinIOKV + + replica collectionReplica + + fieldIndexes map[string][]*internalPb.IndexStats + fieldStatsChan chan []*internalPb.FieldStats + + msgBuffer chan msgstream.TsMsg + unsolvedMsg []msgstream.TsMsg + loadIndexMsgStream msgstream.MsgStream + + queryNodeID UniqueID +} + +func newLoadIndexService(ctx context.Context, replica collectionReplica) *loadIndexService { + ctx1, cancel := context.WithCancel(ctx) + + // init minio + minioClient, err := minio.New(Params.MinioEndPoint, &minio.Options{ + Creds: credentials.NewStaticV4(Params.MinioAccessKeyID, Params.MinioSecretAccessKey, ""), + Secure: Params.MinioUseSSLStr, + }) + if err != nil { + panic(err) + } + + // TODO: load bucketName from config + bucketName := "query-node-load-index-service-minio" + MinioKV, err := minioKV.NewMinIOKV(ctx1, minioClient, bucketName) + if err != nil { + panic(err) + } + + // init msgStream + receiveBufSize := Params.LoadIndexReceiveBufSize + pulsarBufSize := Params.LoadIndexPulsarBufSize + + msgStreamURL := Params.PulsarAddress + + consumeChannels := Params.LoadIndexChannelNames + consumeSubName := Params.MsgChannelSubName + + loadIndexStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) + loadIndexStream.SetPulsarClient(msgStreamURL) + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + loadIndexStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + var stream msgstream.MsgStream = loadIndexStream + + return &loadIndexService{ + ctx: ctx1, + cancel: cancel, + client: MinioKV, + + replica: replica, + fieldIndexes: make(map[string][]*internalPb.IndexStats), + fieldStatsChan: make(chan []*internalPb.FieldStats, 1), + + msgBuffer: make(chan msgstream.TsMsg, 1), + unsolvedMsg: make([]msgstream.TsMsg, 0), + loadIndexMsgStream: stream, + + queryNodeID: Params.QueryNodeID, + } +} + +func (lis *loadIndexService) start() { + lis.loadIndexMsgStream.Start() + + for { + select { + case <-lis.ctx.Done(): + return + default: + messages := lis.loadIndexMsgStream.Consume() + if messages == nil || len(messages.Msgs) <= 0 { + log.Println("null msg pack") + continue + } + for _, msg := range messages.Msgs { + indexMsg, ok := msg.(*msgstream.LoadIndexMsg) + if !ok { + log.Println("type assertion failed for LoadIndexMsg") + continue + } + /* TODO: debug + // 1. use msg's index paths to get index bytes + indexBuffer := lis.loadIndex(indexMsg.IndexPaths) + // 2. use index bytes and index path to update segment + err := lis.updateSegmentIndex(indexBuffer, indexMsg.IndexPaths, indexMsg.SegmentID) + if err != nil { + log.Println(err) + continue + } + */ + // 3. update segment index stats + err := lis.updateSegmentIndexStats(indexMsg) + if err != nil { + log.Println(err) + continue + } + } + + // sendQueryNodeStats + err := lis.sendQueryNodeStats() + if err != nil { + log.Println(err) + continue + } + } + } +} + +func (lis *loadIndexService) printIndexParams(index []*commonpb.KeyValuePair) { + fmt.Println("=================================================") + for i := 0; i < len(index); i++ { + fmt.Println(index[i]) + } +} + +func (lis *loadIndexService) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool { + if len(index1) != len(index2) { + return false + } + + for i := 0; i < len(index1); i++ { + kv1 := *index1[i] + kv2 := *index2[i] + if kv1.Key != kv2.Key || kv1.Value != kv2.Value { + return false + } + } + + return true +} + +func (lis *loadIndexService) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string { + return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10) +} + +func (lis *loadIndexService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) { + ids := strings.Split(key, "/") + if len(ids) != 2 { + return 0, 0, errors.New("illegal fieldsStatsKey") + } + collectionID, err := strconv.ParseInt(ids[0], 10, 64) + if err != nil { + return 0, 0, err + } + fieldID, err := strconv.ParseInt(ids[1], 10, 64) + if err != nil { + return 0, 0, err + } + return collectionID, fieldID, nil +} + +func (lis *loadIndexService) updateSegmentIndexStats(indexMsg *msgstream.LoadIndexMsg) error { + targetSegment, err := lis.replica.getSegmentByID(indexMsg.SegmentID) + if err != nil { + return err + } + + fieldStatsKey := lis.fieldsStatsIDs2Key(targetSegment.collectionID, indexMsg.FieldID) + _, ok := lis.fieldIndexes[fieldStatsKey] + newIndexParams := indexMsg.IndexParams + // sort index params by key + sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key }) + if !ok { + lis.fieldIndexes[fieldStatsKey] = make([]*internalPb.IndexStats, 0) + lis.fieldIndexes[fieldStatsKey] = append(lis.fieldIndexes[fieldStatsKey], + &internalPb.IndexStats{ + IndexParams: newIndexParams, + NumRelatedSegments: 1, + }) + } else { + isNewIndex := true + for _, index := range lis.fieldIndexes[fieldStatsKey] { + if lis.indexParamsEqual(newIndexParams, index.IndexParams) { + index.NumRelatedSegments++ + isNewIndex = false + } + } + if isNewIndex { + lis.fieldIndexes[fieldStatsKey] = append(lis.fieldIndexes[fieldStatsKey], + &internalPb.IndexStats{ + IndexParams: newIndexParams, + NumRelatedSegments: 1, + }) + } + } + + return nil +} + +func (lis *loadIndexService) loadIndex(indexPath []string) [][]byte { + index := make([][]byte, 0) + + for _, path := range indexPath { + // get binarySetKey from indexPath + binarySetKey := filepath.Base(path) + indexPiece, err := (*lis.client).Load(binarySetKey) + if err != nil { + log.Println(err) + return nil + } + index = append(index, []byte(indexPiece)) + } + + return index +} + +func (lis *loadIndexService) updateSegmentIndex(bytesIndex [][]byte, loadIndexMsg *msgstream.LoadIndexMsg) error { + segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID) + if err != nil { + return err + } + + loadIndexInfo, err := NewLoadIndexInfo() + if err != nil { + return err + } + err = loadIndexInfo.AppendFieldInfo(loadIndexMsg.FieldName, loadIndexMsg.FieldID) + if err != nil { + return err + } + for _, indexParam := range loadIndexMsg.IndexParams { + err = loadIndexInfo.AppendIndexParam(indexParam.Key, indexParam.Value) + if err != nil { + return err + } + } + err = loadIndexInfo.AppendIndex(bytesIndex, loadIndexMsg.IndexPaths) + if err != nil { + return err + } + err = segment.updateSegmentIndex(loadIndexInfo) + if err != nil { + return err + } + + return nil +} + +func (lis *loadIndexService) sendQueryNodeStats() error { + resultFieldsStats := make([]*internalPb.FieldStats, 0) + for fieldStatsKey, indexStats := range lis.fieldIndexes { + colID, fieldID, err := lis.fieldsStatsKey2IDs(fieldStatsKey) + if err != nil { + return err + } + fieldStats := internalPb.FieldStats{ + CollectionID: colID, + FieldID: fieldID, + IndexStats: indexStats, + } + resultFieldsStats = append(resultFieldsStats, &fieldStats) + } + + lis.fieldStatsChan <- resultFieldsStats + fmt.Println("sent field stats") + return nil +} diff --git a/internal/querynode/load_index_service_test.go b/internal/querynode/load_index_service_test.go new file mode 100644 index 0000000000..49d2156703 --- /dev/null +++ b/internal/querynode/load_index_service_test.go @@ -0,0 +1,148 @@ +package querynode + +import ( + "math" + "math/rand" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +func TestLoadIndexService_PulsarAddress(t *testing.T) { + node := newQueryNode() + collectionID := rand.Int63n(1000000) + segmentID := rand.Int63n(1000000) + fieldID := rand.Int63n(1000000) + initTestMeta(t, node, "collection0", collectionID, segmentID) + + // loadIndexService and statsService + node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica) + go node.loadIndexService.start() + node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan) + go node.statsService.start() + + // gen load index message pack + const msgLength = 10 + indexParams := make([]*commonpb.KeyValuePair, 0) + // init IVF_FLAT index params + const ( + KeyDim = "dim" + KeyTopK = "k" + KeyNList = "nlist" + KeyNProbe = "nprobe" + KeyMetricType = "metric_type" + KeySliceSize = "SLICE_SIZE" + KeyDeviceID = "gpu_id" + ) + const ( + ValueDim = "128" + ValueTopK = "10" + ValueNList = "100" + ValueNProbe = "4" + ValueMetricType = "L2" + ValueSliceSize = "4" + ValueDeviceID = "0" + ) + + indexParams = append(indexParams, &commonpb.KeyValuePair{ + Key: KeyDim, + Value: ValueDim, + }) + indexParams = append(indexParams, &commonpb.KeyValuePair{ + Key: KeyTopK, + Value: ValueTopK, + }) + indexParams = append(indexParams, &commonpb.KeyValuePair{ + Key: KeyNList, + Value: ValueNList, + }) + indexParams = append(indexParams, &commonpb.KeyValuePair{ + Key: KeyNProbe, + Value: ValueNProbe, + }) + indexParams = append(indexParams, &commonpb.KeyValuePair{ + Key: KeyMetricType, + Value: ValueMetricType, + }) + indexParams = append(indexParams, &commonpb.KeyValuePair{ + Key: KeySliceSize, + Value: ValueSliceSize, + }) + indexParams = append(indexParams, &commonpb.KeyValuePair{ + Key: KeyDeviceID, + Value: ValueDeviceID, + }) + + loadIndex := internalPb.LoadIndex{ + MsgType: internalPb.MsgType_kLoadIndex, + SegmentID: segmentID, + FieldID: fieldID, + IndexPaths: []string{"tmp/index"}, // TODO: + IndexParams: indexParams, + } + + loadIndexMsg := msgstream.LoadIndexMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{uint32(0)}, + }, + LoadIndex: loadIndex, + } + + messages := make([]msgstream.TsMsg, 0) + for i := 0; i < msgLength; i++ { + var msg msgstream.TsMsg = &loadIndexMsg + messages = append(messages, msg) + } + + msgPack := msgstream.MsgPack{ + BeginTs: 0, + EndTs: math.MaxUint64, + Msgs: messages, + } + + // init message stream producer + loadIndexChannelNames := Params.LoadIndexChannelNames + pulsarURL := Params.PulsarAddress + + loadIndexStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.LoadIndexReceiveBufSize) + loadIndexStream.SetPulsarClient(pulsarURL) + loadIndexStream.CreatePulsarProducers(loadIndexChannelNames) + + var loadIndexMsgStream msgstream.MsgStream = loadIndexStream + loadIndexMsgStream.Start() + + err := loadIndexMsgStream.Produce(&msgPack) + assert.NoError(t, err) + + // init message stream consumer and do checks + statsMs := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize) + statsMs.SetPulsarClient(pulsarURL) + statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, msgstream.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize) + statsMs.Start() + + receiveMsg := msgstream.MsgStream(statsMs).Consume() + assert.NotNil(t, receiveMsg) + assert.NotEqual(t, len(receiveMsg.Msgs), 0) + statsMsg, ok := receiveMsg.Msgs[0].(*msgstream.QueryNodeStatsMsg) + assert.Equal(t, ok, true) + assert.Equal(t, len(statsMsg.FieldStats), 1) + fieldStats0 := statsMsg.FieldStats[0] + assert.Equal(t, fieldStats0.FieldID, fieldID) + assert.Equal(t, fieldStats0.CollectionID, collectionID) + assert.Equal(t, len(fieldStats0.IndexStats), 1) + indexStats0 := fieldStats0.IndexStats[0] + + params := indexStats0.IndexParams + // sort index params by key + sort.Slice(indexParams, func(i, j int) bool { return indexParams[i].Key < indexParams[j].Key }) + indexEqual := node.loadIndexService.indexParamsEqual(params, indexParams) + assert.Equal(t, indexEqual, true) + + <-node.queryNodeLoopCtx.Done() + node.Close() +} diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index d8c825e761..421cfed56a 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -21,6 +21,12 @@ type ParamTable struct { FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 + // minio + MinioEndPoint string + MinioAccessKeyID string + MinioSecretAccessKey string + MinioUseSSLStr bool + // dm InsertChannelNames []string InsertChannelRange []int @@ -44,6 +50,11 @@ type ParamTable struct { StatsChannelName string StatsReceiveBufSize int64 + // load index + LoadIndexChannelNames []string + LoadIndexReceiveBufSize int64 + LoadIndexPulsarBufSize int64 + GracefulTime int64 MsgChannelSubName string DefaultPartitionTag string @@ -59,6 +70,11 @@ func (p *ParamTable) Init() { panic(err) } + err = p.LoadYaml("milvus.yaml") + if err != nil { + panic(err) + } + queryNodeIDStr := os.Getenv("QUERY_NODE_ID") if queryNodeIDStr == "" { queryNodeIDList := p.QueryNodeIDList() @@ -78,6 +94,11 @@ func (p *ParamTable) Init() { panic(err) } + p.initMinioEndPoint() + p.initMinioAccessKeyID() + p.initMinioSecretAccessKey() + p.initMinioUseSSLStr() + p.initPulsarAddress() p.initETCDAddress() p.initMetaRootPath() @@ -111,6 +132,46 @@ func (p *ParamTable) Init() { p.initStatsPublishInterval() p.initStatsChannelName() p.initStatsReceiveBufSize() + + p.initLoadIndexChannelNames() + p.initLoadIndexReceiveBufSize() + p.initLoadIndexPulsarBufSize() +} + +func (p *ParamTable) initMinioEndPoint() { + url, err := p.Load("_MinioAddress") + if err != nil { + panic(err) + } + p.MinioEndPoint = url +} + +func (p *ParamTable) initMinioAccessKeyID() { + id, err := p.Load("minio.accessKeyID") + if err != nil { + panic(err) + } + p.MinioAccessKeyID = id +} + +func (p *ParamTable) initMinioSecretAccessKey() { + key, err := p.Load("minio.secretAccessKey") + if err != nil { + panic(err) + } + p.MinioSecretAccessKey = key +} + +func (p *ParamTable) initMinioUseSSLStr() { + ssl, err := p.Load("minio.useSSL") + if err != nil { + panic(err) + } + sslBoolean, err := strconv.ParseBool(ssl) + if err != nil { + panic(err) + } + p.MinioUseSSLStr = sslBoolean } func (p *ParamTable) initPulsarAddress() { @@ -358,3 +419,19 @@ func (p *ParamTable) initSliceIndex() { func (p *ParamTable) initQueryNodeNum() { p.QueryNodeNum = len(p.QueryNodeIDList()) } + +func (p *ParamTable) initLoadIndexChannelNames() { + loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd") + if err != nil { + panic(err) + } + p.LoadIndexChannelNames = []string{loadIndexChannelName} +} + +func (p *ParamTable) initLoadIndexReceiveBufSize() { + p.LoadIndexReceiveBufSize = p.ParseInt64("queryNode.msgStream.loadIndex.recvBufSize") +} + +func (p *ParamTable) initLoadIndexPulsarBufSize() { + p.LoadIndexPulsarBufSize = p.ParseInt64("queryNode.msgStream.loadIndex.pulsarBufSize") +} diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index 8d77bcd046..461073146f 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -15,6 +15,47 @@ func TestParamTable_PulsarAddress(t *testing.T) { assert.Equal(t, "6650", split[len(split)-1]) } +func TestParamTable_minio(t *testing.T) { + t.Run("Test endPoint", func(t *testing.T) { + endPoint := Params.MinioEndPoint + equal := endPoint == "localhost:9000" || endPoint == "minio:9000" + assert.Equal(t, equal, true) + }) + + t.Run("Test accessKeyID", func(t *testing.T) { + accessKeyID := Params.MinioAccessKeyID + assert.Equal(t, accessKeyID, "minioadmin") + }) + + t.Run("Test secretAccessKey", func(t *testing.T) { + secretAccessKey := Params.MinioSecretAccessKey + assert.Equal(t, secretAccessKey, "minioadmin") + }) + + t.Run("Test useSSL", func(t *testing.T) { + useSSL := Params.MinioUseSSLStr + assert.Equal(t, useSSL, false) + }) +} + +func TestParamTable_LoadIndex(t *testing.T) { + t.Run("Test channel names", func(t *testing.T) { + names := Params.LoadIndexChannelNames + assert.Equal(t, len(names), 1) + assert.Contains(t, names[0], "cmd") + }) + + t.Run("Test recvBufSize", func(t *testing.T) { + size := Params.LoadIndexReceiveBufSize + assert.Equal(t, size, int64(512)) + }) + + t.Run("Test pulsarBufSize", func(t *testing.T) { + size := Params.LoadIndexPulsarBufSize + assert.Equal(t, size, int64(512)) + }) +} + func TestParamTable_QueryNodeID(t *testing.T) { id := Params.QueryNodeID assert.Contains(t, Params.QueryNodeIDList(), id) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index ed7101824a..819d2b8554 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -24,10 +24,12 @@ type QueryNode struct { replica collectionReplica - dataSyncService *dataSyncService - metaService *metaService - searchService *searchService - statsService *statsService + // services + dataSyncService *dataSyncService + metaService *metaService + searchService *searchService + loadIndexService *loadIndexService + statsService *statsService } func Init() { @@ -69,11 +71,13 @@ func (node *QueryNode) Start() error { node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica) node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica) + node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica) + node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan) go node.dataSyncService.start() go node.searchService.start() go node.metaService.start() + go node.loadIndexService.start() go node.statsService.start() <-node.queryNodeLoopCtx.Done() diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 3bdce39e27..098f2cae2d 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -26,6 +26,7 @@ func setup() { func genTestCollectionMeta(collectionName string, collectionID UniqueID) *etcdpb.CollectionMeta { fieldVec := schemapb.FieldSchema{ + FieldID: UniqueID(0), Name: "vec", IsPrimaryKey: false, DataType: schemapb.DataType_VECTOR_FLOAT, @@ -44,6 +45,7 @@ func genTestCollectionMeta(collectionName string, collectionID UniqueID) *etcdpb } fieldInt := schemapb.FieldSchema{ + FieldID: UniqueID(1), Name: "age", IsPrimaryKey: false, DataType: schemapb.DataType_INT32, @@ -119,12 +121,13 @@ func makeNewChannelNames(names []string, suffix string) []string { } func refreshChannelNames() { - suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(100), 10) + suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(1000000), 10) Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix) Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix) Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix) Params.SearchResultChannelNames = makeNewChannelNames(Params.SearchResultChannelNames, suffix) Params.StatsChannelName = Params.StatsChannelName + suffix + Params.LoadIndexChannelNames = makeNewChannelNames(Params.LoadIndexChannelNames, suffix) } func TestMain(m *testing.M) { diff --git a/internal/querynode/reduce_test.go b/internal/querynode/reduce_test.go index 7cd03b2b7f..afbc87bcfc 100644 --- a/internal/querynode/reduce_test.go +++ b/internal/querynode/reduce_test.go @@ -21,7 +21,7 @@ func TestReduce_AllFunc(t *testing.T) { assert.NotEqual(t, "", schemaBlob) collection := newCollection(collectionMeta.ID, schemaBlob) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) const DIM = 16 var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index e3f7d81252..0379092865 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -9,7 +9,6 @@ package querynode #include "segcore/collection_c.h" #include "segcore/plan_c.h" #include "segcore/reduce_c.h" - */ import "C" import ( @@ -25,6 +24,8 @@ import ( type Segment struct { segmentPtr C.CSegmentBase segmentID UniqueID + partitionTag string // TODO: use partitionID + collectionID UniqueID lastMemSize int64 lastRowCount int64 recentlyModified bool @@ -35,13 +36,18 @@ func (s *Segment) ID() UniqueID { } //-------------------------------------------------------------------------------------- constructor and destructor -func newSegment(collection *Collection, segmentID int64) *Segment { +func newSegment(collection *Collection, segmentID int64, partitionTag string, collectionID UniqueID) *Segment { /* CSegmentBase newSegment(CPartition partition, unsigned long segment_id); */ segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID)) - var newSegment = &Segment{segmentPtr: segmentPtr, segmentID: segmentID} + var newSegment = &Segment{ + segmentPtr: segmentPtr, + segmentID: segmentID, + partitionTag: partitionTag, + collectionID: collectionID, + } return newSegment } @@ -236,3 +242,17 @@ func (s *Segment) fillTargetEntry(plan *Plan, return nil } + +// segment, err := loadIndexService.replica.getSegmentByID(segmentID) +func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error { + status := C.UpdateSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo) + errorCode := status.error_code + + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("updateSegmentIndex failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) + } + + return nil +} diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index f24d060d66..23cff02718 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -26,7 +26,7 @@ func TestSegment_newSegment(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) deleteSegment(segment) deleteCollection(collection) @@ -44,7 +44,7 @@ func TestSegment_deleteSegment(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) deleteSegment(segment) @@ -64,7 +64,7 @@ func TestSegment_getRowCount(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -115,7 +115,7 @@ func TestSegment_getDeletedCount(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -172,7 +172,7 @@ func TestSegment_getMemSize(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -223,7 +223,7 @@ func TestSegment_segmentInsert(t *testing.T) { assert.Equal(t, collection.Name(), collectionName) assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -270,7 +270,7 @@ func TestSegment_segmentDelete(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -323,7 +323,7 @@ func TestSegment_segmentSearch(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -408,7 +408,7 @@ func TestSegment_segmentPreInsert(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) const DIM = 16 @@ -450,7 +450,7 @@ func TestSegment_segmentPreDelete(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment(collection, segmentID) + segment := newSegment(collection, segmentID, Params.DefaultPartitionTag, collectionID) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go index b46ccdbc2b..17c8bd9473 100644 --- a/internal/querynode/stats_service.go +++ b/internal/querynode/stats_service.go @@ -12,17 +12,23 @@ import ( ) type statsService struct { - ctx context.Context - statsStream msgstream.MsgStream - replica collectionReplica + ctx context.Context + + replica collectionReplica + + fieldStatsChan chan []*internalpb.FieldStats + statsStream msgstream.MsgStream } -func newStatsService(ctx context.Context, replica collectionReplica) *statsService { +func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsChan chan []*internalpb.FieldStats) *statsService { return &statsService{ - ctx: ctx, - statsStream: nil, - replica: replica, + ctx: ctx, + + replica: replica, + + fieldStatsChan: fieldStatsChan, + statsStream: nil, } } @@ -50,7 +56,9 @@ func (sService *statsService) start() { case <-sService.ctx.Done(): return case <-time.After(time.Duration(sleepTimeInterval) * time.Millisecond): - sService.sendSegmentStatistic() + sService.publicStatistic(nil) + case fieldStats := <-sService.fieldStatsChan: + sService.publicStatistic(fieldStats) } } } @@ -61,20 +69,21 @@ func (sService *statsService) close() { } } -func (sService *statsService) sendSegmentStatistic() { - statisticData := sService.replica.getSegmentStatistics() +func (sService *statsService) publicStatistic(fieldStats []*internalpb.FieldStats) { + segStats := sService.replica.getSegmentStatistics() - // fmt.Println("Publish segment statistic") - // fmt.Println(statisticData) - sService.publicStatistic(statisticData) -} + queryNodeStats := internalpb.QueryNodeStats{ + MsgType: internalpb.MsgType_kQueryNodeStats, + PeerID: Params.QueryNodeID, + SegStats: segStats, + FieldStats: fieldStats, + } -func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeStats) { var msg msgstream.TsMsg = &msgstream.QueryNodeStatsMsg{ BaseMsg: msgstream.BaseMsg{ HashValues: []uint32{0}, }, - QueryNodeStats: *statistic, + QueryNodeStats: queryNodeStats, } var msgPack = msgstream.MsgPack{ diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index 8097931bdc..c7ff4b3f94 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -10,7 +10,7 @@ import ( func TestStatsService_start(t *testing.T) { node := newQueryNode() initTestMeta(t, node, "collection0", 0, 0) - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica) + node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil) node.statsService.start() node.Close() } @@ -32,11 +32,11 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { var statsMsgStream msgstream.MsgStream = statsStream - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica) + node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil) node.statsService.statsStream = statsMsgStream node.statsService.statsStream.Start() // send stats - node.statsService.sendSegmentStatistic() + node.statsService.publicStatistic(nil) node.Close() }