diff --git a/internal/master/segment_manager.go b/internal/master/segment_manager.go index e7a4b64dce..63ef0a11b6 100644 --- a/internal/master/segment_manager.go +++ b/internal/master/segment_manager.go @@ -297,7 +297,8 @@ func (manager *SegmentManagerImpl) syncWriteNodeTimestamp(timeTick Timestamp) er manager.mu.Lock() defer manager.mu.Unlock() for _, status := range manager.collStatus { - for i, segStatus := range status.segments { + for i := 0; i < len(status.segments); i++ { + segStatus := status.segments[i] if !segStatus.closable { closable, err := manager.judgeSegmentClosable(segStatus) if err != nil { @@ -318,6 +319,7 @@ func (manager *SegmentManagerImpl) syncWriteNodeTimestamp(timeTick Timestamp) er continue } status.segments = append(status.segments[:i], status.segments[i+1:]...) + i-- ts, err := manager.globalTSOAllocator() if err != nil { log.Printf("allocate tso error: %s", err.Error()) diff --git a/internal/writenode/data_sync_service_test.go b/internal/writenode/data_sync_service_test.go index 4825780360..df82cec4d9 100644 --- a/internal/writenode/data_sync_service_test.go +++ b/internal/writenode/data_sync_service_test.go @@ -236,7 +236,7 @@ func newMeta() { }, { FieldID: 0, - Name: "RowID", + Name: "RawID", Description: "test collection filed 1", DataType: schemapb.DataType_INT64, TypeParams: []*commonpb.KeyValuePair{ diff --git a/internal/writenode/flush_sync_service_test.go b/internal/writenode/flush_sync_service_test.go index 7da80503d6..4c62cfcacf 100644 --- a/internal/writenode/flush_sync_service_test.go +++ b/internal/writenode/flush_sync_service_test.go @@ -89,12 +89,6 @@ func TestFlushSyncService_Start(t *testing.T) { time.Sleep(time.Millisecond * 50) } - for { - if len(ddChan) == 0 && len(insertChan) == 0 { - break - } - } - ret, err := fService.metaTable.getSegBinlogPaths(SegID) assert.NoError(t, err) assert.Equal(t, map[int64][]string{ diff --git a/tests/python/requirements.txt b/tests/python/requirements.txt index 9bee462aee..efb68f38fc 100644 --- a/tests/python/requirements.txt +++ b/tests/python/requirements.txt @@ -4,5 +4,5 @@ numpy==1.18.1 pytest==5.3.4 pytest-cov==2.8.1 pytest-timeout==1.3.4 -pymilvus-distributed==0.0.10 +pymilvus-distributed==0.0.9 sklearn==0.0 diff --git a/tests/python/test_index.py b/tests/python/test_index.py deleted file mode 100644 index 687e6573a6..0000000000 --- a/tests/python/test_index.py +++ /dev/null @@ -1,824 +0,0 @@ -import logging -import time -import pdb -import threading -from multiprocessing import Pool, Process -import numpy -import pytest -import sklearn.preprocessing -from .utils import * -from .constants import * - -uid = "test_index" -BUILD_TIMEOUT = 300 -field_name = default_float_vec_field_name -binary_field_name = default_binary_vec_field_name -query, query_vecs = gen_query_vectors(field_name, default_entities, default_top_k, 1) -default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"} - - -@pytest.mark.skip("wait for debugging...") -class TestIndexBase: - @pytest.fixture( - scope="function", - params=gen_simple_index() - ) - def get_simple_index(self, request, connect): - logging.getLogger().info(request.param) - # TODO: Determine the service mode - # if str(connect._cmd("mode")) == "CPU": - if request.param["index_type"] in index_cpu_not_support(): - pytest.skip("sq8h not support in CPU mode") - return request.param - - @pytest.fixture( - scope="function", - params=[ - 1, - 10, - 1111 - ], - ) - def get_nq(self, request): - yield request.param - - """ - ****************************************************************** - The following cases are used to test `create_index` function - ****************************************************************** - """ - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index - expected: return search success - ''' - ids = connect.bulk_insert(collection, default_entities) - connect.create_index(collection, field_name, get_simple_index) - - def test_create_index_on_field_not_existed(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index on field not existed - expected: error raised - ''' - tmp_field_name = gen_unique_str() - ids = connect.bulk_insert(collection, default_entities) - with pytest.raises(Exception) as e: - connect.create_index(collection, tmp_field_name, get_simple_index) - - @pytest.mark.level(2) - def test_create_index_on_field(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index on other field - expected: error raised - ''' - tmp_field_name = "int64" - ids = connect.bulk_insert(collection, default_entities) - with pytest.raises(Exception) as e: - connect.create_index(collection, tmp_field_name, get_simple_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_no_vectors(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index - expected: return search success - ''' - connect.create_index(collection, field_name, get_simple_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_partition(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection, create partition, and add entities in it, create index - expected: return search success - ''' - connect.create_partition(collection, default_tag) - ids = connect.bulk_insert(collection, default_entities, partition_tag=default_tag) - connect.flush([collection]) - connect.create_index(collection, field_name, get_simple_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_partition_flush(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection, create partition, and add entities in it, create index - expected: return search success - ''' - connect.create_partition(collection, default_tag) - ids = connect.bulk_insert(collection, default_entities, partition_tag=default_tag) - connect.flush() - connect.create_index(collection, field_name, get_simple_index) - - def test_create_index_without_connect(self, dis_connect, collection): - ''' - target: test create index without connection - method: create collection and add entities in it, check if added successfully - expected: raise exception - ''' - with pytest.raises(Exception) as e: - dis_connect.create_index(collection, field_name, get_simple_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_search_with_query_vectors(self, connect, collection, get_simple_index, get_nq): - ''' - target: test create index interface, search with more query vectors - method: create collection and add entities in it, create index - expected: return search success - ''' - ids = connect.bulk_insert(collection, default_entities) - connect.create_index(collection, field_name, get_simple_index) - logging.getLogger().info(connect.get_collection_stats(collection)) - nq = get_nq - index_type = get_simple_index["index_type"] - search_param = get_search_param(index_type) - query, vecs = gen_query_vectors(field_name, default_entities, default_top_k, nq, search_params=search_param) - res = connect.search(collection, query) - assert len(res) == nq - - @pytest.mark.timeout(BUILD_TIMEOUT) - @pytest.mark.level(2) - def test_create_index_multithread(self, connect, collection, args): - ''' - target: test create index interface with multiprocess - method: create collection and add entities in it, create index - expected: return search success - ''' - connect.bulk_insert(collection, default_entities) - - def build(connect): - connect.create_index(collection, field_name, default_index) - - threads_num = 8 - threads = [] - for i in range(threads_num): - m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) - t = MilvusTestThread(target=build, args=(m,)) - threads.append(t) - t.start() - time.sleep(0.2) - for t in threads: - t.join() - - def test_create_index_collection_not_existed(self, connect): - ''' - target: test create index interface when collection name not existed - method: create collection and add entities in it, create index - , make sure the collection name not in index - expected: create index failed - ''' - collection_name = gen_unique_str(uid) - with pytest.raises(Exception) as e: - connect.create_index(collection_name, field_name, default_index) - - @pytest.mark.level(2) - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_insert_flush(self, connect, collection, get_simple_index): - ''' - target: test create index - method: create collection and create index, add entities in it - expected: create index ok, and count correct - ''' - connect.create_index(collection, field_name, get_simple_index) - ids = connect.bulk_insert(collection, default_entities) - connect.flush([collection]) - count = connect.count_entities(collection) - assert count == default_nb - - @pytest.mark.level(2) - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_same_index_repeatedly(self, connect, collection, get_simple_index): - ''' - target: check if index can be created repeatedly, with the same create_index params - method: create index after index have been built - expected: return code success, and search ok - ''' - connect.create_index(collection, field_name, get_simple_index) - connect.create_index(collection, field_name, get_simple_index) - - # TODO: - @pytest.mark.level(2) - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_different_index_repeatedly(self, connect, collection): - ''' - target: check if index can be created repeatedly, with the different create_index params - method: create another index with different index_params after index have been built - expected: return code 0, and describe index result equals with the second index params - ''' - ids = connect.bulk_insert(collection, default_entities) - indexs = [default_index, {"metric_type":"L2", "index_type": "FLAT", "params":{"nlist": 1024}}] - for index in indexs: - connect.create_index(collection, field_name, index) - stats = connect.get_collection_stats(collection) - # assert stats["partitions"][0]["segments"][0]["index_name"] == index["index_type"] - assert stats["row_count"] == default_nb - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_ip(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index - expected: return search success - ''' - ids = connect.bulk_insert(collection, default_entities) - get_simple_index["metric_type"] = "IP" - connect.create_index(collection, field_name, get_simple_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_no_vectors_ip(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index - expected: return search success - ''' - get_simple_index["metric_type"] = "IP" - connect.create_index(collection, field_name, get_simple_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_partition_ip(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection, create partition, and add entities in it, create index - expected: return search success - ''' - connect.create_partition(collection, default_tag) - ids = connect.bulk_insert(collection, default_entities, partition_tag=default_tag) - connect.flush([collection]) - get_simple_index["metric_type"] = "IP" - connect.create_index(collection, field_name, get_simple_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_partition_flush_ip(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection, create partition, and add entities in it, create index - expected: return search success - ''' - connect.create_partition(collection, default_tag) - ids = connect.bulk_insert(collection, default_entities, partition_tag=default_tag) - connect.flush() - get_simple_index["metric_type"] = "IP" - connect.create_index(collection, field_name, get_simple_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_search_with_query_vectors_ip(self, connect, collection, get_simple_index, get_nq): - ''' - target: test create index interface, search with more query vectors - method: create collection and add entities in it, create index - expected: return search success - ''' - metric_type = "IP" - ids = connect.bulk_insert(collection, default_entities) - get_simple_index["metric_type"] = metric_type - connect.create_index(collection, field_name, get_simple_index) - logging.getLogger().info(connect.get_collection_stats(collection)) - nq = get_nq - index_type = get_simple_index["index_type"] - search_param = get_search_param(index_type) - query, vecs = gen_query_vectors(field_name, default_entities, default_top_k, nq, metric_type=metric_type, search_params=search_param) - res = connect.search(collection, query) - assert len(res) == nq - - @pytest.mark.timeout(BUILD_TIMEOUT) - @pytest.mark.level(2) - def test_create_index_multithread_ip(self, connect, collection, args): - ''' - target: test create index interface with multiprocess - method: create collection and add entities in it, create index - expected: return search success - ''' - connect.bulk_insert(collection, default_entities) - - def build(connect): - default_index["metric_type"] = "IP" - connect.create_index(collection, field_name, default_index) - - threads_num = 8 - threads = [] - for i in range(threads_num): - m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) - t = MilvusTestThread(target=build, args=(m,)) - threads.append(t) - t.start() - time.sleep(0.2) - for t in threads: - t.join() - - def test_create_index_collection_not_existed_ip(self, connect, collection): - ''' - target: test create index interface when collection name not existed - method: create collection and add entities in it, create index - , make sure the collection name not in index - expected: return code not equals to 0, create index failed - ''' - collection_name = gen_unique_str(uid) - default_index["metric_type"] = "IP" - with pytest.raises(Exception) as e: - connect.create_index(collection_name, field_name, default_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_no_vectors_insert_ip(self, connect, collection, get_simple_index): - ''' - target: test create index interface when there is no vectors in collection, and does not affect the subsequent process - method: create collection and add no vectors in it, and then create index, add entities in it - expected: return code equals to 0 - ''' - default_index["metric_type"] = "IP" - connect.create_index(collection, field_name, get_simple_index) - ids = connect.bulk_insert(collection, default_entities) - connect.flush([collection]) - count = connect.count_entities(collection) - assert count == default_nb - - @pytest.mark.level(2) - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_same_index_repeatedly_ip(self, connect, collection, get_simple_index): - ''' - target: check if index can be created repeatedly, with the same create_index params - method: create index after index have been built - expected: return code success, and search ok - ''' - default_index["metric_type"] = "IP" - connect.create_index(collection, field_name, get_simple_index) - connect.create_index(collection, field_name, get_simple_index) - - # TODO: - @pytest.mark.level(2) - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_different_index_repeatedly_ip(self, connect, collection): - ''' - target: check if index can be created repeatedly, with the different create_index params - method: create another index with different index_params after index have been built - expected: return code 0, and describe index result equals with the second index params - ''' - ids = connect.bulk_insert(collection, default_entities) - indexs = [default_index, {"index_type": "FLAT", "params": {"nlist": 1024}, "metric_type": "IP"}] - for index in indexs: - connect.create_index(collection, field_name, index) - stats = connect.get_collection_stats(collection) - # assert stats["partitions"][0]["segments"][0]["index_name"] == index["index_type"] - assert stats["row_count"] == default_nb - - """ - ****************************************************************** - The following cases are used to test `drop_index` function - ****************************************************************** - """ - - def test_drop_index(self, connect, collection, get_simple_index): - ''' - target: test drop index interface - method: create collection and add entities in it, create index, call drop index - expected: return code 0, and default index param - ''' - # ids = connect.bulk_insert(collection, entities) - connect.create_index(collection, field_name, get_simple_index) - connect.drop_index(collection, field_name) - stats = connect.get_collection_stats(collection) - # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type - assert not stats["partitions"][0]["segments"] - - @pytest.mark.level(2) - def test_drop_index_repeatly(self, connect, collection, get_simple_index): - ''' - target: test drop index repeatly - method: create index, call drop index, and drop again - expected: return code 0 - ''' - connect.create_index(collection, field_name, get_simple_index) - stats = connect.get_collection_stats(collection) - connect.drop_index(collection, field_name) - connect.drop_index(collection, field_name) - stats = connect.get_collection_stats(collection) - logging.getLogger().info(stats) - # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type - assert not stats["partitions"][0]["segments"] - - @pytest.mark.level(2) - def test_drop_index_without_connect(self, dis_connect, collection): - ''' - target: test drop index without connection - method: drop index, and check if drop successfully - expected: raise exception - ''' - with pytest.raises(Exception) as e: - dis_connect.drop_index(collection, field_name) - - def test_drop_index_collection_not_existed(self, connect): - ''' - target: test drop index interface when collection name not existed - method: create collection and add entities in it, create index - , make sure the collection name not in index, and then drop it - expected: return code not equals to 0, drop index failed - ''' - collection_name = gen_unique_str(uid) - with pytest.raises(Exception) as e: - connect.drop_index(collection_name, field_name) - - def test_drop_index_collection_not_create(self, connect, collection): - ''' - target: test drop index interface when index not created - method: create collection and add entities in it, create index - expected: return code not equals to 0, drop index failed - ''' - # ids = connect.bulk_insert(collection, entities) - # no create index - connect.drop_index(collection, field_name) - - @pytest.mark.level(2) - def test_create_drop_index_repeatly(self, connect, collection, get_simple_index): - ''' - target: test create / drop index repeatly, use the same index params - method: create index, drop index, four times - expected: return code 0 - ''' - for i in range(4): - connect.create_index(collection, field_name, get_simple_index) - connect.drop_index(collection, field_name) - - def test_drop_index_ip(self, connect, collection, get_simple_index): - ''' - target: test drop index interface - method: create collection and add entities in it, create index, call drop index - expected: return code 0, and default index param - ''' - # ids = connect.bulk_insert(collection, entities) - get_simple_index["metric_type"] = "IP" - connect.create_index(collection, field_name, get_simple_index) - connect.drop_index(collection, field_name) - stats = connect.get_collection_stats(collection) - # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type - assert not stats["partitions"][0]["segments"] - - @pytest.mark.level(2) - def test_drop_index_repeatly_ip(self, connect, collection, get_simple_index): - ''' - target: test drop index repeatly - method: create index, call drop index, and drop again - expected: return code 0 - ''' - get_simple_index["metric_type"] = "IP" - connect.create_index(collection, field_name, get_simple_index) - stats = connect.get_collection_stats(collection) - connect.drop_index(collection, field_name) - connect.drop_index(collection, field_name) - stats = connect.get_collection_stats(collection) - logging.getLogger().info(stats) - # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type - assert not stats["partitions"][0]["segments"] - - @pytest.mark.level(2) - def test_drop_index_without_connect_ip(self, dis_connect, collection): - ''' - target: test drop index without connection - method: drop index, and check if drop successfully - expected: raise exception - ''' - with pytest.raises(Exception) as e: - dis_connect.drop_index(collection, field_name) - - def test_drop_index_collection_not_create_ip(self, connect, collection): - ''' - target: test drop index interface when index not created - method: create collection and add entities in it, create index - expected: return code not equals to 0, drop index failed - ''' - # ids = connect.bulk_insert(collection, entities) - # no create index - connect.drop_index(collection, field_name) - - @pytest.mark.level(2) - def test_create_drop_index_repeatly_ip(self, connect, collection, get_simple_index): - ''' - target: test create / drop index repeatly, use the same index params - method: create index, drop index, four times - expected: return code 0 - ''' - get_simple_index["metric_type"] = "IP" - for i in range(4): - connect.create_index(collection, field_name, get_simple_index) - connect.drop_index(collection, field_name) - - -@pytest.mark.skip("binary") -class TestIndexBinary: - @pytest.fixture( - scope="function", - params=gen_simple_index() - ) - def get_simple_index(self, request, connect): - # TODO: Determine the service mode - # if str(connect._cmd("mode")) == "CPU": - if request.param["index_type"] in index_cpu_not_support(): - pytest.skip("sq8h not support in CPU mode") - return request.param - - @pytest.fixture( - scope="function", - params=gen_binary_index() - ) - def get_jaccard_index(self, request, connect): - if request.param["index_type"] in binary_support(): - request.param["metric_type"] = "JACCARD" - return request.param - else: - pytest.skip("Skip index") - - @pytest.fixture( - scope="function", - params=gen_binary_index() - ) - def get_l2_index(self, request, connect): - request.param["metric_type"] = "L2" - return request.param - - @pytest.fixture( - scope="function", - params=[ - 1, - 10, - 1111 - ], - ) - def get_nq(self, request): - yield request.param - - """ - ****************************************************************** - The following cases are used to test `create_index` function - ****************************************************************** - """ - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index(self, connect, binary_collection, get_jaccard_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index - expected: return search success - ''' - ids = connect.bulk_insert(binary_collection, default_binary_entities) - connect.create_index(binary_collection, binary_field_name, get_jaccard_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_partition(self, connect, binary_collection, get_jaccard_index): - ''' - target: test create index interface - method: create collection, create partition, and add entities in it, create index - expected: return search success - ''' - connect.create_partition(binary_collection, default_tag) - ids = connect.bulk_insert(binary_collection, default_binary_entities, partition_tag=default_tag) - connect.create_index(binary_collection, binary_field_name, get_jaccard_index) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_search_with_query_vectors(self, connect, binary_collection, get_jaccard_index, get_nq): - ''' - target: test create index interface, search with more query vectors - method: create collection and add entities in it, create index - expected: return search success - ''' - nq = get_nq - ids = connect.bulk_insert(binary_collection, default_binary_entities) - connect.create_index(binary_collection, binary_field_name, get_jaccard_index) - query, vecs = gen_query_vectors(binary_field_name, default_binary_entities, default_top_k, nq, metric_type="JACCARD") - search_param = get_search_param(get_jaccard_index["index_type"], metric_type="JACCARD") - logging.getLogger().info(search_param) - res = connect.search(binary_collection, query, search_params=search_param) - assert len(res) == nq - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_invalid_metric_type_binary(self, connect, binary_collection, get_l2_index): - ''' - target: test create index interface with invalid metric type - method: add entitys into binary connection, flash, create index with L2 metric type. - expected: return create_index failure - ''' - # insert 6000 vectors - ids = connect.bulk_insert(binary_collection, default_binary_entities) - connect.flush([binary_collection]) - - if get_l2_index["index_type"] == "BIN_FLAT": - res = connect.create_index(binary_collection, binary_field_name, get_l2_index) - else: - with pytest.raises(Exception) as e: - res = connect.create_index(binary_collection, binary_field_name, get_l2_index) - - """ - ****************************************************************** - The following cases are used to test `get_index_info` function - ****************************************************************** - """ - - def test_get_index_info(self, connect, binary_collection, get_jaccard_index): - ''' - target: test describe index interface - method: create collection and add entities in it, create index, call describe index - expected: return code 0, and index instructure - ''' - ids = connect.bulk_insert(binary_collection, default_binary_entities) - connect.flush([binary_collection]) - connect.create_index(binary_collection, binary_field_name, get_jaccard_index) - stats = connect.get_collection_stats(binary_collection) - assert stats["row_count"] == default_nb - for partition in stats["partitions"]: - segments = partition["segments"] - if segments: - for segment in segments: - for file in segment["files"]: - if "index_type" in file: - assert file["index_type"] == get_jaccard_index["index_type"] - - def test_get_index_info_partition(self, connect, binary_collection, get_jaccard_index): - ''' - target: test describe index interface - method: create collection, create partition and add entities in it, create index, call describe index - expected: return code 0, and index instructure - ''' - connect.create_partition(binary_collection, default_tag) - ids = connect.bulk_insert(binary_collection, default_binary_entities, partition_tag=default_tag) - connect.flush([binary_collection]) - connect.create_index(binary_collection, binary_field_name, get_jaccard_index) - stats = connect.get_collection_stats(binary_collection) - logging.getLogger().info(stats) - assert stats["row_count"] == default_nb - assert len(stats["partitions"]) == 2 - for partition in stats["partitions"]: - segments = partition["segments"] - if segments: - for segment in segments: - for file in segment["files"]: - if "index_type" in file: - assert file["index_type"] == get_jaccard_index["index_type"] - - """ - ****************************************************************** - The following cases are used to test `drop_index` function - ****************************************************************** - """ - - def test_drop_index(self, connect, binary_collection, get_jaccard_index): - ''' - target: test drop index interface - method: create collection and add entities in it, create index, call drop index - expected: return code 0, and default index param - ''' - connect.create_index(binary_collection, binary_field_name, get_jaccard_index) - stats = connect.get_collection_stats(binary_collection) - logging.getLogger().info(stats) - connect.drop_index(binary_collection, binary_field_name) - stats = connect.get_collection_stats(binary_collection) - # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type - assert not stats["partitions"][0]["segments"] - - def test_drop_index_partition(self, connect, binary_collection, get_jaccard_index): - ''' - target: test drop index interface - method: create collection, create partition and add entities in it, create index on collection, call drop collection index - expected: return code 0, and default index param - ''' - connect.create_partition(binary_collection, default_tag) - ids = connect.bulk_insert(binary_collection, default_binary_entities, partition_tag=default_tag) - connect.flush([binary_collection]) - connect.create_index(binary_collection, binary_field_name, get_jaccard_index) - stats = connect.get_collection_stats(binary_collection) - connect.drop_index(binary_collection, binary_field_name) - stats = connect.get_collection_stats(binary_collection) - assert stats["row_count"] == default_nb - for partition in stats["partitions"]: - segments = partition["segments"] - if segments: - for segment in segments: - for file in segment["files"]: - if "index_type" not in file: - continue - if file["index_type"] == get_jaccard_index["index_type"]: - assert False - - -@pytest.mark.skip("wait for debugging...") -class TestIndexInvalid(object): - """ - Test create / describe / drop index interfaces with invalid collection names - """ - - @pytest.fixture( - scope="function", - params=gen_invalid_strs() - ) - def get_collection_name(self, request): - yield request.param - - @pytest.mark.level(1) - def test_create_index_with_invalid_collectionname(self, connect, get_collection_name): - collection_name = get_collection_name - with pytest.raises(Exception) as e: - connect.create_index(collection_name, field_name, default_index) - - @pytest.mark.level(1) - def test_drop_index_with_invalid_collectionname(self, connect, get_collection_name): - collection_name = get_collection_name - with pytest.raises(Exception) as e: - connect.drop_index(collection_name) - - @pytest.fixture( - scope="function", - params=gen_invalid_index() - ) - def get_index(self, request): - yield request.param - - @pytest.mark.level(2) - def test_create_index_with_invalid_index_params(self, connect, collection, get_index): - logging.getLogger().info(get_index) - with pytest.raises(Exception) as e: - connect.create_index(collection, field_name, get_simple_index) - - -@pytest.mark.skip("wait for debugging...") -class TestIndexAsync: - @pytest.fixture(scope="function", autouse=True) - def skip_http_check(self, args): - if args["handler"] == "HTTP": - pytest.skip("skip in http mode") - - """ - ****************************************************************** - The following cases are used to test `create_index` function - ****************************************************************** - """ - - @pytest.fixture( - scope="function", - params=gen_simple_index() - ) - def get_simple_index(self, request, connect): - # TODO: Determine the service mode - # if str(connect._cmd("mode")) == "CPU": - if request.param["index_type"] in index_cpu_not_support(): - pytest.skip("sq8h not support in CPU mode") - return request.param - - def check_result(self, res): - logging.getLogger().info("In callback check search result") - logging.getLogger().info(res) - - """ - ****************************************************************** - The following cases are used to test `create_index` function - ****************************************************************** - """ - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index - expected: return search success - ''' - ids = connect.bulk_insert(collection, default_entities) - logging.getLogger().info("start index") - future = connect.create_index(collection, field_name, get_simple_index, _async=True) - logging.getLogger().info("before result") - res = future.result() - # TODO: - logging.getLogger().info(res) - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_drop(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index - expected: return search success - ''' - ids = connect.bulk_insert(collection, default_entities) - logging.getLogger().info("start index") - future = connect.create_index(collection, field_name, get_simple_index, _async=True) - logging.getLogger().info("DROP") - connect.drop_collection(collection) - - @pytest.mark.level(2) - def test_create_index_with_invalid_collectionname(self, connect): - collection_name = " " - future = connect.create_index(collection_name, field_name, default_index, _async=True) - with pytest.raises(Exception) as e: - res = future.result() - - @pytest.mark.timeout(BUILD_TIMEOUT) - def test_create_index_callback(self, connect, collection, get_simple_index): - ''' - target: test create index interface - method: create collection and add entities in it, create index - expected: return search success - ''' - ids = connect.bulk_insert(collection, default_entities) - logging.getLogger().info("start index") - future = connect.create_index(collection, field_name, get_simple_index, _async=True, - _callback=self.check_result) - logging.getLogger().info("before result") - res = future.result() - # TODO: - logging.getLogger().info(res)