From f4c643f1bda4b9aa5fe1d38bb6f6364aee64ee82 Mon Sep 17 00:00:00 2001 From: sunby Date: Wed, 25 Nov 2020 18:39:05 +0800 Subject: [PATCH] Add number limitation of partition and field. And Change default partition tag to "_default" Signed-off-by: sunby --- Makefile | 12 ++-- .../reader.go => querynode/query_node.go} | 6 +- configs/advanced/master.yaml | 5 +- configs/advanced/proxy.yaml | 1 + configs/advanced/query_node.yaml | 6 +- internal/core/src/segcore/segment_c.cpp | 49 ++++++++++--- internal/core/src/segcore/segment_c.h | 22 ++++-- internal/core/unittest/test_c_api.cpp | 18 ++--- internal/master/meta_table.go | 18 ++++- internal/master/meta_table_test.go | 41 +++++++++++ internal/master/param_table.go | 26 +++++++ internal/master/partition_task.go | 8 ++- internal/master/partition_task_test.go | 7 +- internal/master/segment_manager_test.go | 3 + internal/msgstream/msgstream.go | 2 +- internal/proxy/paramtable.go | 12 ++++ internal/proxy/task.go | 5 ++ internal/proxy/validate_util.go | 2 +- internal/{reader => querynode}/collection.go | 2 +- .../collection_replica.go | 2 +- .../collection_replica_test.go | 2 +- .../{reader => querynode}/collection_test.go | 2 +- .../data_sync_service.go | 2 +- .../data_sync_service_test.go | 4 +- .../flow_graph_delete_node.go | 2 +- .../flow_graph_filter_dm_node.go | 4 +- .../flow_graph_insert_node.go | 6 +- .../flow_graph_key2seg_node.go | 2 +- .../flow_graph_message.go | 2 +- .../flow_graph_msg_stream_input_nodes.go | 8 +-- .../{reader => querynode}/flow_graph_node.go | 2 +- .../flow_graph_schema_update_node.go | 2 +- .../flow_graph_service_time_node.go | 2 +- internal/{reader => querynode}/index.go | 2 +- .../{reader => querynode}/meta_service.go | 7 +- .../meta_service_test.go | 35 +++++----- internal/{reader => querynode}/param_table.go | 70 ++++++++++++------- .../{reader => querynode}/param_table_test.go | 57 +++++++++++---- internal/{reader => querynode}/partition.go | 2 +- .../{reader => querynode}/partition_test.go | 2 +- internal/{reader => querynode}/plan.go | 2 +- internal/{reader => querynode}/query_node.go | 2 +- .../{reader => querynode}/query_node_test.go | 2 +- internal/{reader => querynode}/reader.go | 2 +- .../{reader => querynode}/search_service.go | 4 +- .../search_service_test.go | 2 +- internal/{reader => querynode}/segment.go | 39 +++++++---- .../{reader => querynode}/segment_test.go | 5 +- .../{reader => querynode}/stats_service.go | 4 +- .../stats_service_test.go | 4 +- internal/{reader => querynode}/tsafe.go | 2 +- internal/{reader => querynode}/tsafe_test.go | 2 +- internal/{reader => querynode}/type_def.go | 2 +- scripts/run_go_unittest.sh | 4 +- 54 files changed, 373 insertions(+), 165 deletions(-) rename cmd/{reader/reader.go => querynode/query_node.go} (80%) rename internal/{reader => querynode}/collection.go (98%) rename internal/{reader => querynode}/collection_replica.go (99%) rename internal/{reader => querynode}/collection_replica_test.go (99%) rename internal/{reader => querynode}/collection_test.go (99%) rename internal/{reader => querynode}/data_sync_service.go (99%) rename internal/{reader => querynode}/data_sync_service_test.go (98%) rename internal/{reader => querynode}/flow_graph_delete_node.go (96%) rename internal/{reader => querynode}/flow_graph_filter_dm_node.go (97%) rename internal/{reader => querynode}/flow_graph_insert_node.go (98%) rename internal/{reader => querynode}/flow_graph_key2seg_node.go (98%) rename internal/{reader => querynode}/flow_graph_message.go (98%) rename internal/{reader => querynode}/flow_graph_msg_stream_input_nodes.go (84%) rename internal/{reader => querynode}/flow_graph_node.go (90%) rename internal/{reader => querynode}/flow_graph_schema_update_node.go (96%) rename internal/{reader => querynode}/flow_graph_service_time_node.go (98%) rename internal/{reader => querynode}/index.go (97%) rename internal/{reader => querynode}/meta_service.go (97%) rename internal/{reader => querynode}/meta_service_test.go (96%) rename internal/{reader => querynode}/param_table.go (75%) rename internal/{reader => querynode}/param_table_test.go (58%) rename internal/{reader => querynode}/partition.go (97%) rename internal/{reader => querynode}/partition_test.go (99%) rename internal/{reader => querynode}/plan.go (98%) rename internal/{reader => querynode}/query_node.go (98%) rename internal/{reader => querynode}/query_node_test.go (96%) rename internal/{reader => querynode}/reader.go (89%) rename internal/{reader => querynode}/search_service.go (99%) rename internal/{reader => querynode}/search_service_test.go (99%) rename internal/{reader => querynode}/segment.go (83%) rename internal/{reader => querynode}/segment_test.go (99%) rename internal/{reader => querynode}/stats_service.go (96%) rename internal/{reader => querynode}/stats_service_test.go (98%) rename internal/{reader => querynode}/tsafe.go (98%) rename internal/{reader => querynode}/tsafe_test.go (95%) rename internal/{reader => querynode}/type_def.go (94%) diff --git a/Makefile b/Makefile index 0d5d3dc544..61c2c19aff 100644 --- a/Makefile +++ b/Makefile @@ -56,8 +56,8 @@ verifiers: cppcheck fmt lint ruleguard # Builds various components locally. build-go: @echo "Building each component's binary to './'" - @echo "Building reader ..." - @mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/reader $(PWD)/cmd/reader/reader.go 1>/dev/null + @echo "Building query node ..." + @mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null @echo "Building master ..." @mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null @echo "Building proxy ..." @@ -72,7 +72,7 @@ build-cpp-with-unittest: # Runs the tests. unittest: test-cpp test-go -#TODO: proxy master reader writer's unittest +#TODO: proxy master query node writer's unittest test-go: @echo "Running go unittests..." @(env bash $(PWD)/scripts/run_go_unittest.sh) @@ -83,14 +83,14 @@ test-cpp: build-cpp-with-unittest #TODO: build each component to docker docker: verifiers - @echo "Building reader docker image '$(TAG)'" + @echo "Building query node docker image '$(TAG)'" @echo "Building proxy docker image '$(TAG)'" @echo "Building master docker image '$(TAG)'" # Builds each component and installs it to $GOPATH/bin. install: all @echo "Installing binary to './bin'" - @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/reader $(GOPATH)/bin/reader + @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/querynode $(GOPATH)/bin/querynode @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/master $(GOPATH)/bin/master @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/proxy $(GOPATH)/bin/proxy @mkdir -p $(LIBRARY_PATH) && cp -f $(PWD)/internal/core/output/lib/* $(LIBRARY_PATH) @@ -100,6 +100,6 @@ clean: @echo "Cleaning up all the generated files" @find . -name '*.test' | xargs rm -fv @find . -name '*~' | xargs rm -fv - @rm -rvf reader + @rm -rvf querynode @rm -rvf master @rm -rvf proxy diff --git a/cmd/reader/reader.go b/cmd/querynode/query_node.go similarity index 80% rename from cmd/reader/reader.go rename to cmd/querynode/query_node.go index 386ed51539..fd15c32379 100644 --- a/cmd/reader/reader.go +++ b/cmd/querynode/query_node.go @@ -6,14 +6,14 @@ import ( "os/signal" "syscall" - "github.com/zilliztech/milvus-distributed/internal/reader" + "github.com/zilliztech/milvus-distributed/internal/querynode" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - reader.Init() + querynode.Init() sc := make(chan os.Signal, 1) signal.Notify(sc, @@ -28,7 +28,7 @@ func main() { cancel() }() - reader.StartQueryNode(ctx) + querynode.StartQueryNode(ctx) switch sig { case syscall.SIGTERM: diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index e0837042d6..4c077af886 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -20,4 +20,7 @@ master: minIDAssignCnt: 1024 maxIDAssignCnt: 16384 # old name: segmentExpireDuration: 2000 - IDAssignExpiration: 2000 # ms \ No newline at end of file + IDAssignExpiration: 2000 # ms + + maxPartitionNum: 4096 + defaultPartitionTag: _default \ No newline at end of file diff --git a/configs/advanced/proxy.yaml b/configs/advanced/proxy.yaml index cc98ad4d85..0d3163c0d1 100644 --- a/configs/advanced/proxy.yaml +++ b/configs/advanced/proxy.yaml @@ -28,3 +28,4 @@ proxy: bufSize: 512 maxNameLength: 255 + maxFieldNum: 64 \ No newline at end of file diff --git a/configs/advanced/query_node.yaml b/configs/advanced/query_node.yaml index 22ec3148f3..b44632d33a 100644 --- a/configs/advanced/query_node.yaml +++ b/configs/advanced/query_node.yaml @@ -9,7 +9,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. -reader: +queryNode: stats: publishInterval: 1000 # milliseconds @@ -19,10 +19,6 @@ reader: maxParallelism: 1024 msgStream: - dm: # TODO: rm dm - #streamBufSize: 1024 # msgPack chan buffer size - recvBufSize: 1024 # msgPack chan buffer size - pulsarBufSize: 1024 # pulsar chan buffer size insert: #streamBufSize: 1024 # msgPack chan buffer size recvBufSize: 1024 # msgPack chan buffer size diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index fc3be7f6c8..cdaa967228 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -18,6 +18,7 @@ #include #include #include +#include CSegmentBase NewSegment(CCollection collection, uint64_t segment_id) { @@ -41,7 +42,7 @@ DeleteSegment(CSegmentBase segment) { ////////////////////////////////////////////////////////////////// -int +CStatus Insert(CSegmentBase c_segment, int64_t reserved_offset, int64_t size, @@ -57,11 +58,22 @@ Insert(CSegmentBase c_segment, dataChunk.sizeof_per_row = sizeof_per_row; dataChunk.count = count; - auto res = segment->Insert(reserved_offset, size, row_ids, timestamps, dataChunk); + try { + auto res = segment->Insert(reserved_offset, size, row_ids, timestamps, dataChunk); + + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::runtime_error& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } // TODO: delete print // std::cout << "do segment insert, sizeof_per_row = " << sizeof_per_row << std::endl; - return res.code(); } int64_t @@ -73,13 +85,24 @@ PreInsert(CSegmentBase c_segment, int64_t size) { return segment->PreInsert(size); } -int +CStatus Delete( CSegmentBase c_segment, int64_t reserved_offset, int64_t size, const int64_t* row_ids, const uint64_t* timestamps) { auto segment = (milvus::segcore::SegmentBase*)c_segment; - auto res = segment->Delete(reserved_offset, size, row_ids, timestamps); - return res.code(); + try { + auto res = segment->Delete(reserved_offset, size, row_ids, timestamps); + + auto status = CStatus(); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::runtime_error& e) { + auto status = CStatus(); + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } } int64_t @@ -91,7 +114,7 @@ PreDelete(CSegmentBase c_segment, int64_t size) { return segment->PreDelete(size); } -int +CStatus Search(CSegmentBase c_segment, CPlan c_plan, CPlaceholderGroup* c_placeholder_groups, @@ -107,14 +130,22 @@ Search(CSegmentBase c_segment, } milvus::segcore::QueryResult query_result; - auto res = segment->Search(plan, placeholder_groups.data(), timestamps, num_groups, query_result); + auto status = CStatus(); + try { + auto res = segment->Search(plan, placeholder_groups.data(), timestamps, num_groups, query_result); + status.error_code = Success; + status.error_msg = ""; + } catch (std::runtime_error& e) { + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + } // result_ids and result_distances have been allocated memory in goLang, // so we don't need to malloc here. memcpy(result_ids, query_result.result_ids_.data(), query_result.get_row_count() * sizeof(int64_t)); memcpy(result_distances, query_result.result_distances_.data(), query_result.get_row_count() * sizeof(float)); - return res.code(); + return status; } ////////////////////////////////////////////////////////////////// diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index 5e681bc689..c2af7e8305 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -14,12 +14,24 @@ extern "C" { #endif #include -#include "segcore/collection_c.h" -#include "segcore/plan_c.h" +#include #include +#include "segcore/collection_c.h" +#include "segcore/plan_c.h" + typedef void* CSegmentBase; +enum ErrorCode { + Success = 0, + UnexpectedException = 1, +}; + +typedef struct CStatus { + int error_code; + const char* error_msg; +} CStatus; + CSegmentBase NewSegment(CCollection collection, uint64_t segment_id); @@ -28,7 +40,7 @@ DeleteSegment(CSegmentBase segment); ////////////////////////////////////////////////////////////////// -int +CStatus Insert(CSegmentBase c_segment, int64_t reserved_offset, int64_t size, @@ -41,14 +53,14 @@ Insert(CSegmentBase c_segment, int64_t PreInsert(CSegmentBase c_segment, int64_t size); -int +CStatus Delete( CSegmentBase c_segment, int64_t reserved_offset, int64_t size, const int64_t* row_ids, const uint64_t* timestamps); int64_t PreDelete(CSegmentBase c_segment, int64_t size); -int +CStatus Search(CSegmentBase c_segment, CPlan plan, CPlaceholderGroup* placeholder_groups, diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index d25c4a8a3b..b16d545741 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -65,7 +65,7 @@ TEST(CApiTest, InsertTest) { auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N); - assert(res == 0); + assert(res.error_code == Success); DeleteCollection(collection); DeleteSegment(segment); @@ -82,7 +82,7 @@ TEST(CApiTest, DeleteTest) { auto offset = PreDelete(segment, 3); auto del_res = Delete(segment, offset, 3, delete_row_ids, delete_timestamps); - assert(del_res == 0); + assert(del_res.error_code == Success); DeleteCollection(collection); DeleteSegment(segment); @@ -116,7 +116,7 @@ TEST(CApiTest, SearchTest) { auto offset = PreInsert(segment, N); auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N); - assert(ins_res == 0); + assert(ins_res.error_code == Success); const char* dsl_string = R"( { @@ -163,7 +163,7 @@ TEST(CApiTest, SearchTest) { float result_distances[100]; auto sea_res = Search(segment, plan, placeholderGroups.data(), timestamps.data(), 1, result_ids, result_distances); - assert(sea_res == 0); + assert(sea_res.error_code == Success); DeletePlan(plan); DeletePlaceholderGroup(placeholderGroup); @@ -199,7 +199,7 @@ TEST(CApiTest, BuildIndexTest) { auto offset = PreInsert(segment, N); auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N); - assert(ins_res == 0); + assert(ins_res.error_code == Success); // TODO: add index ptr Close(segment); @@ -250,7 +250,7 @@ TEST(CApiTest, BuildIndexTest) { float result_distances[100]; auto sea_res = Search(segment, plan, placeholderGroups.data(), timestamps.data(), 1, result_ids, result_distances); - assert(sea_res == 0); + assert(sea_res.error_code == Success); DeletePlan(plan); DeletePlaceholderGroup(placeholderGroup); @@ -315,7 +315,7 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) { auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N); - assert(res == 0); + assert(res.error_code == Success); auto memory_usage_size = GetMemoryUsageInBytes(segment); @@ -482,7 +482,7 @@ TEST(CApiTest, GetDeletedCountTest) { auto offset = PreDelete(segment, 3); auto del_res = Delete(segment, offset, 3, delete_row_ids, delete_timestamps); - assert(del_res == 0); + assert(del_res.error_code == Success); // TODO: assert(deleted_count == len(delete_row_ids)) auto deleted_count = GetDeletedCount(segment); @@ -502,7 +502,7 @@ TEST(CApiTest, GetRowCountTest) { auto line_sizeof = (sizeof(int) + sizeof(float) * 16); auto offset = PreInsert(segment, N); auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N); - assert(res == 0); + assert(res.error_code == Success); auto row_count = GetRowCount(segment); assert(row_count == N); diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 0f76b45d40..991828a9cb 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -220,7 +220,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionMeta) error { } if len(coll.PartitionTags) == 0 { - coll.PartitionTags = append(coll.PartitionTags, "default") + coll.PartitionTags = append(coll.PartitionTags, Params.DefaultPartitionTag) } _, ok := mt.collName2ID[coll.Schema.Name] if ok { @@ -292,6 +292,10 @@ func (mt *metaTable) AddPartition(collID UniqueID, tag string) error { return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10)) } + // number of partition tags (except _default) should be limited to 4096 by default + if int64(len(coll.PartitionTags)) > Params.MaxPartitionNum { + return errors.New("maximum partition's number should be limit to " + strconv.FormatInt(Params.MaxPartitionNum, 10)) + } for _, t := range coll.PartitionTags { if t == tag { return errors.Errorf("partition already exists.") @@ -326,17 +330,29 @@ func (mt *metaTable) DeletePartition(collID UniqueID, tag string) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() + if tag == Params.DefaultPartitionTag { + return errors.New("default partition cannot be deleted") + } + collMeta, ok := mt.collID2Meta[collID] if !ok { return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10)) } + // check tag exists + exist := false + pt := make([]string, 0, len(collMeta.PartitionTags)) for _, t := range collMeta.PartitionTags { if t != tag { pt = append(pt, t) + } else { + exist = true } } + if !exist { + return errors.New("partition " + tag + " does not exist") + } if len(pt) == len(collMeta.PartitionTags) { return nil } diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 674b016259..13482124d0 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -3,6 +3,7 @@ package master import ( "context" "reflect" + "strconv" "testing" "github.com/stretchr/testify/assert" @@ -238,6 +239,10 @@ func TestMetaTable_DeletePartition(t *testing.T) { assert.Equal(t, 1, len(meta.collName2ID)) assert.Equal(t, 1, len(meta.collID2Meta)) assert.Equal(t, 1, len(meta.segID2Meta)) + + // delete not exist + err = meta.DeletePartition(100, "not_exist") + assert.NotNil(t, err) } func TestMetaTable_Segment(t *testing.T) { @@ -366,3 +371,39 @@ func TestMetaTable_UpdateSegment(t *testing.T) { assert.Nil(t, err) assert.Equal(t, seg.NumRows, int64(210)) } + +func TestMetaTable_AddPartition_Limit(t *testing.T) { + Init() + Params.MaxPartitionNum = 256 // adding 4096 partitions is too slow + etcdAddr := Params.EtcdAddress + + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") + + _, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix()) + assert.Nil(t, err) + + meta, err := NewMetaTable(etcdKV) + assert.Nil(t, err) + defer meta.client.Close() + + colMeta := pb.CollectionMeta{ + ID: 100, + Schema: &schemapb.CollectionSchema{ + Name: "coll1", + }, + CreateTime: 0, + SegmentIDs: []UniqueID{}, + PartitionTags: []string{}, + } + err = meta.AddCollection(&colMeta) + assert.Nil(t, err) + + for i := 0; i < int(Params.MaxPartitionNum); i++ { + err := meta.AddPartition(100, "partition_"+strconv.Itoa(i)) + assert.Nil(t, err) + } + err = meta.AddPartition(100, "partition_limit") + assert.NotNil(t, err) +} diff --git a/internal/master/param_table.go b/internal/master/param_table.go index 872bdab34e..2d32f2e769 100644 --- a/internal/master/param_table.go +++ b/internal/master/param_table.go @@ -43,6 +43,9 @@ type ParamTable struct { K2SChannelNames []string QueryNodeStatsChannelName string MsgChannelSubName string + + MaxPartitionNum int64 + DefaultPartitionTag string } var Params ParamTable @@ -91,6 +94,8 @@ func (p *ParamTable) Init() { p.initK2SChannelNames() p.initQueryNodeStatsChannelName() p.initMsgChannelSubName() + p.initMaxPartitionNum() + p.initDefaultPartitionTag() } func (p *ParamTable) initAddress() { @@ -411,3 +416,24 @@ func (p *ParamTable) initK2SChannelNames() { } p.K2SChannelNames = channels } + +func (p *ParamTable) initMaxPartitionNum() { + str, err := p.Load("master.maxPartitionNum") + if err != nil { + panic(err) + } + maxPartitionNum, err := strconv.ParseInt(str, 10, 64) + if err != nil { + panic(err) + } + p.MaxPartitionNum = maxPartitionNum +} + +func (p *ParamTable) initDefaultPartitionTag() { + defaultTag, err := p.Load("master.defaultPartitionTag") + if err != nil { + panic(err) + } + + p.DefaultPartitionTag = defaultTag +} diff --git a/internal/master/partition_task.go b/internal/master/partition_task.go index 28636c930c..80a339b2fc 100644 --- a/internal/master/partition_task.go +++ b/internal/master/partition_task.go @@ -191,10 +191,12 @@ func (t *showPartitionTask) Execute() error { return errors.New("null request") } - partitions := make([]string, 0) - for _, collection := range t.mt.collID2Meta { - partitions = append(partitions, collection.PartitionTags...) + collMeta, err := t.mt.GetCollectionByName(t.req.CollectionName.CollectionName) + if err != nil { + return err } + partitions := make([]string, 0) + partitions = append(partitions, collMeta.PartitionTags...) stringListResponse := servicepb.StringListResponse{ Status: &commonpb.Status{ diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index cd784f9780..53b698e64a 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -60,6 +60,9 @@ func TestMaster_Partition(t *testing.T) { K2SChannelNames: []string{"k2s0", "k2s1"}, QueryNodeStatsChannelName: "statistic", MsgChannelSubName: Params.MsgChannelSubName, + + MaxPartitionNum: int64(4096), + DefaultPartitionTag: "_default", } port := 10000 + rand.Intn(1000) @@ -212,7 +215,7 @@ func TestMaster_Partition(t *testing.T) { //assert.Equal(t, collMeta.PartitionTags[0], "partition1") //assert.Equal(t, collMeta.PartitionTags[1], "partition2") - assert.ElementsMatch(t, []string{"default", "partition1", "partition2"}, collMeta.PartitionTags) + assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, collMeta.PartitionTags) showPartitionReq := internalpb.ShowPartitionRequest{ MsgType: internalpb.MsgType_kShowPartitions, @@ -224,7 +227,7 @@ func TestMaster_Partition(t *testing.T) { stringList, err := cli.ShowPartitions(ctx, &showPartitionReq) assert.Nil(t, err) - assert.ElementsMatch(t, []string{"default", "partition1", "partition2"}, stringList.Values) + assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, stringList.Values) showPartitionReq = internalpb.ShowPartitionRequest{ MsgType: internalpb.MsgType_kShowPartitions, diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index 7063647be7..85942bdd62 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -261,6 +261,9 @@ func startupMaster() { K2SChannelNames: []string{"k2s0", "k2s1"}, QueryNodeStatsChannelName: "statistic", MsgChannelSubName: Params.MsgChannelSubName, + + MaxPartitionNum: int64(4096), + DefaultPartitionTag: "_default", } master, err = CreateServer(ctx) diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index 7f5153005e..59ac280c9c 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -70,7 +70,7 @@ func (ms *PulsarMsgStream) CreatePulsarProducers(channels []string) { for i := 0; i < len(channels); i++ { pp, err := (*ms.client).CreateProducer(pulsar.ProducerOptions{Topic: channels[i]}) if err != nil { - log.Printf("Failed to create reader producer %s, error = %v", channels[i], err) + log.Printf("Failed to create querynode producer %s, error = %v", channels[i], err) } ms.producers = append(ms.producers, &pp) } diff --git a/internal/proxy/paramtable.go b/internal/proxy/paramtable.go index c53c845096..791927777d 100644 --- a/internal/proxy/paramtable.go +++ b/internal/proxy/paramtable.go @@ -463,3 +463,15 @@ func (pt *ParamTable) MaxNameLength() int64 { } return maxNameLength } + +func (pt *ParamTable) MaxFieldNum() int64 { + str, err := pt.Load("proxy.maxFieldNum") + if err != nil { + panic(err) + } + maxFieldNum, err := strconv.ParseInt(str, 10, 64) + if err != nil { + panic(err) + } + return maxFieldNum +} diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 65e4e214ee..1d23ddaafd 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "strconv" "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/allocator" @@ -164,6 +165,10 @@ func (cct *CreateCollectionTask) SetTs(ts Timestamp) { } func (cct *CreateCollectionTask) PreExecute() error { + if int64(len(cct.schema.Fields)) > Params.MaxFieldNum() { + return errors.New("maximum field's number should be limited to " + strconv.FormatInt(Params.MaxFieldNum(), 10)) + } + // validate collection name if err := ValidateCollectionName(cct.schema.Name); err != nil { return err diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index 8049595f28..4f892603f9 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -68,7 +68,7 @@ func ValidatePartitionTag(partitionTag string, strictCheck bool) error { if strictCheck { firstChar := partitionTag[0] - if firstChar != '_' && !isAlpha(firstChar) { + if firstChar != '_' && !isAlpha(firstChar) && !isNumber(firstChar) { msg := invalidMsg + "The first character of a partition tag must be an underscore or letter." return errors.New(msg) } diff --git a/internal/reader/collection.go b/internal/querynode/collection.go similarity index 98% rename from internal/reader/collection.go rename to internal/querynode/collection.go index 9d7e3fb511..5ca3b7ffbc 100644 --- a/internal/reader/collection.go +++ b/internal/querynode/collection.go @@ -1,4 +1,4 @@ -package reader +package querynode /* diff --git a/internal/reader/collection_replica.go b/internal/querynode/collection_replica.go similarity index 99% rename from internal/reader/collection_replica.go rename to internal/querynode/collection_replica.go index af8b4d3d27..68c2502048 100644 --- a/internal/reader/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -1,4 +1,4 @@ -package reader +package querynode /* diff --git a/internal/reader/collection_replica_test.go b/internal/querynode/collection_replica_test.go similarity index 99% rename from internal/reader/collection_replica_test.go rename to internal/querynode/collection_replica_test.go index 1bbe00d424..b133aa642a 100644 --- a/internal/reader/collection_replica_test.go +++ b/internal/querynode/collection_replica_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" diff --git a/internal/reader/collection_test.go b/internal/querynode/collection_test.go similarity index 99% rename from internal/reader/collection_test.go rename to internal/querynode/collection_test.go index 58d8747a2e..5fa8cee1fa 100644 --- a/internal/reader/collection_test.go +++ b/internal/querynode/collection_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" diff --git a/internal/reader/data_sync_service.go b/internal/querynode/data_sync_service.go similarity index 99% rename from internal/reader/data_sync_service.go rename to internal/querynode/data_sync_service.go index ef746e6521..e865d24b04 100644 --- a/internal/reader/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" diff --git a/internal/reader/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go similarity index 98% rename from internal/reader/data_sync_service_test.go rename to internal/querynode/data_sync_service_test.go index 0f3a0e398d..6661a1d1ac 100644 --- a/internal/reader/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" @@ -175,7 +175,7 @@ func TestDataSyncService_Start(t *testing.T) { // pulsar produce const receiveBufSize = 1024 - producerChannels := []string{"insert"} + producerChannels := Params.insertChannelNames() insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) insertStream.SetPulsarClient(pulsarURL) diff --git a/internal/reader/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go similarity index 96% rename from internal/reader/flow_graph_delete_node.go rename to internal/querynode/flow_graph_delete_node.go index 0a45357d6e..7a7ac954f2 100644 --- a/internal/reader/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -1,4 +1,4 @@ -package reader +package querynode type deleteNode struct { BaseNode diff --git a/internal/reader/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go similarity index 97% rename from internal/reader/flow_graph_filter_dm_node.go rename to internal/querynode/flow_graph_filter_dm_node.go index ddff7e5868..c791ff23aa 100644 --- a/internal/reader/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "log" @@ -29,8 +29,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - // TODO: add time range check - var iMsg = insertMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), timeRange: TimeRange{ diff --git a/internal/reader/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go similarity index 98% rename from internal/reader/flow_graph_insert_node.go rename to internal/querynode/flow_graph_insert_node.go index 2d56e0bd57..8508cccfb1 100644 --- a/internal/reader/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "fmt" @@ -106,6 +106,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn if err != nil { log.Println("cannot find segment:", segmentID) // TODO: add error handling + wg.Done() return } @@ -116,8 +117,9 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn err = targetSegment.segmentInsert(offsets, &ids, ×tamps, &records) if err != nil { - log.Println("insert failed") + log.Println(err) // TODO: add error handling + wg.Done() return } diff --git a/internal/reader/flow_graph_key2seg_node.go b/internal/querynode/flow_graph_key2seg_node.go similarity index 98% rename from internal/reader/flow_graph_key2seg_node.go rename to internal/querynode/flow_graph_key2seg_node.go index 112ff9876b..0a2ffe27d8 100644 --- a/internal/reader/flow_graph_key2seg_node.go +++ b/internal/querynode/flow_graph_key2seg_node.go @@ -1,4 +1,4 @@ -package reader +package querynode type key2SegNode struct { BaseNode diff --git a/internal/reader/flow_graph_message.go b/internal/querynode/flow_graph_message.go similarity index 98% rename from internal/reader/flow_graph_message.go rename to internal/querynode/flow_graph_message.go index 99cb9c4e3d..ab3e27a524 100644 --- a/internal/reader/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" diff --git a/internal/reader/flow_graph_msg_stream_input_nodes.go b/internal/querynode/flow_graph_msg_stream_input_nodes.go similarity index 84% rename from internal/reader/flow_graph_msg_stream_input_nodes.go rename to internal/querynode/flow_graph_msg_stream_input_nodes.go index a1c08e951a..0ff5a5933b 100644 --- a/internal/reader/flow_graph_msg_stream_input_nodes.go +++ b/internal/querynode/flow_graph_msg_stream_input_nodes.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" @@ -9,8 +9,8 @@ import ( ) func newDmInputNode(ctx context.Context) *flowgraph.InputNode { - receiveBufSize := Params.dmReceiveBufSize() - pulsarBufSize := Params.dmPulsarBufSize() + receiveBufSize := Params.insertReceiveBufSize() + pulsarBufSize := Params.insertPulsarBufSize() msgStreamURL, err := Params.pulsarAddress() if err != nil { @@ -18,7 +18,7 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { } consumeChannels := Params.insertChannelNames() - consumeSubName := "insertSub" + consumeSubName := Params.msgChannelSubName() insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize) insertStream.SetPulsarClient(msgStreamURL) diff --git a/internal/reader/flow_graph_node.go b/internal/querynode/flow_graph_node.go similarity index 90% rename from internal/reader/flow_graph_node.go rename to internal/querynode/flow_graph_node.go index c87f7a2151..e585fcb07d 100644 --- a/internal/reader/flow_graph_node.go +++ b/internal/querynode/flow_graph_node.go @@ -1,4 +1,4 @@ -package reader +package querynode import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" diff --git a/internal/reader/flow_graph_schema_update_node.go b/internal/querynode/flow_graph_schema_update_node.go similarity index 96% rename from internal/reader/flow_graph_schema_update_node.go rename to internal/querynode/flow_graph_schema_update_node.go index 160f31226e..86f02c7e78 100644 --- a/internal/reader/flow_graph_schema_update_node.go +++ b/internal/querynode/flow_graph_schema_update_node.go @@ -1,4 +1,4 @@ -package reader +package querynode type schemaUpdateNode struct { BaseNode diff --git a/internal/reader/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go similarity index 98% rename from internal/reader/flow_graph_service_time_node.go rename to internal/querynode/flow_graph_service_time_node.go index b314097b19..fe0d26b151 100644 --- a/internal/reader/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "log" diff --git a/internal/reader/index.go b/internal/querynode/index.go similarity index 97% rename from internal/reader/index.go rename to internal/querynode/index.go index 9233892efb..8c8f84b17e 100644 --- a/internal/reader/index.go +++ b/internal/querynode/index.go @@ -1,4 +1,4 @@ -package reader +package querynode /* diff --git a/internal/reader/meta_service.go b/internal/querynode/meta_service.go similarity index 97% rename from internal/reader/meta_service.go rename to internal/querynode/meta_service.go index 620cdd181f..f95ec6776d 100644 --- a/internal/reader/meta_service.go +++ b/internal/querynode/meta_service.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" @@ -111,8 +111,8 @@ func isSegmentChannelRangeInQueryNodeChannelRange(segment *etcdpb.SegmentMeta) b } Params.Init() - var queryNodeChannelStart = Params.topicStart() - var queryNodeChannelEnd = Params.topicEnd() + var queryNodeChannelStart = Params.insertChannelRange()[0] + var queryNodeChannelEnd = Params.insertChannelRange()[1] if segment.ChannelStart >= int32(queryNodeChannelStart) && segment.ChannelEnd <= int32(queryNodeChannelEnd) { return true @@ -167,6 +167,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) { seg := mService.segmentUnmarshal(value) if !isSegmentChannelRangeInQueryNodeChannelRange(seg) { + log.Println("Illegal segment channel range") return } diff --git a/internal/reader/meta_service_test.go b/internal/querynode/meta_service_test.go similarity index 96% rename from internal/reader/meta_service_test.go rename to internal/querynode/meta_service_test.go index df80c4a9d5..66c0c8e535 100644 --- a/internal/reader/meta_service_test.go +++ b/internal/querynode/meta_service_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" @@ -93,7 +93,7 @@ func TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) CollectionID: UniqueID(0), PartitionTag: "partition0", ChannelStart: 0, - ChannelEnd: 128, + ChannelEnd: 1, OpenTime: Timestamp(0), CloseTime: Timestamp(math.MaxUint64), NumRows: UniqueID(0), @@ -264,10 +264,9 @@ func TestMetaService_processSegmentCreate(t *testing.T) { PartitionTags: []string{"default"}, } - colMetaBlob, err := proto.Marshal(&collectionMeta) - assert.NoError(t, err) + colMetaBlob := proto.MarshalTextString(&collectionMeta) - err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) + err := (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) err = (*node.replica).addPartition(UniqueID(0), "default") @@ -276,7 +275,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) { id := "0" value := `partition_tag: "default" channel_start: 0 - channel_end: 128 + channel_end: 1 close_time: 18446744073709551615 ` @@ -331,7 +330,7 @@ func TestMetaService_processCreate(t *testing.T) { key2 := "by-dev/segment/0" msg2 := `partition_tag: "default" channel_start: 0 - channel_end: 128 + channel_end: 1 close_time: 18446744073709551615 ` @@ -388,10 +387,9 @@ func TestMetaService_processSegmentModify(t *testing.T) { PartitionTags: []string{"default"}, } - colMetaBlob, err := proto.Marshal(&collectionMeta) - assert.NoError(t, err) + colMetaBlob := proto.MarshalTextString(&collectionMeta) - err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) + err := (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) err = (*node.replica).addPartition(UniqueID(0), "default") @@ -400,7 +398,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { id := "0" value := `partition_tag: "default" channel_start: 0 - channel_end: 128 + channel_end: 1 close_time: 18446744073709551615 ` @@ -411,7 +409,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { newValue := `partition_tag: "default" channel_start: 0 - channel_end: 128 + channel_end: 1 close_time: 18446744073709551615 ` @@ -581,7 +579,7 @@ func TestMetaService_processModify(t *testing.T) { key2 := "by-dev/segment/0" msg2 := `partition_tag: "p1" channel_start: 0 - channel_end: 128 + channel_end: 1 close_time: 18446744073709551615 ` @@ -637,7 +635,7 @@ func TestMetaService_processModify(t *testing.T) { msg4 := `partition_tag: "p1" channel_start: 0 - channel_end: 128 + channel_end: 1 close_time: 18446744073709551615 ` @@ -694,10 +692,9 @@ func TestMetaService_processSegmentDelete(t *testing.T) { PartitionTags: []string{"default"}, } - colMetaBlob, err := proto.Marshal(&collectionMeta) - assert.NoError(t, err) + colMetaBlob := proto.MarshalTextString(&collectionMeta) - err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) + err := (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) err = (*node.replica).addPartition(UniqueID(0), "default") @@ -706,7 +703,7 @@ func TestMetaService_processSegmentDelete(t *testing.T) { id := "0" value := `partition_tag: "default" channel_start: 0 - channel_end: 128 + channel_end: 1 close_time: 18446744073709551615 ` @@ -810,7 +807,7 @@ func TestMetaService_processDelete(t *testing.T) { key2 := "by-dev/segment/0" msg2 := `partition_tag: "default" channel_start: 0 - channel_end: 128 + channel_end: 1 close_time: 18446744073709551615 ` diff --git a/internal/reader/param_table.go b/internal/querynode/param_table.go similarity index 75% rename from internal/reader/param_table.go rename to internal/querynode/param_table.go index abae38270b..ff2fb47ae2 100644 --- a/internal/reader/param_table.go +++ b/internal/querynode/param_table.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "log" @@ -46,35 +46,37 @@ func (p *ParamTable) queryNodeID() int { return id } -// TODO: func (p *ParamTable) DmChannelRange() []int { -func (p *ParamTable) topicStart() int { - topicStart, err := p.Load("reader.topicstart") +func (p *ParamTable) insertChannelRange() []int { + insertChannelRange, err := p.Load("msgChannel.channelRange.insert") if err != nil { panic(err) } - topicStartNum, err := strconv.Atoi(topicStart) - if err != nil { - panic(err) - } - return topicStartNum -} -func (p *ParamTable) topicEnd() int { - topicEnd, err := p.Load("reader.topicend") + channelRange := strings.Split(insertChannelRange, ",") + if len(channelRange) != 2 { + panic("Illegal channel range num") + } + channelBegin, err := strconv.Atoi(channelRange[0]) if err != nil { panic(err) } - topicEndNum, err := strconv.Atoi(topicEnd) + channelEnd, err := strconv.Atoi(channelRange[1]) if err != nil { panic(err) } - return topicEndNum + if channelBegin < 0 || channelEnd < 0 { + panic("Illegal channel range value") + } + if channelBegin > channelEnd { + panic("Illegal channel range value") + } + return []int{channelBegin, channelEnd} } // advanced params // stats func (p *ParamTable) statsPublishInterval() int { - timeInterval, err := p.Load("reader.stats.publishInterval") + timeInterval, err := p.Load("queryNode.stats.publishInterval") if err != nil { panic(err) } @@ -87,7 +89,7 @@ func (p *ParamTable) statsPublishInterval() int { // dataSync: func (p *ParamTable) flowGraphMaxQueueLength() int32 { - queueLength, err := p.Load("reader.dataSync.flowGraph.maxQueueLength") + queueLength, err := p.Load("queryNode.dataSync.flowGraph.maxQueueLength") if err != nil { panic(err) } @@ -99,7 +101,7 @@ func (p *ParamTable) flowGraphMaxQueueLength() int32 { } func (p *ParamTable) flowGraphMaxParallelism() int32 { - maxParallelism, err := p.Load("reader.dataSync.flowGraph.maxParallelism") + maxParallelism, err := p.Load("queryNode.dataSync.flowGraph.maxParallelism") if err != nil { panic(err) } @@ -111,9 +113,8 @@ func (p *ParamTable) flowGraphMaxParallelism() int32 { } // msgStream -// TODO: func (p *ParamTable) insertStreamBufSize() int64 -func (p *ParamTable) dmReceiveBufSize() int64 { - revBufSize, err := p.Load("reader.msgStream.dm.recvBufSize") +func (p *ParamTable) insertReceiveBufSize() int64 { + revBufSize, err := p.Load("queryNode.msgStream.insert.recvBufSize") if err != nil { panic(err) } @@ -124,8 +125,8 @@ func (p *ParamTable) dmReceiveBufSize() int64 { return int64(bufSize) } -func (p *ParamTable) dmPulsarBufSize() int64 { - pulsarBufSize, err := p.Load("reader.msgStream.dm.pulsarBufSize") +func (p *ParamTable) insertPulsarBufSize() int64 { + pulsarBufSize, err := p.Load("queryNode.msgStream.insert.pulsarBufSize") if err != nil { panic(err) } @@ -137,7 +138,7 @@ func (p *ParamTable) dmPulsarBufSize() int64 { } func (p *ParamTable) searchReceiveBufSize() int64 { - revBufSize, err := p.Load("reader.msgStream.search.recvBufSize") + revBufSize, err := p.Load("queryNode.msgStream.search.recvBufSize") if err != nil { panic(err) } @@ -149,7 +150,7 @@ func (p *ParamTable) searchReceiveBufSize() int64 { } func (p *ParamTable) searchPulsarBufSize() int64 { - pulsarBufSize, err := p.Load("reader.msgStream.search.pulsarBufSize") + pulsarBufSize, err := p.Load("queryNode.msgStream.search.pulsarBufSize") if err != nil { panic(err) } @@ -161,7 +162,7 @@ func (p *ParamTable) searchPulsarBufSize() int64 { } func (p *ParamTable) searchResultReceiveBufSize() int64 { - revBufSize, err := p.Load("reader.msgStream.searchResult.recvBufSize") + revBufSize, err := p.Load("queryNode.msgStream.searchResult.recvBufSize") if err != nil { panic(err) } @@ -173,7 +174,7 @@ func (p *ParamTable) searchResultReceiveBufSize() int64 { } func (p *ParamTable) statsReceiveBufSize() int64 { - revBufSize, err := p.Load("reader.msgStream.stats.recvBufSize") + revBufSize, err := p.Load("queryNode.msgStream.stats.recvBufSize") if err != nil { panic(err) } @@ -307,3 +308,20 @@ func (p *ParamTable) searchResultChannelNames() []string { } return channels } + +func (p *ParamTable) msgChannelSubName() string { + // TODO: subName = namePrefix + "-" + queryNodeID, queryNodeID is assigned by master + name, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") + if err != nil { + log.Panic(err) + } + return name +} + +func (p *ParamTable) statsChannelName() string { + channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats") + if err != nil { + panic(err) + } + return channels +} diff --git a/internal/reader/param_table_test.go b/internal/querynode/param_table_test.go similarity index 58% rename from internal/reader/param_table_test.go rename to internal/querynode/param_table_test.go index 0e93b521ec..e9405f8a84 100644 --- a/internal/reader/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "strings" @@ -26,16 +26,12 @@ func TestParamTable_QueryNodeID(t *testing.T) { assert.Equal(t, id, 0) } -func TestParamTable_TopicStart(t *testing.T) { +func TestParamTable_insertChannelRange(t *testing.T) { Params.Init() - topicStart := Params.topicStart() - assert.Equal(t, topicStart, 0) -} - -func TestParamTable_TopicEnd(t *testing.T) { - Params.Init() - topicEnd := Params.topicEnd() - assert.Equal(t, topicEnd, 128) + channelRange := Params.insertChannelRange() + assert.Equal(t, len(channelRange), 2) + assert.Equal(t, channelRange[0], 0) + assert.Equal(t, channelRange[1], 1) } func TestParamTable_statsServiceTimeInterval(t *testing.T) { @@ -50,9 +46,9 @@ func TestParamTable_statsMsgStreamReceiveBufSize(t *testing.T) { assert.Equal(t, bufSize, int64(64)) } -func TestParamTable_dmMsgStreamReceiveBufSize(t *testing.T) { +func TestParamTable_insertMsgStreamReceiveBufSize(t *testing.T) { Params.Init() - bufSize := Params.dmReceiveBufSize() + bufSize := Params.insertReceiveBufSize() assert.Equal(t, bufSize, int64(1024)) } @@ -74,9 +70,9 @@ func TestParamTable_searchPulsarBufSize(t *testing.T) { assert.Equal(t, bufSize, int64(512)) } -func TestParamTable_dmPulsarBufSize(t *testing.T) { +func TestParamTable_insertPulsarBufSize(t *testing.T) { Params.Init() - bufSize := Params.dmPulsarBufSize() + bufSize := Params.insertPulsarBufSize() assert.Equal(t, bufSize, int64(1024)) } @@ -91,3 +87,36 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) { maxParallelism := Params.flowGraphMaxParallelism() assert.Equal(t, maxParallelism, int32(1024)) } + +func TestParamTable_insertChannelNames(t *testing.T) { + Params.Init() + names := Params.insertChannelNames() + assert.Equal(t, len(names), 1) + assert.Equal(t, names[0], "insert-0") +} + +func TestParamTable_searchChannelNames(t *testing.T) { + Params.Init() + names := Params.searchChannelNames() + assert.Equal(t, len(names), 1) + assert.Equal(t, names[0], "search-0") +} + +func TestParamTable_searchResultChannelNames(t *testing.T) { + Params.Init() + names := Params.searchResultChannelNames() + assert.Equal(t, len(names), 1) + assert.Equal(t, names[0], "searchResult-0") +} + +func TestParamTable_msgChannelSubName(t *testing.T) { + Params.Init() + name := Params.msgChannelSubName() + assert.Equal(t, name, "queryNode") +} + +func TestParamTable_statsChannelName(t *testing.T) { + Params.Init() + name := Params.statsChannelName() + assert.Equal(t, name, "query-node-stats") +} diff --git a/internal/reader/partition.go b/internal/querynode/partition.go similarity index 97% rename from internal/reader/partition.go rename to internal/querynode/partition.go index 210f1bf02f..e2dc4593d7 100644 --- a/internal/reader/partition.go +++ b/internal/querynode/partition.go @@ -1,4 +1,4 @@ -package reader +package querynode /* diff --git a/internal/reader/partition_test.go b/internal/querynode/partition_test.go similarity index 99% rename from internal/reader/partition_test.go rename to internal/querynode/partition_test.go index 9c19eb0852..fe7484a8f8 100644 --- a/internal/reader/partition_test.go +++ b/internal/querynode/partition_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" diff --git a/internal/reader/plan.go b/internal/querynode/plan.go similarity index 98% rename from internal/reader/plan.go rename to internal/querynode/plan.go index 23f22bb303..f2cae5133c 100644 --- a/internal/reader/plan.go +++ b/internal/querynode/plan.go @@ -1,4 +1,4 @@ -package reader +package querynode /* #cgo CFLAGS: -I${SRCDIR}/../core/output/include diff --git a/internal/reader/query_node.go b/internal/querynode/query_node.go similarity index 98% rename from internal/reader/query_node.go rename to internal/querynode/query_node.go index f4885fc07b..f25c2b12d5 100644 --- a/internal/reader/query_node.go +++ b/internal/querynode/query_node.go @@ -1,4 +1,4 @@ -package reader +package querynode /* diff --git a/internal/reader/query_node_test.go b/internal/querynode/query_node_test.go similarity index 96% rename from internal/reader/query_node_test.go rename to internal/querynode/query_node_test.go index af7153cda5..9cf6929fcd 100644 --- a/internal/reader/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" diff --git a/internal/reader/reader.go b/internal/querynode/reader.go similarity index 89% rename from internal/reader/reader.go rename to internal/querynode/reader.go index 91852eac13..feb5f73fd9 100644 --- a/internal/reader/reader.go +++ b/internal/querynode/reader.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" diff --git a/internal/reader/search_service.go b/internal/querynode/search_service.go similarity index 99% rename from internal/reader/search_service.go rename to internal/querynode/search_service.go index b34fe3fd72..718bb4ccb6 100644 --- a/internal/reader/search_service.go +++ b/internal/querynode/search_service.go @@ -1,4 +1,4 @@ -package reader +package querynode import "C" import ( @@ -43,7 +43,7 @@ func newSearchService(ctx context.Context, replica *collectionReplica) *searchSe } consumeChannels := Params.searchChannelNames() - consumeSubName := "subSearch" + consumeSubName := Params.msgChannelSubName() searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) searchStream.SetPulsarClient(msgStreamURL) unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() diff --git a/internal/reader/search_service_test.go b/internal/querynode/search_service_test.go similarity index 99% rename from internal/reader/search_service_test.go rename to internal/querynode/search_service_test.go index 5d2fd7d911..31831d0939 100644 --- a/internal/reader/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" diff --git a/internal/reader/segment.go b/internal/querynode/segment.go similarity index 83% rename from internal/reader/segment.go rename to internal/querynode/segment.go index 1c7ab40ee2..6754960807 100644 --- a/internal/reader/segment.go +++ b/internal/querynode/segment.go @@ -1,4 +1,4 @@ -package reader +package querynode /* @@ -109,7 +109,7 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 { //-------------------------------------------------------------------------------------- dm & search functions func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp, records *[]*commonpb.Blob) error { /* - int + CStatus Insert(CSegmentBase c_segment, long int reserved_offset, signed long int size, @@ -148,8 +148,12 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps cSizeofPerRow, cNumOfRows) - if status != 0 { - return errors.New("Insert failed, error code = " + strconv.Itoa(int(status))) + errorCode := status.error_code + + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) } s.recentlyModified = true @@ -158,7 +162,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp) error { /* - int + CStatus Delete(CSegmentBase c_segment, long int reserved_offset, long size, @@ -172,8 +176,12 @@ func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps var status = C.Delete(s.segmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr) - if status != 0 { - return errors.New("Delete failed, error code = " + strconv.Itoa(int(status))) + errorCode := status.error_code + + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("Delete failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) } return nil @@ -187,7 +195,8 @@ func (s *Segment) segmentSearch(plan *Plan, numQueries int64, topK int64) error { /* - void* Search(void* plan, + CStatus + Search(void* plan, void* placeholder_groups, uint64_t* timestamps, int num_groups, @@ -211,16 +220,20 @@ func (s *Segment) segmentSearch(plan *Plan, var cNumGroups = C.int(len(placeHolderGroups)) var status = C.Search(s.segmentPtr, plan.cPlan, cPlaceHolder, cTimestamp, cNumGroups, cNewResultIds, cNewResultDistances) - if status != 0 { - return errors.New("search failed, error code = " + strconv.Itoa(int(status))) + errorCode := status.error_code + + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("Search failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) } cNumQueries := C.long(numQueries) cTopK := C.long(topK) // reduce search result - status = C.MergeInto(cNumQueries, cTopK, cResultDistances, cResultIds, cNewResultDistances, cNewResultIds) - if status != 0 { - return errors.New("merge search result failed, error code = " + strconv.Itoa(int(status))) + mergeStatus := C.MergeInto(cNumQueries, cTopK, cResultDistances, cResultIds, cNewResultDistances, cNewResultIds) + if mergeStatus != 0 { + return errors.New("merge search result failed, error code = " + strconv.Itoa(int(mergeStatus))) } return nil } diff --git a/internal/reader/segment_test.go b/internal/querynode/segment_test.go similarity index 99% rename from internal/reader/segment_test.go rename to internal/querynode/segment_test.go index c14302ff1f..b6b6325eef 100644 --- a/internal/reader/segment_test.go +++ b/internal/querynode/segment_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" @@ -463,7 +463,6 @@ func TestSegment_segmentInsert(t *testing.T) { err := segment.segmentInsert(offset, &ids, ×tamps, &records) assert.NoError(t, err) - deleteSegment(segment) deleteCollection(collection) } @@ -640,7 +639,7 @@ func TestSegment_segmentSearch(t *testing.T) { pulsarURL, _ := Params.pulsarAddress() const receiveBufSize = 1024 - searchProducerChannels := []string{"search"} + searchProducerChannels := Params.searchChannelNames() searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) searchStream.SetPulsarClient(pulsarURL) searchStream.CreatePulsarProducers(searchProducerChannels) diff --git a/internal/reader/stats_service.go b/internal/querynode/stats_service.go similarity index 96% rename from internal/reader/stats_service.go rename to internal/querynode/stats_service.go index ddc241bd01..81ebfe15e4 100644 --- a/internal/reader/stats_service.go +++ b/internal/querynode/stats_service.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" @@ -36,7 +36,7 @@ func (sService *statsService) start() { if err != nil { log.Fatal(err) } - producerChannels := []string{"statistic"} + producerChannels := []string{Params.statsChannelName()} statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize) statsStream.SetPulsarClient(msgStreamURL) diff --git a/internal/reader/stats_service_test.go b/internal/querynode/stats_service_test.go similarity index 98% rename from internal/reader/stats_service_test.go rename to internal/querynode/stats_service_test.go index 1fecf8c81a..523e589148 100644 --- a/internal/reader/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "context" @@ -171,7 +171,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) { const receiveBufSize = 1024 // start pulsar - producerChannels := []string{"statistic"} + producerChannels := []string{Params.statsChannelName()} statsStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) statsStream.SetPulsarClient(pulsarURL) diff --git a/internal/reader/tsafe.go b/internal/querynode/tsafe.go similarity index 98% rename from internal/reader/tsafe.go rename to internal/querynode/tsafe.go index e8a19c4909..056c47b811 100644 --- a/internal/reader/tsafe.go +++ b/internal/querynode/tsafe.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "sync" diff --git a/internal/reader/tsafe_test.go b/internal/querynode/tsafe_test.go similarity index 95% rename from internal/reader/tsafe_test.go rename to internal/querynode/tsafe_test.go index cf26bee420..e02b915072 100644 --- a/internal/reader/tsafe_test.go +++ b/internal/querynode/tsafe_test.go @@ -1,4 +1,4 @@ -package reader +package querynode import ( "testing" diff --git a/internal/reader/type_def.go b/internal/querynode/type_def.go similarity index 94% rename from internal/reader/type_def.go rename to internal/querynode/type_def.go index f1d7fbab31..6cbd347791 100644 --- a/internal/reader/type_def.go +++ b/internal/querynode/type_def.go @@ -1,4 +1,4 @@ -package reader +package querynode import "github.com/zilliztech/milvus-distributed/internal/util/typeutil" diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index b4bfd4c339..d10c76aa2f 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -13,5 +13,5 @@ SCRIPTS_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" # ignore Minio,S3 unittes MILVUS_DIR="${SCRIPTS_DIR}/../internal/" echo $MILVUS_DIR -#go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/reader/..." "${MILVUS_DIR}/proxy/..." -failfast -go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/reader/..." -failfast +#go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/proxy/..." -failfast +go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast