diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 712ab23851..ddb37dbcad 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -390,7 +390,7 @@ class Checker: enable_dynamic_field = kwargs.get("enable_dynamic_field", True) schema = cf.gen_all_datatype_collection_schema(dim=dim, enable_struct_array_field=enable_struct_array_field, enable_dynamic_field=enable_dynamic_field) if schema is None else schema - log.info(f"schema: {schema}") + log.debug(f"schema: {schema}") self.schema = schema self.dim = cf.get_dim_by_schema(schema=schema) self.int64_field_name = cf.get_int64_field_name(schema=schema) @@ -601,8 +601,7 @@ class Checker: def insert_data(self, nb=constants.DELTA_PER_INS, partition_name=None): partition_name = self.p_name if partition_name is None else partition_name - client_schema = self.milvus_client.describe_collection(collection_name=self.c_name) - data = cf.gen_row_data_by_schema(nb=nb, schema=client_schema) + data = cf.gen_row_data_by_schema(nb=nb, schema=self.get_schema()) ts_data = [] for i in range(nb): time.sleep(0.001) @@ -807,7 +806,7 @@ class CollectionRenameChecker(Checker): result = self.milvus_client.has_collection(collection_name=new_collection_name) if result: self.c_name = new_collection_name - data = cf.gen_row_data_by_schema(nb=1, schema=self.schema) + data = cf.gen_row_data_by_schema(nb=1, schema=self.get_schema()) self.milvus_client.insert(collection_name=new_collection_name, data=data) return res, result @@ -1104,7 +1103,7 @@ class InsertFlushChecker(Checker): try: self.milvus_client.insert( collection_name=self.c_name, - data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), + data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.get_schema()), timeout=timeout ) insert_result = True @@ -1162,14 +1161,14 @@ class FlushChecker(Checker): try: self.milvus_client.insert( collection_name=self.c_name, - data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), + data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.get_schema()), timeout=timeout ) - result = True - except Exception: - result = False - res, result = self.flush() - return res, result + res, result = self.flush() + return res, result + except Exception as e: + log.error(f"run task error: {e}") + return str(e), False def keep_running(self): while self._keep_running: @@ -1239,9 +1238,7 @@ class InsertChecker(Checker): @trace() def insert_entities(self): - # Use describe_collection directly to preserve struct_fields information - schema = self.milvus_client.describe_collection(self.c_name) - data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema) + data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema()) rows = len(data) ts_data = [] for i in range(constants.DELTA_PER_INS): @@ -1327,8 +1324,7 @@ class InsertFreshnessChecker(Checker): self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet" def insert_entities(self): - schema = self.get_schema() - data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema) + data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema()) ts_data = [] for i in range(constants.DELTA_PER_INS): time.sleep(0.001) @@ -1386,8 +1382,7 @@ class UpsertChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("UpsertChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) - schema = self.milvus_client.describe_collection(collection_name=self.c_name) - self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema) + self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema()) @trace() def upsert_entities(self): @@ -1405,8 +1400,7 @@ class UpsertChecker(Checker): # half of the data is upsert, the other half is insert rows = len(self.data) pk_old = [d[self.int64_field_name] for d in self.data[:rows // 2]] - schema = self.milvus_client.describe_collection(collection_name=self.c_name) - self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema) + self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema()) pk_new = [d[self.int64_field_name] for d in self.data[rows // 2:]] pk_update = pk_old + pk_new for i in range(rows): @@ -1429,8 +1423,7 @@ class UpsertFreshnessChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("UpsertChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) - schema = self.get_schema() - self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema) + self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema()) def upsert_entities(self): try: @@ -1463,8 +1456,7 @@ class UpsertFreshnessChecker(Checker): # half of the data is upsert, the other half is insert rows = len(self.data[0]) pk_old = self.data[0][:rows // 2] - schema = self.get_schema() - self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema) + self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema()) pk_new = self.data[0][rows // 2:] pk_update = pk_old + pk_new self.data[0] = pk_update @@ -1486,8 +1478,7 @@ class PartialUpdateChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("PartialUpdateChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, enable_struct_array_field=False) - schema = self.get_schema() - self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema) + self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema()) @trace() def partial_update_entities(self): @@ -1772,7 +1763,7 @@ class IndexCreateChecker(Checker): super().__init__(collection_name=collection_name, schema=schema) for i in range(5): self.milvus_client.insert(collection_name=self.c_name, - data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), + data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.get_schema()), timeout=timeout) # do as a flush before indexing stats = self.milvus_client.get_collection_stats(collection_name=self.c_name) @@ -1812,7 +1803,7 @@ class IndexDropChecker(Checker): collection_name = cf.gen_unique_str("IndexChecker_") super().__init__(collection_name=collection_name, schema=schema) for i in range(5): - self.milvus_client.insert(collection_name=self.c_name, data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), + self.milvus_client.insert(collection_name=self.c_name, data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.get_schema()), timeout=timeout) # do as a flush before indexing stats = self.milvus_client.get_collection_stats(collection_name=self.c_name) @@ -2272,7 +2263,7 @@ class BulkInsertChecker(Checker): ) as remote_writer: for _ in range(data_size): - row = cf.gen_row_data_by_schema(nb=1, schema=self.schema)[0] + row = cf.gen_row_data_by_schema(nb=1, schema=self.get_schema())[0] remote_writer.append_row(row) remote_writer.commit() batch_files = remote_writer.batch_files