diff --git a/tests/python_client/conftest.py b/tests/python_client/conftest.py index b2caf6b4ba..558d7a9f83 100644 --- a/tests/python_client/conftest.py +++ b/tests/python_client/conftest.py @@ -58,6 +58,8 @@ def pytest_addoption(parser): parser.addoption("--tei_reranker_endpoint", action="store", default="http://text-rerank-service.milvus-ci.svc.cluster.local:80", help="tei rerank endpoint") parser.addoption("--vllm_reranker_endpoint", action="store", default="http://vllm-rerank-service.milvus-ci.svc.cluster.local:80", help="vllm rerank endpoint") + # L3 test options for alter function tests + parser.addoption("--tei_endpoint_2", action="store", default="", help="second tei embedding endpoint for alter tests") @pytest.fixture def host(request): @@ -218,6 +220,13 @@ def request_duration(request): def tei_endpoint(request): return request.config.getoption("--tei_endpoint") +@pytest.fixture +def tei_endpoint_2(request): + endpoint = request.config.getoption("--tei_endpoint_2") + if not endpoint: + pytest.skip("tei_endpoint_2 not configured") + return endpoint + @pytest.fixture def tei_reranker_endpoint(request): return request.config.getoption("--tei_reranker_endpoint") diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 2bef287212..d6529273ca 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -28,8 +28,8 @@ pytest-parallel pytest-random-order # pymilvus -pymilvus==2.7.0rc83 -pymilvus[bulk_writer]==2.7.0rc83 +pymilvus==2.7.0rc84 +pymilvus[bulk_writer]==2.7.0rc84 # for protobuf protobuf>=5.29.5 diff --git a/tests/python_client/testcases/test_text_embedding_function_e2e.py b/tests/python_client/testcases/test_text_embedding_function_e2e.py index 723796d878..b7b7503742 100644 --- a/tests/python_client/testcases/test_text_embedding_function_e2e.py +++ b/tests/python_client/testcases/test_text_embedding_function_e2e.py @@ -841,3 +841,1221 @@ class TestHybridSearch(TestcaseBase): for i in range(nq): log.info(f"res length: {len(res_list[i])}") assert len(res_list[i]) == limit + + +@pytest.mark.tags(CaseLabel.L1) +class TestTextEmbeddingFunctionCURD(TestcaseBase): + """ + ****************************************************************** + The following cases are used to test add/alter/drop collection function APIs + ****************************************************************** + """ + + # ==================== add_collection_function positive tests ==================== + + def test_add_collection_function_text_embedding(self, tei_endpoint): + """ + target: test add text embedding function to existing collection + method: create collection without function, then add function via API + expected: function added successfully, describe shows 1 function + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # Verify no functions initially + res, _ = collection_w.describe() + assert len(res.get("functions", [])) == 0 + + # Create and add function + embedding_function = Function( + name="text_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + self.client.add_collection_function( + collection_name=c_name, + function=embedding_function + ) + + # Verify function is added + res, _ = collection_w.describe() + assert len(res["functions"]) == 1 + assert res["functions"][0]["name"] == "text_embedding" + + def test_add_collection_function_then_crud(self, tei_endpoint): + """ + target: test that added function works for all CRUD operations + method: create collection without function, add function, then verify insert/query/search/upsert/delete + expected: all CRUD operations work correctly with dynamically added function + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # Add function + embedding_function = Function( + name="text_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + self.client.add_collection_function( + collection_name=c_name, + function=embedding_function + ) + + # === INSERT === + nb = 10 + data = [{"id": i, "document": f"This is document number {i}"} for i in range(nb)] + collection_w.insert(data) + assert collection_w.num_entities == nb + + # Create index and load + index_params = { + "index_type": "AUTOINDEX", + "metric_type": "COSINE", + "params": {}, + } + collection_w.create_index("dense", index_params) + collection_w.load() + + # === QUERY === + res, _ = collection_w.query(expr="id >= 0", output_fields=["dense", "document"]) + assert len(res) == nb + for row in res: + assert len(row["dense"]) == dim + + # === SEARCH with text === + search_params = {"metric_type": "COSINE", "params": {}} + res, _ = collection_w.search( + data=["document number 5"], + anns_field="dense", + param=search_params, + limit=5, + output_fields=["document"], + ) + assert len(res) == 1 + assert len(res[0]) == 5 + + # === UPSERT - update existing record === + old_res, _ = collection_w.query(expr="id == 0", output_fields=["dense"]) + old_embedding = old_res[0]["dense"] + + upsert_data = [{"id": 0, "document": "This is a completely different updated text"}] + collection_w.upsert(upsert_data) + + new_res, _ = collection_w.query(expr="id == 0", output_fields=["dense"]) + new_embedding = new_res[0]["dense"] + + # Verify embedding changed after upsert + assert not np.allclose(old_embedding, new_embedding) + + # === UPSERT - insert new record === + upsert_new_data = [{"id": 100, "document": "This is a brand new document"}] + collection_w.upsert(upsert_new_data) + count_res, _ = collection_w.query(expr="", output_fields=["count(*)"]) + assert count_res[0]["count(*)"] == nb + 1 + + # Verify new record has vector + res, _ = collection_w.query(expr="id == 100", output_fields=["dense"]) + assert len(res) == 1 + assert len(res[0]["dense"]) == dim + + # === DELETE === + collection_w.delete("id in [1, 2, 3]") + + # Verify deleted records are not searchable + res, _ = collection_w.search( + data=["document number 1"], + anns_field="dense", + param=search_params, + limit=10, + output_fields=["id"], + ) + deleted_ids = {1, 2, 3} + for hit in res[0]: + assert hit.entity.get("id") not in deleted_ids + + # Verify count decreased + res, _ = collection_w.query(expr="id >= 0", output_fields=["id"]) + assert len(res) == nb + 1 - 3 # original + 1 upserted - 3 deleted + + def test_add_collection_function_multiple_text_embedding(self, tei_endpoint): + """ + target: test add multiple text embedding functions to different output fields + method: create collection with two vector fields, add text_embedding function to each + expected: both functions added successfully + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="title_vector", dtype=DataType.FLOAT_VECTOR, dim=dim), + FieldSchema(name="content_vector", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # Add text embedding function for title + title_embedding_function = Function( + name="title_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["title"], + output_field_names="title_vector", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + self.client.add_collection_function( + collection_name=c_name, + function=title_embedding_function + ) + + # Add text embedding function for content + content_embedding_function = Function( + name="content_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["content"], + output_field_names="content_vector", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + self.client.add_collection_function( + collection_name=c_name, + function=content_embedding_function + ) + + # Verify both functions are added + res, _ = collection_w.describe() + assert len(res["functions"]) == 2 + function_names = [f["name"] for f in res["functions"]] + assert "title_embedding" in function_names + assert "content_embedding" in function_names + + # Verify CRUD works with both functions + # Insert + nb = 5 + data = [{"id": i, "title": fake_en.sentence(), "content": fake_en.text()} for i in range(nb)] + collection_w.insert(data) + assert collection_w.num_entities == nb + + # Create index and load + index_params = {"index_type": "AUTOINDEX", "metric_type": "COSINE", "params": {}} + collection_w.create_index("title_vector", index_params) + collection_w.create_index("content_vector", index_params) + collection_w.load() + + # Query - verify vectors are generated + res, _ = collection_w.query(expr="id >= 0", output_fields=["title_vector", "content_vector"]) + for row in res: + assert len(row["title_vector"]) == dim + assert len(row["content_vector"]) == dim + + # Search on both vector fields + search_params = {"metric_type": "COSINE", "params": {}} + res, _ = collection_w.search( + data=[fake_en.sentence()], + anns_field="title_vector", + param=search_params, + limit=3, + ) + assert len(res[0]) == 3 + + res, _ = collection_w.search( + data=[fake_en.text()], + anns_field="content_vector", + param=search_params, + limit=3, + ) + assert len(res[0]) == 3 + + # ==================== alter_collection_function positive tests ==================== + + def test_alter_collection_function_change_endpoint(self, tei_endpoint): + """ + target: test alter function to change endpoint + method: create collection with function, alter function endpoint + expected: endpoint changed successfully + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + text_embedding_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(text_embedding_function) + + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # Alter function with same endpoint (just testing the API works) + new_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + self.client.alter_collection_function( + collection_name=c_name, + function_name="tei", + function=new_function + ) + + # Verify function still exists and params are correct + res, _ = collection_w.describe() + assert len(res["functions"]) == 1 + func = res["functions"][0] + assert func["name"] == "tei" + assert func["params"]["provider"] == "TEI" + assert func["params"]["endpoint"] == tei_endpoint + + def test_alter_collection_function_change_params(self, tei_endpoint): + """ + target: test alter function parameters (truncate settings) + method: create collection with function, alter truncate params + expected: params changed successfully + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + text_embedding_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(text_embedding_function) + + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # Alter function with new truncate params + new_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={ + "provider": "TEI", + "endpoint": tei_endpoint, + "truncate": True, + "truncation_direction": "Left" + } + ) + self.client.alter_collection_function( + collection_name=c_name, + function_name="tei", + function=new_function + ) + + # Verify function params are updated correctly + res, _ = collection_w.describe() + assert len(res["functions"]) == 1 + func = res["functions"][0] + assert func["name"] == "tei" + assert func["params"]["provider"] == "TEI" + assert func["params"]["endpoint"] == tei_endpoint + # Note: params values are returned as strings + assert func["params"]["truncate"] == "True" + assert func["params"]["truncation_direction"] == "Left" + + def test_alter_collection_function_verify_crud(self, tei_endpoint): + """ + target: test altered function works correctly for all CRUD operations + method: create collection with function, insert data, alter function, verify all CRUD operations + expected: all CRUD operations continue to work after function alteration + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + text_embedding_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(text_embedding_function) + + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # === INSERT before alter === + data1 = [{"id": i, "document": f"Document before alter {i}"} for i in range(5)] + collection_w.insert(data1) + + # Create index and load + index_params = { + "index_type": "AUTOINDEX", + "metric_type": "COSINE", + "params": {}, + } + collection_w.create_index("dense", index_params) + collection_w.load() + + # Get embedding before alter for comparison + res_before, _ = collection_w.query(expr="id == 0", output_fields=["dense"]) + embedding_before_alter = res_before[0]["dense"] + + # === ALTER FUNCTION === + new_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={ + "provider": "TEI", + "endpoint": tei_endpoint, + "truncate": True + } + ) + self.client.alter_collection_function( + collection_name=c_name, + function_name="tei", + function=new_function + ) + + # === INSERT after alter === + data2 = [{"id": i + 5, "document": f"Document after alter {i}"} for i in range(5)] + collection_w.insert(data2) + assert collection_w.num_entities == 10 + + # === QUERY - verify all data accessible === + res, _ = collection_w.query(expr="id >= 0", output_fields=["dense", "document"]) + assert len(res) == 10 + for row in res: + assert len(row["dense"]) == dim + + # === SEARCH with text === + search_params = {"metric_type": "COSINE", "params": {}} + res, _ = collection_w.search( + data=["Document after alter"], + anns_field="dense", + param=search_params, + limit=10, + output_fields=["document"], + ) + assert len(res[0]) == 10 + + # === UPSERT - update existing record after alter === + upsert_data = [{"id": 0, "document": "Completely new document after alter"}] + collection_w.upsert(upsert_data) + + res_after_upsert, _ = collection_w.query(expr="id == 0", output_fields=["dense"]) + embedding_after_upsert = res_after_upsert[0]["dense"] + + # Verify embedding changed + assert not np.allclose(embedding_before_alter, embedding_after_upsert) + + # === UPSERT - insert new record after alter === + upsert_new = [{"id": 100, "document": "Brand new document via upsert after alter"}] + collection_w.upsert(upsert_new) + count_res, _ = collection_w.query(expr="", output_fields=["count(*)"]) + assert count_res[0]["count(*)"] == 11 + + res, _ = collection_w.query(expr="id == 100", output_fields=["dense"]) + assert len(res[0]["dense"]) == dim + + # === DELETE after alter === + collection_w.delete("id in [1, 2]") + + # Verify deleted records not in search results + res, _ = collection_w.search( + data=["Document before alter 1"], + anns_field="dense", + param=search_params, + limit=10, + output_fields=["id"], + ) + for hit in res[0]: + assert hit.entity.get("id") not in {1, 2} + + # Verify count + res, _ = collection_w.query(expr="id >= 0", output_fields=["id"]) + assert len(res) == 9 # 10 + 1 - 2 + + # ==================== alter_collection_function L3 tests ==================== + + @pytest.mark.tags(CaseLabel.L3) + def test_alter_collection_function_change_to_different_endpoint(self, tei_endpoint, tei_endpoint_2): + """ + target: test alter function to use a different valid endpoint + method: create collection with function using endpoint1, alter to endpoint2, verify CRUD + expected: function works with new endpoint after alteration + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + text_embedding_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(text_embedding_function) + + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # Insert data with original endpoint + data1 = [{"id": i, "document": f"Document with endpoint1 {i}"} for i in range(3)] + collection_w.insert(data1) + + # Create index and load + index_params = {"index_type": "AUTOINDEX", "metric_type": "COSINE", "params": {}} + collection_w.create_index("dense", index_params) + collection_w.load() + + # Alter to use different endpoint + new_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint_2} + ) + self.client.alter_collection_function( + collection_name=c_name, + function_name="tei", + function=new_function + ) + + # Insert data with new endpoint + data2 = [{"id": i + 10, "document": f"Document with endpoint2 {i}"} for i in range(3)] + collection_w.insert(data2) + assert collection_w.num_entities == 6 + + # Search should work + search_params = {"metric_type": "COSINE", "params": {}} + res, _ = collection_w.search( + data=["Document with endpoint2"], + anns_field="dense", + param=search_params, + limit=6, + output_fields=["document"], + ) + assert len(res[0]) == 6 + + # Upsert should work with new endpoint + upsert_data = [{"id": 0, "document": "Updated document with new endpoint"}] + collection_w.upsert(upsert_data) + + res, _ = collection_w.query(expr="id == 0", output_fields=["document"]) + assert "Updated document" in res[0]["document"] + + # ==================== drop_collection_function positive tests ==================== + + def test_drop_collection_function_verify_crud(self, tei_endpoint): + """ + target: test CRUD behavior changes after dropping function + method: create collection with function, insert data, drop function, verify CRUD behavior + expected: after drop, insert requires manual vector, existing data still queryable/searchable + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + text_embedding_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(text_embedding_function) + + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # === INSERT with function (auto-generate vector) === + data_with_func = [{"id": i, "document": f"Document with function {i}"} for i in range(5)] + collection_w.insert(data_with_func) + assert collection_w.num_entities == 5 + + # Create index and load + index_params = { + "index_type": "AUTOINDEX", + "metric_type": "COSINE", + "params": {}, + } + collection_w.create_index("dense", index_params) + collection_w.load() + + # Verify vectors are generated + res, _ = collection_w.query(expr="id >= 0", output_fields=["dense"]) + for row in res: + assert len(row["dense"]) == dim + + # === DROP FUNCTION === + self.client.drop_collection_function( + collection_name=c_name, + function_name="tei" + ) + + # Verify function is removed + res, _ = collection_w.describe() + assert len(res.get("functions", [])) == 0 + + # === QUERY - existing data still accessible === + res, _ = collection_w.query(expr="id >= 0", output_fields=["dense", "document"]) + assert len(res) == 5 + for row in res: + assert len(row["dense"]) == dim + + # === SEARCH - existing data still searchable with vector === + search_params = {"metric_type": "COSINE", "params": {}} + search_vector = [[random.random() for _ in range(dim)]] + res, _ = collection_w.search( + data=search_vector, + anns_field="dense", + param=search_params, + limit=5, + output_fields=["document"], + ) + assert len(res[0]) == 5 + + # === INSERT after drop - must provide vector manually === + manual_vector = [random.random() for _ in range(dim)] + data_manual = [{"id": 10, "document": "Manual vector document", "dense": manual_vector}] + collection_w.insert(data_manual) + assert collection_w.num_entities == 6 + + # Verify manual insert succeeded + res, _ = collection_w.query(expr="id == 10", output_fields=["dense"]) + assert len(res) == 1 + assert len(res[0]["dense"]) == dim + + # === INSERT after drop without vector - should fail === + data_no_vector = [{"id": 11, "document": "No vector document"}] + collection_w.insert( + data_no_vector, + check_task=CheckTasks.err_res, + check_items={"err_code": 65535, "err_msg": ""}, + ) + + # === UPSERT after drop - must provide vector manually === + upsert_vector = [random.random() for _ in range(dim)] + upsert_data = [{"id": 0, "document": "Updated via upsert", "dense": upsert_vector}] + collection_w.upsert(upsert_data) + + res, _ = collection_w.query(expr="id == 0", output_fields=["dense"]) + # Verify vector is updated to manual one + assert np.allclose(res[0]["dense"], upsert_vector, rtol=1e-5) + + # === DELETE after drop - still works === + collection_w.delete("id in [1, 2]") + res, _ = collection_w.query(expr="id >= 0", output_fields=["id"]) + assert len(res) == 4 # 6 - 2 + + def test_drop_collection_function_one_of_multiple(self, tei_endpoint): + """ + target: test drop one function when multiple text embedding functions exist + method: create collection with two text_embedding functions, drop one, verify CRUD + expected: only specified function is dropped, other still works for CRUD + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="title_vector", dtype=DataType.FLOAT_VECTOR, dim=dim), + FieldSchema(name="content_vector", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + # Add two text embedding functions + title_embedding = Function( + name="title_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["title"], + output_field_names="title_vector", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(title_embedding) + + content_embedding = Function( + name="content_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["content"], + output_field_names="content_vector", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(content_embedding) + + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # Verify both functions exist + res, _ = collection_w.describe() + assert len(res["functions"]) == 2 + + # === INSERT with both functions === + data = [{"id": i, "title": f"Title {i}", "content": f"Content {i}"} for i in range(3)] + collection_w.insert(data) + assert collection_w.num_entities == 3 + + # Create indexes and load + index_params = {"index_type": "AUTOINDEX", "metric_type": "COSINE", "params": {}} + collection_w.create_index("title_vector", index_params) + collection_w.create_index("content_vector", index_params) + collection_w.load() + + # Verify both vectors generated + res, _ = collection_w.query(expr="id >= 0", output_fields=["title_vector", "content_vector"]) + for row in res: + assert len(row["title_vector"]) == dim + assert len(row["content_vector"]) == dim + + # === DROP one function (title_embedding) === + self.client.drop_collection_function( + collection_name=c_name, + function_name="title_embedding" + ) + + # Verify only content_embedding remains + res, _ = collection_w.describe() + assert len(res["functions"]) == 1 + assert res["functions"][0]["name"] == "content_embedding" + + # === INSERT after drop - content_vector auto-generated, title_vector manual === + manual_title_vector = [random.random() for _ in range(dim)] + data_after_drop = [{ + "id": 10, + "title": "New title", + "content": "New content", + "title_vector": manual_title_vector + }] + collection_w.insert(data_after_drop) + assert collection_w.num_entities == 4 + + # Verify vectors + res, _ = collection_w.query(expr="id == 10", output_fields=["title_vector", "content_vector"]) + assert np.allclose(res[0]["title_vector"], manual_title_vector, rtol=1e-5) + assert len(res[0]["content_vector"]) == dim + + # === SEARCH on both fields still works === + search_params = {"metric_type": "COSINE", "params": {}} + # Search title_vector with manual vector + res, _ = collection_w.search( + data=[manual_title_vector], + anns_field="title_vector", + param=search_params, + limit=4, + ) + assert len(res[0]) == 4 + + # Search content_vector with text (function still active) + res, _ = collection_w.search( + data=["New content"], + anns_field="content_vector", + param=search_params, + limit=4, + ) + assert len(res[0]) == 4 + + # === UPSERT - content function still works === + upsert_title_vector = [random.random() for _ in range(dim)] + upsert_data = [{ + "id": 0, + "title": "Updated title", + "content": "Updated content", + "title_vector": upsert_title_vector + }] + collection_w.upsert(upsert_data) + + res, _ = collection_w.query(expr="id == 0", output_fields=["title_vector", "content_vector"]) + assert np.allclose(res[0]["title_vector"], upsert_title_vector, rtol=1e-5) + + # === DELETE still works === + collection_w.delete("id == 1") + res, _ = collection_w.query(expr="id >= 0", output_fields=["id"]) + assert len(res) == 3 + + def test_drop_collection_function_then_add_again(self, tei_endpoint): + """ + target: test can re-add function after dropping + method: create collection with function, drop it, add function again + expected: function can be re-added after drop + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + text_embedding_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(text_embedding_function) + + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + + # Drop function + self.client.drop_collection_function( + collection_name=c_name, + function_name="tei" + ) + + # Verify function is removed + res, _ = collection_w.describe() + assert len(res.get("functions", [])) == 0 + + # Add function again + new_function = Function( + name="text_embedding_v2", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + self.client.add_collection_function( + collection_name=c_name, + function=new_function + ) + + # Verify function is added + res, _ = collection_w.describe() + assert len(res["functions"]) == 1 + assert res["functions"][0]["name"] == "text_embedding_v2" + + +@pytest.mark.tags(CaseLabel.L2) +class TestTextEmbeddingFunctionCURDNegative(TestcaseBase): + """ + ****************************************************************** + The following cases are negative tests for add/alter/drop collection function APIs + ****************************************************************** + """ + + # ==================== add_collection_function negative tests ==================== + + def test_add_collection_function_nonexistent_collection(self, tei_endpoint): + """ + target: test add function to nonexistent collection + method: call add_collection_function on collection that doesn't exist + expected: error with collection not found (code=100) + """ + self._connect() + embedding_function = Function( + name="text_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + + try: + self.client.add_collection_function( + collection_name="nonexistent_collection_12345", + function=embedding_function + ) + assert False, "Expected exception for nonexistent collection" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 100 + assert "collection not found" in str(e) + + def test_add_collection_function_duplicate_name(self, tei_endpoint): + """ + target: test add function with duplicate name + method: create collection with function, try to add another function with same name + expected: error indicating duplicate function name (code=65535) + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + # Add function to schema first + text_embedding_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(text_embedding_function) + + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name, schema=schema) + + # Try to add another function with same name + duplicate_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + + try: + self.client.add_collection_function( + collection_name=c_name, + function=duplicate_function + ) + assert False, "Expected exception for duplicate function name" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 65535 + assert "duplicate function name" in str(e) + + def test_add_collection_function_missing_input_field(self, tei_endpoint): + """ + target: test add function with input field that doesn't exist + method: add function referencing non-existent input field + expected: error indicating input field not found (code=65535) + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name, schema=schema) + + # Create function with non-existent input field + embedding_function = Function( + name="text_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["nonexistent_field"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + + try: + self.client.add_collection_function( + collection_name=c_name, + function=embedding_function + ) + assert False, "Expected exception for missing input field" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 65535 + assert "function input field not found" in str(e) + + def test_add_collection_function_missing_output_field(self, tei_endpoint): + """ + target: test add function with output field that doesn't exist + method: add function referencing non-existent output field + expected: error indicating output field not found (code=65535) + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name, schema=schema) + + # Create function with non-existent output field + embedding_function = Function( + name="text_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="nonexistent_vector_field", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + + try: + self.client.add_collection_function( + collection_name=c_name, + function=embedding_function + ) + assert False, "Expected exception for missing output field" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 65535 + assert "function output field not found" in str(e) + + def test_add_collection_function_dim_mismatch(self, tei_endpoint): + """ + target: test add function with dimension mismatch + method: create collection with vector field dim=512, add function for model that outputs dim=768 + expected: error indicating dimension mismatch (code=65535) + """ + self._connect() + dim = 512 # Mismatched dimension (TEI model outputs 768) + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name, schema=schema) + + # Create function (model outputs 768 dim, but field is 512) + embedding_function = Function( + name="text_embedding", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + + try: + self.client.add_collection_function( + collection_name=c_name, + function=embedding_function + ) + assert False, "Expected exception for dimension mismatch" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 65535 + assert "embedding dim" in str(e) + + # ==================== alter_collection_function negative tests ==================== + + def test_alter_collection_function_nonexistent_collection(self, tei_endpoint): + """ + target: test alter function on nonexistent collection + method: call alter_collection_function on collection that doesn't exist + expected: error with collection not found (code=100) + """ + self._connect() + new_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + + try: + self.client.alter_collection_function( + collection_name="nonexistent_collection_12345", + function_name="tei", + function=new_function + ) + assert False, "Expected exception for nonexistent collection" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 100 + assert "collection not found" in str(e) + + def test_alter_collection_function_nonexistent_function(self, tei_endpoint): + """ + target: test alter function that doesn't exist + method: create collection without function, try to alter non-existent function + expected: error indicating function not found (code=65535) + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name, schema=schema) + + new_function = Function( + name="nonexistent_function", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + + try: + self.client.alter_collection_function( + collection_name=c_name, + function_name="nonexistent_function", + function=new_function + ) + assert False, "Expected exception for nonexistent function" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 65535 + assert "not found" in str(e) + + def test_alter_collection_function_invalid_new_endpoint(self, tei_endpoint): + """ + target: test alter function with invalid endpoint + method: create collection with valid function, alter to use invalid endpoint + expected: error indicating endpoint unreachable (code=65535) + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + + text_embedding_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": tei_endpoint} + ) + schema.add_function(text_embedding_function) + + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name, schema=schema) + + # Try to alter with invalid endpoint + new_function = Function( + name="tei", + function_type=FunctionType.TEXTEMBEDDING, + input_field_names=["document"], + output_field_names="dense", + params={"provider": "TEI", "endpoint": "http://invalid_endpoint_12345"} + ) + + try: + self.client.alter_collection_function( + collection_name=c_name, + function_name="tei", + function=new_function + ) + assert False, "Expected exception for invalid endpoint" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 65535 + assert "Check function" in str(e) and "failed" in str(e) + + # ==================== drop_collection_function negative tests ==================== + + def test_drop_collection_function_nonexistent_collection(self): + """ + target: test drop function from nonexistent collection + method: call drop_collection_function on collection that doesn't exist + expected: error with collection not found (code=100) + """ + self._connect() + + try: + self.client.drop_collection_function( + collection_name="nonexistent_collection_12345", + function_name="tei" + ) + assert False, "Expected exception for nonexistent collection" + except Exception as e: + log.info(f"Expected error: {e}") + assert e.code == 100 + assert "collection not found" in str(e) + + def test_drop_collection_function_nonexistent_function(self): + """ + target: test drop function that doesn't exist + method: create collection without function, try to drop non-existent function + expected: no error (idempotent delete behavior) + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name, schema=schema) + + # Drop nonexistent function should not raise error (idempotent) + self.client.drop_collection_function( + collection_name=c_name, + function_name="nonexistent_function" + ) + log.info("Drop nonexistent function succeeded (idempotent behavior)") + + def test_drop_collection_function_empty_name(self): + """ + target: test drop function with empty name + method: call drop_collection_function with function_name="" + expected: no error (idempotent delete behavior) + """ + self._connect() + dim = 768 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="document", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="dense", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name, schema=schema) + + # Drop with empty name should not raise error (idempotent) + self.client.drop_collection_function( + collection_name=c_name, + function_name="" + ) + log.info("Drop with empty function name succeeded (idempotent behavior)") \ No newline at end of file