mirror of https://gitee.com/milvus-io/milvus.git
[test]Add option to update schema for bulk insert (#25897)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
parent 45b89cfc71
commit fc6f7f9805
@@ -7,6 +7,7 @@ def pytest_addoption(parser):
     parser.addoption("--nb", action="store", default=50000, help="nb")
     parser.addoption("--dim", action="store", default=768, help="dim")
     parser.addoption("--varchar_len", action="store", default=2000, help="varchar_len")
+    parser.addoption("--with_varchar_field", action="store", default="true", help="with varchar field or not")
 
 
 @pytest.fixture
 def file_type(request):
@@ -27,4 +28,8 @@ def dim(request):
 
 @pytest.fixture
 def varchar_len(request):
     return request.config.getoption("--varchar_len")
+
+
+@pytest.fixture
+def with_varchar_field(request):
+    return request.config.getoption("--with_varchar_field")
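The new option is stored as a string ("true"/"false") and the fixture passes it through unchanged, leaving the boolean conversion to the test body (see the test hunk further down). A possible variant, shown only as a sketch and not part of this commit, normalizes the flag inside the fixture so tests receive a real bool:

# Hypothetical variant of the new fixture (not in this commit): coerce the
# string-valued CLI option to a bool once, so tests need no extra parsing.
import pytest


@pytest.fixture
def with_varchar_field(request):
    value = request.config.getoption("--with_varchar_field")
    # Treat common truthy spellings as True; everything else as False.
    return str(value).strip().lower() in ("true", "1", "yes")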
@@ -89,7 +89,7 @@ class TestChaos(TestChaosBase):
         }
         self.health_checkers = checkers
 
-    def prepare_bulk_insert(self, nb=3000, file_type="json", dim=768, varchar_len=2000):
+    def prepare_bulk_insert(self, nb=3000, file_type="json", dim=768, varchar_len=2000, with_varchar_field=True):
         if Op.bulk_insert not in self.health_checkers:
             log.info("bulk_insert checker is not in health checkers, skip prepare bulk load")
             return
@@ -105,8 +105,8 @@ class TestChaos(TestChaosBase):
         minio_port = "9000"
         minio_endpoint = f"{minio_ip}:{minio_port}"
         bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
-        schema = cf.gen_bulk_insert_collection_schema(dim=dim)
-        data = cf.gen_default_list_data_for_bulk_insert(nb=nb, varchar_len=varchar_len)
+        schema = cf.gen_bulk_insert_collection_schema(dim=dim, with_varchar_field=with_varchar_field)
+        data = cf.gen_default_list_data_for_bulk_insert(nb=nb, varchar_len=varchar_len, with_varchar_field=with_varchar_field)
         data_dir = "/tmp/bulk_insert_data"
         Path(data_dir).mkdir(parents=True, exist_ok=True)
         files = []
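Both calls now receive the same with_varchar_field flag because the schema fields and the column-major data lists are positional and have to stay aligned. A minimal illustration of that invariant (plain Python with made-up column names, not the cf.* helpers used above):

# Illustration only (made-up column names, not the cf.* helpers): derive the
# field list and the column-major data from a single mapping so dropping the
# varchar field keeps both sides aligned.
def gen_schema_fields_and_data(nb=5, with_varchar_field=True):
    columns = {
        "int64": list(range(nb)),
        "float": [float(i) for i in range(nb)],
        "varchar": [f"{i}_padding" for i in range(nb)],
        "float_vector": [],  # placeholder, filled when files are generated
    }
    if not with_varchar_field:
        columns.pop("varchar")
    return list(columns.keys()), list(columns.values())


fields, data = gen_schema_fields_and_data(with_varchar_field=False)
assert len(fields) == len(data) and "varchar" not in fields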
@@ -125,15 +125,20 @@ class TestChaos(TestChaosBase):
         log.info("prepare data for bulk load done")
 
     @pytest.mark.tags(CaseLabel.L3)
-    def test_bulk_insert_perf(self, file_type, nb, dim, varchar_len):
+    def test_bulk_insert_perf(self, file_type, nb, dim, varchar_len, with_varchar_field):
         # start the monitor threads to check the milvus ops
         log.info("*********************Test Start**********************")
         log.info(connections.get_connection_addr('default'))
-        log.info(f"file_type: {file_type}, nb: {nb}, dim: {dim}, varchar_len: {varchar_len}")
+        log.info(f"file_type: {file_type}, nb: {nb}, dim: {dim}, varchar_len: {varchar_len}, with_varchar_field: {with_varchar_field}")
         self.init_health_checkers(dim=int(dim))
         nb = int(nb)
+        if str(with_varchar_field) in ["true", "True"]:
+            with_varchar_field = True
+        else:
+            with_varchar_field = False
         varchar_len = int(varchar_len)
-        self.prepare_bulk_insert(file_type=file_type, nb=nb, dim=int(dim), varchar_len=varchar_len)
+        self.prepare_bulk_insert(file_type=file_type, nb=nb, dim=int(dim), varchar_len=varchar_len, with_varchar_field=with_varchar_field)
         cc.start_monitor_threads(self.health_checkers)
         # wait 600s
         while self.health_checkers[Op.bulk_insert].total() <= 10:
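Because pytest options arrive as strings, the test normalizes "true"/"True" to a bool before forwarding it. For functional (non-perf) coverage, an alternative sketched below, and not what this commit does, would be to parametrize the test so both schema variants run in one session:

# Sketch of an alternative (not this commit's approach): exercise both schema
# variants in a single session via parametrization instead of --with_varchar_field.
import pytest


@pytest.mark.parametrize("with_varchar_field", [True, False])
def test_bulk_insert_schema_variants(with_varchar_field):
    # Placeholder body: a real test would build the schema and data with the
    # flag and drive the bulk-insert checker, as the perf test above does.
    assert isinstance(with_varchar_field, bool)

For a perf run, selecting a single variant per invocation on the command line keeps the measurement isolated, which is why the CLI option is the better fit here.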
@@ -165,7 +165,7 @@ def gen_default_collection_schema(description=ct.default_desc, primary_field=ct.
     return schema
 
 
-def gen_bulk_insert_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name,
+def gen_bulk_insert_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name, with_varchar_field=True,
                                       auto_id=False, dim=ct.default_dim, enable_dynamic_field=False, with_json=False):
     if enable_dynamic_field:
         if primary_field is ct.default_int64_field_name:
@@ -180,6 +180,8 @@ def gen_bulk_insert_collection_schema(description=ct.default_desc, primary_field
               gen_float_vec_field(dim=dim)]
     if with_json is False:
         fields.remove(gen_json_field())
+    if with_varchar_field is False:
+        fields.remove(gen_string_field())
     schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description,
                                                                     primary_field=primary_field, auto_id=auto_id,
                                                                     enable_dynamic_field=enable_dynamic_field)
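The generator keeps the existing pattern of building the full field list and then removing optional fields, which relies on gen_string_field() producing an object that compares equal to the one already in the list (the same assumption the with_json branch makes). An equivalent construction, sketched here with raw pymilvus types and illustrative field names rather than the repo's gen_* helpers, appends optional fields conditionally instead:

# Sketch with raw pymilvus types and illustrative field names (not the repo's
# gen_* helpers): append optional fields conditionally instead of removing them.
from pymilvus import DataType, FieldSchema


def build_fields(dim=768, with_json=False, with_varchar_field=True):
    fields = [
        FieldSchema("int64", DataType.INT64, is_primary=True),
        FieldSchema("float", DataType.FLOAT),
    ]
    if with_varchar_field:
        fields.append(FieldSchema("varchar", DataType.VARCHAR, max_length=65535))
    if with_json:
        fields.append(FieldSchema("json", DataType.JSON))
    fields.append(FieldSchema("float_vector", DataType.FLOAT_VECTOR, dim=dim))
    return fields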
@@ -478,13 +480,15 @@ def gen_default_list_data(nb=ct.default_nb, dim=ct.default_dim, start=0, with_js
     return data
 
 
-def gen_default_list_data_for_bulk_insert(nb=ct.default_nb, varchar_len=2000):
+def gen_default_list_data_for_bulk_insert(nb=ct.default_nb, varchar_len=2000, with_varchar_field=True):
     str_value = gen_str_by_length(length=varchar_len)
     int_values = [i for i in range(nb)]
     float_values = [np.float32(i) for i in range(nb)]
     string_values = [f"{str(i)}_{str_value}" for i in range(nb)]
     float_vec_values = []  # placeholder for float_vec
     data = [int_values, float_values, string_values, float_vec_values]
+    if with_varchar_field is False:
+        data = [int_values, float_values, float_vec_values]
     return data
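Each varchar value is padded with a fixed string of varchar_len characters from gen_str_by_length, so the --varchar_len option directly controls per-row payload size. A stand-in with the behaviour assumed from its usage here (the real helper lives elsewhere in the repo's common utilities) could look like:

# Stand-in for gen_str_by_length (assumed behaviour: a random alphanumeric
# string of the requested length), which sizes the varchar payload per row.
import random
import string


def gen_str_by_length(length=8):
    return "".join(random.choices(string.ascii_letters + string.digits, k=length))


varchar_len = 2000
str_value = gen_str_by_length(length=varchar_len)
string_values = [f"{i}_{str_value}" for i in range(5)]
assert all(len(s) > varchar_len for s in string_values)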