From 037a58a60dab62521b643ea476e028efb486ec29 Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Tue, 1 Aug 2023 09:37:04 +0800 Subject: [PATCH] [test]Enable standby during rolling update and refine bulk insert (#26039) Signed-off-by: zhuwenxing --- tests/python_client/chaos/checker.py | 26 ++++++++++++------- .../chaos/testcases/test_get_collections.py | 5 +++- ...le_request_operation_for_rolling_update.py | 24 ++++++++++++----- tests/python_client/deploy/milvus_crd.yaml | 14 ++++++++-- 4 files changed, 51 insertions(+), 18 deletions(-) diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index f8246b4c7f..61193f587f 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -8,6 +8,7 @@ from base.collection_wrapper import ApiCollectionWrapper from base.utility_wrapper import ApiUtilityWrapper from common import common_func as cf from common import common_type as ct +from common.milvus_sys import MilvusSys from chaos import constants from common.common_type import CheckTasks @@ -30,7 +31,7 @@ class Op(Enum): unknown = 'unknown' -timeout = 120 +timeout = 10 enable_traceback = False DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}' @@ -103,6 +104,8 @@ class Checker: self.rsp_times = [] self.average_time = 0 self.files = [] + self.ms = MilvusSys() + self.bucket_name = self.ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"] self.c_wrap = ApiCollectionWrapper() self.utility_wrap = ApiUtilityWrapper() c_name = collection_name if collection_name is not None else cf.gen_unique_str( @@ -180,16 +183,21 @@ class Checker: nb=constants.ENTITIES_FOR_BULKINSERT, file_type="npy", minio_endpoint="127.0.0.1:9000", - bucket_name="milvus-bucket"): + bucket_name=None): schema = self.schema + bucket_name = self.bucket_name if bucket_name is None else bucket_name log.info(f"prepare data for bulk insert") - files = cf.prepare_bulk_insert_data(schema=schema, - nb=nb, - file_type=file_type, - minio_endpoint=minio_endpoint, - bucket_name=bucket_name) - self.files = files - return files + try: + files = cf.prepare_bulk_insert_data(schema=schema, + nb=nb, + file_type=file_type, + minio_endpoint=minio_endpoint, + bucket_name=bucket_name) + self.files = files + return files, True + except Exception as e: + log.error(f"prepare data for bulk insert failed with error {e}") + return [], False def do_bulk_insert(self): log.info(f"bulk insert collection name: {self.c_name}") diff --git a/tests/python_client/chaos/testcases/test_get_collections.py b/tests/python_client/chaos/testcases/test_get_collections.py index 9fbfb5bca3..62c6a5f1bf 100644 --- a/tests/python_client/chaos/testcases/test_get_collections.py +++ b/tests/python_client/chaos/testcases/test_get_collections.py @@ -2,9 +2,10 @@ import time import json from collections import defaultdict import pytest - +from pymilvus import Collection from base.client_base import TestcaseBase from deploy.common import get_chaos_test_collections +from chaos import constants from common.common_type import CaseLabel from utils.util_log import test_log as log @@ -19,6 +20,8 @@ class TestGetCollections(TestcaseBase): all_collections = [c_name for c_name in all_collections if "Checker" in c_name] selected_collections_map = {} for c_name in all_collections: + if Collection(name=c_name).num_entities < constants.ENTITIES_FOR_SEARCH: + continue prefix = c_name.split("_")[0] if prefix not in selected_collections_map: selected_collections_map[prefix] = [c_name] diff --git a/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py b/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py index 6b0a0fcc14..24db92052c 100644 --- a/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py +++ b/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py @@ -75,13 +75,22 @@ class TestOperations(TestBase): self.init_health_checkers(collection_name=c_name) # prepare data by bulk insert log.info("*********************Prepare Data by bulk insert**********************") - - cc.start_monitor_threads(self.health_checkers) for k, v in self.health_checkers.items(): - log.info(f"prepare bulk insert data for {k}") - v.prepare_bulk_insert_data(minio_endpoint=self.minio_endpoint) - v.do_bulk_insert() + if k in [Op.search, Op.query]: + log.info(f"prepare bulk insert data for {k}") + v.prepare_bulk_insert_data(minio_endpoint=self.minio_endpoint) + completed = False + retry_times = 0 + while not completed and retry_times < 3: + completed, result = v.do_bulk_insert() + if not completed: + log.info(f"do bulk insert failed: {result}") + retry_times += 1 + sleep(5) + # how to make sure the bulk insert done before rolling update? + log.info("*********************Load Start**********************") + cc.start_monitor_threads(self.health_checkers) # wait request_duration request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "") if request_duration[-1] == "+": @@ -95,8 +104,11 @@ class TestOperations(TestBase): v.pause() for k, v in self.health_checkers.items(): v.check_result() - for k, v in self.health_checkers.items(): + for k, v in self.health_checkers.items(): + log.info(f"{k} failed request: {v.fail_records}") + for k, v in self.health_checkers.items(): log.info(f"{k} rto: {v.get_rto()}") + if is_check: assert_statistic(self.health_checkers, succ_rate_threshold=0.98) assert_expectations() diff --git a/tests/python_client/deploy/milvus_crd.yaml b/tests/python_client/deploy/milvus_crd.yaml index 4cfea5fbbf..26b7e6d5f8 100644 --- a/tests/python_client/deploy/milvus_crd.yaml +++ b/tests/python_client/deploy/milvus_crd.yaml @@ -2,7 +2,7 @@ apiVersion: milvus.io/v1beta1 kind: Milvus metadata: - name: kafka-demo + name: operator-demo namespace: chaos-testing labels: app: milvus @@ -12,6 +12,16 @@ spec: dataNode: memory: forceSyncEnable: false + rootCoord: + enableActiveStandby: true + dataCoord: + enableActiveStandby: true + queryCoord: + enableActiveStandby: true + indexCoord: + enableActiveStandby: true +# mixCoord: +# enableActiveStandby: true quotaAndLimits: enable: false log: @@ -28,7 +38,7 @@ spec: queryNode: replicas: 2 mixCoord: - replicas: 1 + replicas: 1 dependencies: msgStreamType: kafka etcd: