diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 5a2d527a7c..cdeebbd7b0 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -2289,12 +2289,12 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return status; } - std::set flushed_tables; + std::set flushed_collections; status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(float)), (const float*)record.data, record.attr_nbytes, record.attr_data_size, - record.attr_data, record.lsn, flushed_tables); - collections_flushed(flushed_tables); + record.attr_data, record.lsn, flushed_collections); + collections_flushed(flushed_collections); milvus::server::CollectInsertMetrics metrics(record.length, status); break; diff --git a/core/src/scheduler/JobMgr.cpp b/core/src/scheduler/JobMgr.cpp index 78e00ad6c8..d30ae6bcf3 100644 --- a/core/src/scheduler/JobMgr.cpp +++ b/core/src/scheduler/JobMgr.cpp @@ -11,8 +11,8 @@ #include "scheduler/JobMgr.h" -#include -#include +#include "src/db/Utils.h" +#include "src/segment/SegmentReader.h" #include #include diff --git a/core/src/server/delivery/request/SearchRequest.cpp b/core/src/server/delivery/request/SearchRequest.cpp index 5836d90b06..48af1c86e7 100644 --- a/core/src/server/delivery/request/SearchRequest.cpp +++ b/core/src/server/delivery/request/SearchRequest.cpp @@ -62,14 +62,14 @@ SearchRequest::OnPreExecute() { // step 1: check collection name auto status = ValidationUtil::ValidateCollectionName(collection_name_); if (!status.ok()) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] %s", "search", 0, status.message().c_str()); + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "search", 0, status.message().c_str()); return status; } // step 2: check search topk status = ValidationUtil::ValidateSearchTopk(topk_); if (!status.ok()) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] %s", "search", 0, status.message().c_str()); + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "search", 0, status.message().c_str()); return status; } @@ -77,7 +77,7 @@ SearchRequest::OnPreExecute() { status = ValidationUtil::ValidatePartitionTags(partition_list_); fiu_do_on("SearchRequest.OnExecute.invalid_partition_tags", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] %s", "search", 0, status.message().c_str()); + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "search", 0, status.message().c_str()); return status; } @@ -92,7 +92,7 @@ SearchRequest::OnExecute() { fiu_do_on("SearchRequest.OnExecute.throw_std_exception", throw std::exception()); std::string hdr = "SearchRequest execute(collection=" + collection_name_ + ", nq=" + std::to_string(vector_count) + ", k=" + std::to_string(topk_) + ")"; - TimeRecorderAuto rc(LogOut("[%s][%d] %s", "search", 0, hdr.c_str())); + TimeRecorderAuto rc(LogOut("[%s][%ld] %s", "search", 0, hdr.c_str())); // step 4: check collection existence // only process root collection, ignore partition collection @@ -103,17 +103,17 @@ SearchRequest::OnExecute() { status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] Collection %s not found: %s", "search", 0, + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Collection %s not found: %s", "search", 0, collection_name_.c_str(), status.message().c_str()); return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } else { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] Error occurred when describing collection %s: %s", "search", 0, + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Error occurred when describing collection %s: %s", "search", 0, collection_name_.c_str(), status.message().c_str()); return status; } } else { if (!collection_schema_.owner_collection_.empty()) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] %s", "search", 0, + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "search", 0, CollectionNotExistMsg(collection_name_).c_str()); return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); } @@ -122,14 +122,14 @@ SearchRequest::OnExecute() { // step 5: check search parameters status = ValidationUtil::ValidateSearchParams(extra_params_, collection_schema_, topk_); if (!status.ok()) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid search params: %s", "search", 0, status.message().c_str()); + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Invalid search params: %s", "search", 0, status.message().c_str()); return status; } // step 6: check vector data according to metric type status = ValidationUtil::ValidateVectorData(vectors_data_, collection_schema_); if (!status.ok()) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid vector data: %s", "search", 0, status.message().c_str()); + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Invalid vector data: %s", "search", 0, status.message().c_str()); return status; } @@ -159,7 +159,7 @@ SearchRequest::OnExecute() { #endif fiu_do_on("SearchRequest.OnExecute.query_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] Query fail: %s", "search", 0, status.message().c_str()); + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Query fail: %s", "search", 0, status.message().c_str()); return status; } fiu_do_on("SearchRequest.OnExecute.empty_result_ids", result_ids.clear()); @@ -174,7 +174,7 @@ SearchRequest::OnExecute() { result_.distance_list_.swap(result_distances); rc.RecordSection("construct result"); } catch (std::exception& ex) { - LOG_SERVER_ERROR_ << LogOut("[%s][%d] Encounter exception: %s", "search", 0, ex.what()); + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Encounter exception: %s", "search", 0, ex.what()); return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } diff --git a/core/src/server/web_impl/controller/WebController.hpp b/core/src/server/web_impl/controller/WebController.hpp index d13f320382..ded3a3d97e 100644 --- a/core/src/server/web_impl/controller/WebController.hpp +++ b/core/src/server/web_impl/controller/WebController.hpp @@ -23,7 +23,6 @@ #include "utils/TimeRecorder.h" #include "server/web_impl/Constants.h" -#include "server/web_impl/dto/CmdDto.hpp" #include "server/web_impl/dto/ConfigDto.hpp" #include "server/web_impl/dto/IndexDto.hpp" #include "server/web_impl/dto/PartitionDto.hpp" diff --git a/core/src/server/web_impl/dto/CmdDto.hpp b/core/src/server/web_impl/dto/CmdDto.hpp deleted file mode 100644 index 9923c5ccb0..0000000000 --- a/core/src/server/web_impl/dto/CmdDto.hpp +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include "server/web_impl/dto/Dto.h" - -namespace milvus { -namespace server { -namespace web { - -#include OATPP_CODEGEN_BEGIN(DTO) - -class CommandDto: public oatpp::data::mapping::type::Object { - - DTO_INIT(CommandDto, Object) - - DTO_FIELD(String, reply, "reply"); -}; - -#include OATPP_CODEGEN_END(DTO) - -} // namespace web -} // namespace server -} // namespace milvus \ No newline at end of file diff --git a/core/src/server/web_impl/handler/WebRequestHandler.h b/core/src/server/web_impl/handler/WebRequestHandler.h index 7330427009..aeef4b9e12 100644 --- a/core/src/server/web_impl/handler/WebRequestHandler.h +++ b/core/src/server/web_impl/handler/WebRequestHandler.h @@ -26,7 +26,6 @@ #include "server/context/Context.h" #include "server/delivery/RequestHandler.h" #include "server/web_impl/Types.h" -#include "server/web_impl/dto/CmdDto.hpp" #include "server/web_impl/dto/ConfigDto.hpp" #include "server/web_impl/dto/DevicesDto.hpp" #include "server/web_impl/dto/IndexDto.hpp" diff --git a/core/src/utils/CommonUtil.cpp b/core/src/utils/CommonUtil.cpp index 361a06b029..39c510f562 100644 --- a/core/src/utils/CommonUtil.cpp +++ b/core/src/utils/CommonUtil.cpp @@ -256,7 +256,7 @@ CommonUtil::EraseFromCache(const std::string& item_key) { #ifdef MILVUS_GPU_VERSION server::Config& config = server::Config::GetInstance(); std::vector gpus; - Status s = config.GetGpuResourceConfigSearchResources(gpus); + config.GetGpuResourceConfigSearchResources(gpus); for (auto& gpu : gpus) { cache::GpuCacheMgr::GetInstance(gpu)->EraseItem(item_key); } diff --git a/core/src/utils/LogUtil.cpp b/core/src/utils/LogUtil.cpp index 7c3a35f565..1df0f71a23 100644 --- a/core/src/utils/LogUtil.cpp +++ b/core/src/utils/LogUtil.cpp @@ -54,27 +54,31 @@ RolloutHandler(const char* filename, std::size_t size, el::Level level) { int ret; std::string m(std::string(dir) + "/" + s); s = m; - if (level == el::Level::Global) { - s.append("." + std::to_string(++global_idx)); - ret = rename(m.c_str(), s.c_str()); - } else if (level == el::Level::Debug) { - s.append("." + std::to_string(++debug_idx)); - ret = rename(m.c_str(), s.c_str()); - } else if (level == el::Level::Warning) { - s.append("." + std::to_string(++warning_idx)); - ret = rename(m.c_str(), s.c_str()); - } else if (level == el::Level::Trace) { - s.append("." + std::to_string(++trace_idx)); - ret = rename(m.c_str(), s.c_str()); - } else if (level == el::Level::Error) { - s.append("." + std::to_string(++error_idx)); - ret = rename(m.c_str(), s.c_str()); - } else if (level == el::Level::Fatal) { - s.append("." + std::to_string(++fatal_idx)); - ret = rename(m.c_str(), s.c_str()); - } else { - s.append("." + std::to_string(++global_idx)); - ret = rename(m.c_str(), s.c_str()); + switch (level) { + case el::Level::Debug: + s.append("." + std::to_string(++debug_idx)); + ret = rename(m.c_str(), s.c_str()); + break; + case el::Level::Warning: + s.append("." + std::to_string(++warning_idx)); + ret = rename(m.c_str(), s.c_str()); + break; + case el::Level::Trace: + s.append("." + std::to_string(++trace_idx)); + ret = rename(m.c_str(), s.c_str()); + break; + case el::Level::Error: + s.append("." + std::to_string(++error_idx)); + ret = rename(m.c_str(), s.c_str()); + break; + case el::Level::Fatal: + s.append("." + std::to_string(++fatal_idx)); + ret = rename(m.c_str(), s.c_str()); + break; + default: + s.append("." + std::to_string(++global_idx)); + ret = rename(m.c_str(), s.c_str()); + break; } } diff --git a/core/unittest/server/test_rpc.cpp b/core/unittest/server/test_rpc.cpp index 5c9f4ff417..0c941ebbcc 100644 --- a/core/unittest/server/test_rpc.cpp +++ b/core/unittest/server/test_rpc.cpp @@ -935,19 +935,25 @@ TEST_F(RpcHandlerTest, CMD_TEST) { command.set_cmd("tasktable"); handler->Cmd(&context, &command, &reply); + ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code()); command.set_cmd("test"); handler->Cmd(&context, &command, &reply); + ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code()); command.set_cmd("status"); handler->Cmd(&context, &command, &reply); + ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code()); command.set_cmd("mode"); handler->Cmd(&context, &command, &reply); + ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code()); command.set_cmd("build_commit_id"); handler->Cmd(&context, &command, &reply); + ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code()); command.set_cmd("set_config"); handler->Cmd(&context, &command, &reply); + command.set_cmd("get_config"); handler->Cmd(&context, &command, &reply); } diff --git a/shards/mishards/__init__.py b/shards/mishards/__init__.py index bf7ae33b9a..06473d99df 100644 --- a/shards/mishards/__init__.py +++ b/shards/mishards/__init__.py @@ -15,7 +15,7 @@ def create_app(testing_config=None): pool_recycle=config.SQL_POOL_RECYCLE, pool_timeout=config.SQL_POOL_TIMEOUT, pool_pre_ping=config.SQL_POOL_PRE_PING, max_overflow=config.SQL_MAX_OVERFLOW) - from mishards.connections import ConnectionMgr, ConnectionTopology + from mishards.connections import ConnectionTopology readonly_topo = ConnectionTopology() writable_topo = ConnectionTopology() diff --git a/shards/mishards/connections.py b/shards/mishards/connections.py index fa9c52eddc..d66d0eee22 100644 --- a/shards/mishards/connections.py +++ b/shards/mishards/connections.py @@ -279,96 +279,3 @@ class ConnectionTopology(topology.Topology): if status == topology.StatusType.DUPLICATED: group = None return status, group - - -@singleton -class ConnectionMgr: - def __init__(self): - self.metas = {} - self.conns = {} - - @property - def conn_names(self): - return set(self.metas.keys()) - set(['WOSERVER']) - - def conn(self, name, metadata, throw=False): - c = self.conns.get(name, None) - if not c: - url = self.metas.get(name, None) - if not url: - if not throw: - return None - raise exceptions.ConnectionNotFoundError(message='Connection {} not found'.format(name), - metadata=metadata) - this_conn = Connection(name=name, uri=url, max_retry=settings.MAX_RETRY) - threaded = { - threading.get_ident(): this_conn - } - self.conns[name] = threaded - return this_conn - - tid = threading.get_ident() - rconn = c.get(tid, None) - if not rconn: - url = self.metas.get(name, None) - if not url: - if not throw: - return None - raise exceptions.ConnectionNotFoundError('Connection {} not found'.format(name), - metadata=metadata) - this_conn = Connection(name=name, uri=url, max_retry=settings.MAX_RETRY) - c[tid] = this_conn - return this_conn - - return rconn - - def on_new_meta(self, name, url): - logger.info('Register Connection: name={};url={}'.format(name, url)) - self.metas[name] = url - conn = self.conn(name, metadata=None) - conn.on_connect(metadata=None) - status, _ = conn.conn.server_version() - if not status.OK(): - logger.error('Cannot connect to newly added address: {}. Remove it now'.format(name)) - self.unregister(name) - return False - return True - - def on_duplicate_meta(self, name, url): - if self.metas[name] == url: - return self.on_same_meta(name, url) - - return self.on_diff_meta(name, url) - - def on_same_meta(self, name, url): - # logger.warning('Register same meta: {}:{}'.format(name, url)) - return True - - def on_diff_meta(self, name, url): - logger.warning('Received {} with diff url={}'.format(name, url)) - self.metas[name] = url - self.conns[name] = {} - return True - - def on_unregister_meta(self, name, url): - logger.info('Unregister name={};url={}'.format(name, url)) - self.conns.pop(name, None) - return True - - def on_nonexisted_meta(self, name): - logger.warning('Non-existed meta: {}'.format(name)) - return False - - def register(self, name, url): - meta = self.metas.get(name) - if not meta: - return self.on_new_meta(name, url) - else: - return self.on_duplicate_meta(name, url) - - def unregister(self, name): - logger.info('Unregister Connection: name={}'.format(name)) - url = self.metas.pop(name, None) - if url is None: - return self.on_nonexisted_meta(name) - return self.on_unregister_meta(name, url) diff --git a/shards/mishards/test_connections.py b/shards/mishards/test_connections.py index 5ed948f2a4..d87e38ff49 100644 --- a/shards/mishards/test_connections.py +++ b/shards/mishards/test_connections.py @@ -5,7 +5,7 @@ import random import threading from milvus import Milvus -from mishards.connections import (ConnectionMgr, Connection, +from mishards.connections import (Connection, ConnectionPool, ConnectionTopology, ConnectionGroup) from mishards.topology import StatusType from mishards import exceptions @@ -15,31 +15,6 @@ logger = logging.getLogger(__name__) @pytest.mark.usefixtures('app') class TestConnection: - @pytest.mark.skip - def test_manager(self): - mgr = ConnectionMgr() - - mgr.register('pod1', '111') - mgr.register('pod2', '222') - mgr.register('pod2', '222') - mgr.register('pod2', '2222') - assert len(mgr.conn_names) == 2 - - mgr.unregister('pod1') - assert len(mgr.conn_names) == 1 - - mgr.unregister('pod2') - assert len(mgr.conn_names) == 0 - - mgr.register('WOSERVER', 'xxxx') - assert len(mgr.conn_names) == 0 - - assert not mgr.conn('XXXX', None) - with pytest.raises(exceptions.ConnectionNotFoundError): - mgr.conn('XXXX', None, True) - - mgr.conn('WOSERVER', None) - def test_connection(self): class Conn: def __init__(self, state): diff --git a/shards/mishards/test_server.py b/shards/mishards/test_server.py index f7a1a63cac..cc586397a5 100644 --- a/shards/mishards/test_server.py +++ b/shards/mishards/test_server.py @@ -14,7 +14,7 @@ from mishards.service_handler import ServiceHandler from mishards.grpc_utils.grpc_args_parser import GrpcArgsParser as Parser from mishards.factories import TableFilesFactory, TablesFactory, TableFiles, Tables from mishards.router import RouterMixin -from mishards.connections import (ConnectionMgr, Connection, +from mishards.connections import (Connection, ConnectionPool, ConnectionTopology, ConnectionGroup) logger = logging.getLogger(__name__) diff --git a/tests/milvus_python_test/test_add_vectors.py b/tests/milvus_python_test/test_add_vectors.py index faae9cdb64..c3e809dbba 100644 --- a/tests/milvus_python_test/test_add_vectors.py +++ b/tests/milvus_python_test/test_add_vectors.py @@ -589,20 +589,18 @@ class TestAddBase: expected: status ok and result length is equal to the length off added vectors ''' collection = gen_unique_str() - uri = "tcp://%s:%s" % (args["ip"], args["port"]) param = {'collection_name': collection, 'dimension': dim, 'index_file_size': index_file_size, 'metric_type': MetricType.L2} - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) milvus.create_collection(param) vector = gen_single_vector(dim) process_num = 4 loop_num = 5 processes = [] def add(): - milvus = get_milvus(args["handler"]) + milvus = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) milvus.connect(uri=uri) i = 0 while i < loop_num: @@ -634,19 +632,16 @@ class TestAddBase: thread_num = 8 threads = [] collection = gen_unique_str() - uri = "tcp://%s:%s" % (args["ip"], args["port"]) param = {'collection_name': collection, 'dimension': dim, 'index_file_size': index_file_size, 'metric_type': MetricType.L2} - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) milvus.create_collection(param) vectors = gen_vectors(nb, dim) def add(thread_i): logging.getLogger().info("In thread-%d" % thread_i) - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) status, result = milvus.add_vectors(collection, records=vectors) assert status.OK() status = milvus.flush([collection]) diff --git a/tests/milvus_python_test/test_collection.py b/tests/milvus_python_test/test_collection.py index 0eccbcbc5c..ecf5acdcaf 100644 --- a/tests/milvus_python_test/test_collection.py +++ b/tests/milvus_python_test/test_collection.py @@ -322,7 +322,6 @@ class TestCollection: expected: collection_name equals with the collection name created ''' collection_name = gen_unique_str("test_collection") - uri = "tcp://%s:%s" % (args["ip"], args["port"]) param = {'collection_name': collection_name, 'dimension': dim, 'index_file_size': index_file_size, @@ -336,8 +335,7 @@ class TestCollection: process_num = 4 processes = [] for i in range(process_num): - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(args["ip"], args["port"], handler=args["handler"]) p = Process(target=describecollection, args=(milvus,)) processes.append(p) p.start() @@ -507,8 +505,6 @@ class TestCollection: ''' process_num = 6 processes = [] - uri = "tcp://%s:%s" % (args["ip"], args["port"]) - def deletecollection(milvus): status = milvus.drop_collection(collection) # assert not status.code==0 @@ -516,8 +512,7 @@ class TestCollection: assert status.OK() for i in range(process_num): - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(args["ip"], args["port"], handler=args["handler"]) p = Process(target=deletecollection, args=(milvus,)) processes.append(p) p.start() @@ -786,13 +781,11 @@ class TestCollection: expected: collection_name in show collections ''' collection_name = gen_unique_str("test_collection") - uri = "tcp://%s:%s" % (args["ip"], args["port"]) param = {'collection_name': collection_name, 'dimension': dim, 'index_file_size': index_file_size, 'metric_type': MetricType.L2} connect.create_collection(param) - def showcollections(milvus): status, result = milvus.show_collections() assert status.OK() @@ -802,8 +795,7 @@ class TestCollection: processes = [] for i in range(process_num): - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(args["ip"], args["port"], handler=args["handler"]) p = Process(target=showcollections, args=(milvus,)) processes.append(p) p.start() diff --git a/tests/milvus_python_test/test_collection_count.py b/tests/milvus_python_test/test_collection_count.py index a669c9a40d..667796391b 100644 --- a/tests/milvus_python_test/test_collection_count.py +++ b/tests/milvus_python_test/test_collection_count.py @@ -181,7 +181,6 @@ class TestCollectionCount: expected: the count is equal to the length of vectors ''' nq = 2 - uri = "tcp://%s:%s" % (args["ip"], args["port"]) vectors = gen_vectors(nq, dim) res = connect.add_vectors(collection_name=collection, records=vectors) time.sleep(add_time_interval) @@ -194,8 +193,7 @@ class TestCollectionCount: process_num = 8 processes = [] for i in range(process_num): - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(args["ip"], args["port"], handler=args["handler"]) p = Process(target=rows_count, args=(milvus, )) processes.append(p) p.start() @@ -326,7 +324,6 @@ class TestCollectionCountIP: expected: the count is equal to the length of vectors ''' nq = 2 - uri = "tcp://%s:%s" % (args["ip"], args["port"]) vectors = gen_vectors(nq, dim) res = connect.add_vectors(collection_name=ip_collection, records=vectors) time.sleep(add_time_interval) @@ -339,8 +336,7 @@ class TestCollectionCountIP: process_num = 8 processes = [] for i in range(process_num): - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(args["ip"], args["port"], handler=args["handler"]) p = Process(target=rows_count, args=(milvus,)) processes.append(p) p.start() diff --git a/tests/milvus_python_test/test_flush.py b/tests/milvus_python_test/test_flush.py index 4cbaa20669..2bbd9c079f 100644 --- a/tests/milvus_python_test/test_flush.py +++ b/tests/milvus_python_test/test_flush.py @@ -203,19 +203,16 @@ class TestFlushBase: expected: status ok ''' collection = gen_unique_str() - uri = "tcp://%s:%s" % (args["ip"], args["port"]) param = {'collection_name': collection, 'dimension': dim, 'index_file_size': index_file_size, 'metric_type': MetricType.L2} - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(args["ip"], args["port"], handler=args["handler"]) milvus.create_collection(param) vectors = gen_vector(nb, dim) status, ids = milvus.add_vectors(collection, vectors, ids=[i for i in range(nb)]) def flush(collection_name): - milvus = get_milvus(args["handler"]) - milvus.connect(uri=uri) + milvus = get_milvus(args["ip"], args["port"], handler=args["handler"]) status = milvus.delete_by_id(collection_name, [i for i in range(nb)]) assert status.OK() status = milvus.flush([collection_name]) diff --git a/tests/milvus_python_test/test_index.py b/tests/milvus_python_test/test_index.py index 289fca529e..c5460425c3 100644 --- a/tests/milvus_python_test/test_index.py +++ b/tests/milvus_python_test/test_index.py @@ -166,11 +166,8 @@ class TestIndexBase: threads_num = 8 threads = [] - uri = "tcp://%s:%s" % (args["ip"], args["port"]) - for i in range(threads_num): - m = get_milvus(args["handler"]) - m.connect(uri=uri) + m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) t = threading.Thread(target=build, args=(m,)) threads.append(t) t.start() @@ -197,7 +194,6 @@ class TestIndexBase: threads_num = 8 loop_num = 8 threads = [] - collection = [] j = 0 while j < (threads_num*loop_num): @@ -215,7 +211,6 @@ class TestIndexBase: while i < loop_num: # assert connect.has_collection(collection[ids*process_num+i]) status, ids = connect.add_vectors(collection[ids*threads_num+i], vectors) - status = connect.create_index(collection[ids*threads_num+i], IndexType.IVFLAT, {"nlist": NLIST}) assert status.OK() query_vec = [vectors[0]] @@ -226,14 +221,10 @@ class TestIndexBase: assert len(result[0]) == top_k assert result[0][0].distance == 0.0 i = i + 1 - - uri = "tcp://%s:%s" % (args["ip"], args["port"]) - for i in range(threads_num): - m = get_milvus(args["handler"]) - m.connect(uri=uri) + m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) ids = i - t = threading.Thread(target=create_index, args=(m,ids)) + t = threading.Thread(target=create_index, args=(m, ids)) threads.append(t) t.start() time.sleep(0.2) @@ -256,8 +247,7 @@ class TestIndexBase: threads = [] uri = "tcp://%s:%s" % (args["ip"], args["port"]) for i in range(threads_num): - m = get_milvus(args["handler"]) - m.connect(uri=uri) + m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) if(i % 2 == 0): p = threading.Thread(target=build, args=(m,)) else: @@ -286,11 +276,8 @@ class TestIndexBase: process_num = 8 processes = [] - uri = "tcp://%s:%s" % (args["ip"], args["port"]) - for i in range(process_num): - m = get_milvus(args["handler"]) - m.connect(uri=uri) + m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) p = Process(target=build, args=(m,)) processes.append(p) p.start() @@ -347,11 +334,8 @@ class TestIndexBase: assert result[0][0].distance == 0.0 i = i + 1 - uri = "tcp://%s:%s" % (args["ip"], args["port"]) - for i in range(process_num): - m = get_milvus(args["handler"]) - m.connect(uri=uri) + m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) ids = i p = Process(target=create_index, args=(m,ids)) processes.append(p) @@ -792,18 +776,15 @@ class TestIndexIP: expected: return code equals to 0, and search success ''' status, ids = connect.add_vectors(ip_collection, vectors) - def build(connect): status = connect.create_index(ip_collection, IndexType.IVFLAT, {"nlist": NLIST}) assert status.OK() process_num = 8 processes = [] - uri = "tcp://%s:%s" % (args["ip"], args["port"]) for i in range(process_num): - m = get_milvus(args["handler"]) - m.connect(uri=uri) + m = get_milvus(args["ip"], args["port"], handler=args["handler"]) p = Process(target=build, args=(m,)) processes.append(p) p.start() @@ -858,11 +839,8 @@ class TestIndexIP: assert result[0][0].distance == 0.0 i = i + 1 - uri = "tcp://%s:%s" % (args["ip"], args["port"]) - for i in range(process_num): - m = get_milvus(args["handler"]) - m.connect(uri=uri) + m = get_milvus(args["ip"], args["port"], handler=args["handler"]) ids = i p = Process(target=create_index, args=(m,ids)) processes.append(p)