From eb737d4d59dca0e540da933dc35fe297c37944c4 Mon Sep 17 00:00:00 2001 From: ThreadDao Date: Tue, 31 May 2022 08:56:02 +0800 Subject: [PATCH] [test] [skip-e2e] Add L3 case to test sq load balance cross replicas (#17279) Signed-off-by: ThreadDao --- .../testcases/test_collection.py | 58 ++++++++++++++----- tests/python_client/utils/util_k8s.py | 29 ++++++++++ 2 files changed, 72 insertions(+), 15 deletions(-) diff --git a/tests/python_client/testcases/test_collection.py b/tests/python_client/testcases/test_collection.py index 05910f30de..f19141eea5 100644 --- a/tests/python_client/testcases/test_collection.py +++ b/tests/python_client/testcases/test_collection.py @@ -1072,7 +1072,6 @@ class TestCollectionOperation(TestcaseBase): self.collection_wrap.init_collection(c_name, schema=schema, check_task=CheckTasks.check_collection_property, check_items={exp_name: c_name, exp_schema: schema}) - @pytest.mark.tags(CaseLabel.L2) def test_load_collection_after_load_partition(self): """ @@ -2420,7 +2419,38 @@ class TestLoadCollection(TestcaseBase): check_task=CheckTasks.check_query_results, check_items={'exp_res': [{'int64': 0}, {'int64': 3000}]}) - # https://github.com/milvus-io/milvus/issues/16726 + @pytest.mark.tags(CaseLabel.L3) + def test_load_replica_sq_count_balance(self): + """ + target: test load with multi replicas, and sq request load balance cross replicas + method: 1.Deploy milvus with multi querynodes + 2.Insert entities and load with replicas + 3.Do query req many times + 4.Verify the querynode sq_req_count metrics + expected: Infer whether the query request is load balanced. 
+ """ + from utils.util_k8s import get_metrics_querynode_sq_req_count + collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + df = cf.gen_default_dataframe_data(nb=5000) + mutation_res, _ = collection_w.insert(df) + assert collection_w.num_entities == 5000 + total_sq_count = 20 + + collection_w.load(replica_number=3) + for i in range(total_sq_count): + ids = [random.randint(0, 100) for _ in range(5)] + collection_w.query(f"{ct.default_int64_field_name} in {ids}") + + replicas, _ = collection_w.get_replicas() + log.debug(replicas) + sq_req_count = get_metrics_querynode_sq_req_count() + for group in replicas.groups: + group_nodes = group.group_nodes + group_sq_req_count = 0 + for node in group_nodes: + group_sq_req_count += sq_req_count[node] + log.debug(f"Group nodes {group_nodes} with total sq_req_count {group_sq_req_count}") + @pytest.mark.tags(CaseLabel.L2) def test_get_collection_replicas_not_loaded(self): """ @@ -2435,7 +2465,8 @@ class TestLoadCollection(TestcaseBase): assert collection_w.num_entities == ct.default_nb collection_w.get_replicas(check_task=CheckTasks.err_res, - check_items={"err_code": 15, "err_msg": "getCollectionInfoByID: can't find collectionID"}) + check_items={"err_code": 15, + "err_msg": "collection not found, maybe not loaded"}) class TestReleaseAdvanced(TestcaseBase): @@ -2738,9 +2769,10 @@ class TestCollectionString(TestcaseBase): self._connect() c_name = cf.gen_unique_str(prefix) schema = cf.gen_string_pk_default_collection_schema() - self.collection_wrap.init_collection(name=c_name, schema=schema, check_task=CheckTasks.check_collection_property, + self.collection_wrap.init_collection(name=c_name, schema=schema, + check_task=CheckTasks.check_collection_property, check_items={exp_name: c_name, exp_schema: schema}) - + @pytest.mark.tags(CaseLabel.L1) def test_collection_with_muti_string_fields(self): """ @@ -2756,7 +2788,8 @@ class TestCollectionString(TestcaseBase): string_field_1 = 
cf.gen_string_field(is_primary=True) string_field_2 = cf.gen_string_field(name=c_name) schema = cf.gen_collection_schema(fields=[int_field, string_field_1, string_field_2, vec_field]) - self.collection_wrap.init_collection(name=c_name, schema=schema, check_task=CheckTasks.check_collection_property, + self.collection_wrap.init_collection(name=c_name, schema=schema, + check_task=CheckTasks.check_collection_property, check_items={exp_name: c_name, exp_schema: schema}) @pytest.mark.tags(CaseLabel.L1) @@ -2788,8 +2821,8 @@ class TestCollectionString(TestcaseBase): max_length = 100000 string_field = cf.gen_string_field(max_length_per_row=max_length) schema = cf.gen_collection_schema([int_field, string_field, vec_field]) - error = {ct.err_code: 0, ct.err_msg: "invalid max_length_per_row: %s" %max_length} - self.collection_wrap.init_collection(name=c_name, schema=schema, + error = {ct.err_code: 0, ct.err_msg: "invalid max_length_per_row: %s" % max_length} + self.collection_wrap.init_collection(name=c_name, schema=schema, check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) @@ -2819,13 +2852,8 @@ class TestCollectionString(TestcaseBase): int_field = cf.gen_int64_field() vec_field = cf.gen_float_vec_field() string_field = cf.gen_string_field(is_primary=True, auto_id=True) - fields=[int_field, string_field, vec_field] - schema, _=self.collection_schema_wrap.init_collection_schema(fields=fields) + fields = [int_field, string_field, vec_field] + schema, _ = self.collection_schema_wrap.init_collection_schema(fields=fields) error = {ct.err_code: 0, ct.err_msg: "autoID is not supported when the VarChar field is the primary key"} self.collection_wrap.init_collection(name=cf.gen_unique_str(prefix), schema=schema, check_task=CheckTasks.err_res, check_items=error) - - - - - diff --git a/tests/python_client/utils/util_k8s.py b/tests/python_client/utils/util_k8s.py index 52454234f4..2fb02268f6 100644 --- a/tests/python_client/utils/util_k8s.py +++ 
b/tests/python_client/utils/util_k8s.py @@ -1,5 +1,8 @@ +import json import os.path import time + +import requests from pymilvus import connections from kubernetes import client, config from kubernetes.client.rest import ApiException @@ -238,6 +241,32 @@ def read_pod_log(namespace, label_selector, release_name): raise Exception(str(e)) +def get_metrics_querynode_sq_req_count(): + """ get metric milvus_querynode_sq_req_count from prometheus""" + + PROMETHEUS = 'http://10.96.7.6:9090' + query_str = 'milvus_querynode_sq_req_count{app_kubernetes_io_instance="mic-replica",' \ + 'app_kubernetes_io_name="milvus",namespace="chaos-testing"}' + + response = requests.get(PROMETHEUS + '/api/v1/query', params={'query': query_str}) + if response.status_code == 200: + results = response.json()["data"]['result'] + # print(results) + # print(type(results)) + log.debug(json.dumps(results, indent=4)) + milvus_querynode_sq_req_count = {} + for res in results: + if res["metric"]["status"] == "total": + querynode_id = res["metric"]["node_id"] + # pod = res["metric"]["pod"] + value = res["value"][-1] + milvus_querynode_sq_req_count[int(querynode_id)] = int(value) + # log.debug(milvus_querynode_sq_req_count) + return milvus_querynode_sq_req_count + else: + raise Exception(-1, f"Failed to get metrics with status code {response.status_code}") + + if __name__ == '__main__': label = "app.kubernetes.io/name=milvus, component=querynode" instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111")