From f02bd8c8f595f64c056d2731c05fa89f31da8372 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Sat, 16 Jan 2021 10:12:14 +0800 Subject: [PATCH] Rename query node package, implement watchDmChannel Signed-off-by: bigsheeper --- cmd/querynode/query_node.go | 8 +- cmd/singlenode/main.go | 8 +- internal/core/src/query/CMakeLists.txt | 3 +- internal/core/src/query/Expr.h | 2 - internal/core/src/query/Plan.cpp | 1 - .../query/{Search.cpp => SearchOnGrowing.cpp} | 40 +++++++++- .../src/query/{Search.h => SearchOnGrowing.h} | 28 +++---- internal/core/src/query/SearchOnSealed.h | 2 +- internal/core/src/query/SubQueryResult.h | 2 +- .../src/query/generated/ExecPlanNodeVisitor.h | 5 ++ .../query/visitors/ExecPlanNodeVisitor.cpp | 45 ++++------- internal/core/src/segcore/ConcurrentVector.h | 1 - internal/core/src/segcore/IndexingEntry.cpp | 3 - internal/core/src/segcore/IndexingEntry.h | 7 +- .../core/src/segcore/SegmentGrowingImpl.h | 18 ++++- internal/core/src/segcore/SegmentInterface.h | 22 ++++++ internal/core/src/segcore/collection_c.cpp | 3 +- internal/core/src/segcore/segment_c.cpp | 9 --- internal/distributed/querynode/service.go | 6 +- internal/querynode/api.go | 78 +++++++++++++------ internal/querynode/collection.go | 2 +- internal/querynode/collection_replica.go | 2 +- internal/querynode/collection_replica_test.go | 2 +- internal/querynode/collection_test.go | 2 +- internal/querynode/data_sync_service.go | 10 ++- internal/querynode/data_sync_service_test.go | 2 +- internal/querynode/flow_graph_dd_node.go | 2 +- internal/querynode/flow_graph_delete_node.go | 2 +- .../querynode/flow_graph_filter_dm_node.go | 2 +- internal/querynode/flow_graph_gc_node.go | 2 +- internal/querynode/flow_graph_insert_node.go | 2 +- internal/querynode/flow_graph_key2seg_node.go | 2 +- internal/querynode/flow_graph_message.go | 2 +- .../flow_graph_msg_stream_input_nodes.go | 8 +- internal/querynode/flow_graph_node.go | 2 +- .../querynode/flow_graph_service_time_node.go | 2 +- internal/querynode/index.go | 2 +- internal/querynode/load_index_info.go | 2 +- internal/querynode/load_index_info_test.go | 2 +- internal/querynode/load_index_service.go | 2 +- internal/querynode/load_index_service_test.go | 2 +- internal/querynode/meta_service.go | 2 +- internal/querynode/meta_service_test.go | 2 +- internal/querynode/param_table.go | 2 +- internal/querynode/param_table_test.go | 2 +- internal/querynode/partition.go | 2 +- internal/querynode/partition_test.go | 2 +- internal/querynode/plan.go | 2 +- internal/querynode/plan_test.go | 2 +- internal/querynode/query_node.go | 2 +- internal/querynode/query_node_test.go | 2 +- internal/querynode/reduce.go | 2 +- internal/querynode/reduce_test.go | 2 +- internal/querynode/search_service.go | 2 +- internal/querynode/search_service_test.go | 2 +- internal/querynode/segment.go | 2 +- internal/querynode/segment_test.go | 2 +- internal/querynode/stats_service.go | 2 +- internal/querynode/stats_service_test.go | 2 +- internal/querynode/tsafe.go | 2 +- internal/querynode/tsafe_test.go | 2 +- internal/querynode/type_def.go | 2 +- 62 files changed, 229 insertions(+), 160 deletions(-) rename internal/core/src/query/{Search.cpp => SearchOnGrowing.cpp} (81%) rename internal/core/src/query/{Search.h => SearchOnGrowing.h} (57%) diff --git a/cmd/querynode/query_node.go b/cmd/querynode/query_node.go index 271dc1c329..4c7a529277 100644 --- a/cmd/querynode/query_node.go +++ b/cmd/querynode/query_node.go @@ -10,16 +10,16 @@ import ( "go.uber.org/zap" - querynodeimp "github.com/zilliztech/milvus-distributed/internal/querynode" + "github.com/zilliztech/milvus-distributed/internal/querynode" ) func main() { - querynodeimp.Init() - fmt.Println("QueryNodeID is", querynodeimp.Params.QueryNodeID) + querynode.Init() + fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID) // Creates server. ctx, cancel := context.WithCancel(context.Background()) - svr := querynodeimp.NewQueryNode(ctx, 0) + svr := querynode.NewQueryNode(ctx, 0) sc := make(chan os.Signal, 1) signal.Notify(sc, diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index e61e047a21..c6504686dd 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -17,7 +17,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/indexnode" "github.com/zilliztech/milvus-distributed/internal/master" "github.com/zilliztech/milvus-distributed/internal/proxynode" - querynodeimp "github.com/zilliztech/milvus-distributed/internal/querynode" + "github.com/zilliztech/milvus-distributed/internal/querynode" "github.com/zilliztech/milvus-distributed/internal/writenode" ) @@ -101,11 +101,11 @@ func InitProxy(wg *sync.WaitGroup) { func InitQueryNode(wg *sync.WaitGroup) { defer wg.Done() - querynodeimp.Init() - fmt.Println("QueryNodeID is", querynodeimp.Params.QueryNodeID) + querynode.Init() + fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID) // Creates server. ctx, cancel := context.WithCancel(context.Background()) - svr := querynodeimp.NewQueryNode(ctx, 0) + svr := querynode.NewQueryNode(ctx, 0) sc := make(chan os.Signal, 1) signal.Notify(sc, diff --git a/internal/core/src/query/CMakeLists.txt b/internal/core/src/query/CMakeLists.txt index d26b2a3a9d..06b15ababe 100644 --- a/internal/core/src/query/CMakeLists.txt +++ b/internal/core/src/query/CMakeLists.txt @@ -1,4 +1,3 @@ -# TODO set(MILVUS_QUERY_SRCS deprecated/BinaryQuery.cpp generated/PlanNode.cpp @@ -10,7 +9,7 @@ set(MILVUS_QUERY_SRCS visitors/VerifyPlanNodeVisitor.cpp visitors/VerifyExprVisitor.cpp Plan.cpp - Search.cpp + SearchOnGrowing.cpp SearchOnSealed.cpp SearchOnIndex.cpp SearchBruteForce.cpp diff --git a/internal/core/src/query/Expr.h b/internal/core/src/query/Expr.h index 255e31fcdd..30816bfa82 100644 --- a/internal/core/src/query/Expr.h +++ b/internal/core/src/query/Expr.h @@ -40,7 +40,6 @@ struct UnaryExpr : Expr { ExprPtr child_; }; -// TODO: not enabled in sprint 1 struct BoolUnaryExpr : UnaryExpr { enum class OpType { LogicalNot }; OpType op_type_; @@ -50,7 +49,6 @@ struct BoolUnaryExpr : UnaryExpr { accept(ExprVisitor&) override; }; -// TODO: not enabled in sprint 1 struct BoolBinaryExpr : BinaryExpr { // Note: bitA - bitB == bitA & ~bitB, alias to LogicalMinus enum class OpType { LogicalAnd, LogicalOr, LogicalXor, LogicalMinus }; diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp index 5dc83a4b3a..8fc1cc804d 100644 --- a/internal/core/src/query/Plan.cpp +++ b/internal/core/src/query/Plan.cpp @@ -187,7 +187,6 @@ Parser::ParseTermNode(const Json& out_body) { std::unique_ptr Parser::ParseVecNode(const Json& out_body) { Assert(out_body.is_object()); - // TODO add binary info Assert(out_body.size() == 1); auto iter = out_body.begin(); auto field_name = FieldName(iter.key()); diff --git a/internal/core/src/query/Search.cpp b/internal/core/src/query/SearchOnGrowing.cpp similarity index 81% rename from internal/core/src/query/Search.cpp rename to internal/core/src/query/SearchOnGrowing.cpp index 3ba611637b..46aecb3431 100644 --- a/internal/core/src/query/Search.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -9,7 +9,7 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License -#include "Search.h" +#include "SearchOnGrowing.h" #include #include #include "segcore/Reduce.h" @@ -65,7 +65,6 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment, auto topK = info.topK_; auto total_count = topK * num_queries; auto metric_type = GetMetricType(info.metric_type_); - // TODO: optimize // step 3: small indexing search // std::vector final_uids(total_count, -1); @@ -77,10 +76,9 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment, const auto& indexing_entry = indexing_record.get_vec_entry(vecfield_offset); auto search_conf = indexing_entry.get_search_conf(topK); - // TODO: use sub_qr for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) { auto chunk_size = indexing_entry.get_chunk_size(); - auto indexing = indexing_entry.get_vec_indexing(chunk_id); + auto indexing = indexing_entry.get_indexing(chunk_id); auto sub_view = BitsetSubView(bitset, chunk_id * chunk_size, chunk_size); auto sub_qr = SearchOnIndex(query_dataset, *indexing, search_conf, sub_view); @@ -197,4 +195,38 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, return Status::OK(); } +// TODO: refactor and merge this into one +template +void +SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, + const query::QueryInfo& info, + const EmbeddedType* query_data, + int64_t num_queries, + Timestamp timestamp, + const faiss::BitsetView& bitset, + QueryResult& results) { + static_assert(IsVector); + if constexpr (std::is_same_v) { + FloatSearch(segment, info, query_data, num_queries, timestamp, bitset, results); + } else { + BinarySearch(segment, info, query_data, num_queries, timestamp, bitset, results); + } +} +template void +SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, + const query::QueryInfo& info, + const EmbeddedType* query_data, + int64_t num_queries, + Timestamp timestamp, + const faiss::BitsetView& bitset, + QueryResult& results); +template void +SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, + const query::QueryInfo& info, + const EmbeddedType* query_data, + int64_t num_queries, + Timestamp timestamp, + const faiss::BitsetView& bitset, + QueryResult& results); + } // namespace milvus::query diff --git a/internal/core/src/query/Search.h b/internal/core/src/query/SearchOnGrowing.h similarity index 57% rename from internal/core/src/query/Search.h rename to internal/core/src/query/SearchOnGrowing.h index 3aa8d1ec9b..003bdcf35b 100644 --- a/internal/core/src/query/Search.h +++ b/internal/core/src/query/SearchOnGrowing.h @@ -20,23 +20,13 @@ namespace milvus::query { using BitmapChunk = boost::dynamic_bitset<>; using BitmapSimple = std::deque; -// TODO: merge these two search into one -// note: c++17 don't support optional ref -Status -FloatSearch(const segcore::SegmentGrowingImpl& segment, - const QueryInfo& info, - const float* query_data, - int64_t num_queries, - Timestamp timestamp, - const faiss::BitsetView& bitset, - QueryResult& results); - -Status -BinarySearch(const segcore::SegmentGrowingImpl& segment, - const query::QueryInfo& info, - const uint8_t* query_data, - int64_t num_queries, - Timestamp timestamp, - const faiss::BitsetView& bitset, - QueryResult& results); +template +void +SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, + const query::QueryInfo& info, + const EmbeddedType* query_data, + int64_t num_queries, + Timestamp timestamp, + const faiss::BitsetView& bitset, + QueryResult& results); } // namespace milvus::query diff --git a/internal/core/src/query/SearchOnSealed.h b/internal/core/src/query/SearchOnSealed.h index 01f3864e0d..227f1a15ce 100644 --- a/internal/core/src/query/SearchOnSealed.h +++ b/internal/core/src/query/SearchOnSealed.h @@ -13,7 +13,7 @@ #include "segcore/SealedIndexingRecord.h" #include "query/PlanNode.h" -#include "query/Search.h" +#include "query/SearchOnGrowing.h" namespace milvus::query { diff --git a/internal/core/src/query/SubQueryResult.h b/internal/core/src/query/SubQueryResult.h index 6cf7aace58..7bb498039b 100644 --- a/internal/core/src/query/SubQueryResult.h +++ b/internal/core/src/query/SubQueryResult.h @@ -33,7 +33,7 @@ class SubQueryResult { static constexpr bool is_descending(MetricType metric_type) { - // TODO + // TODO(dog): more types if (metric_type == MetricType::METRIC_INNER_PRODUCT) { return true; } else { diff --git a/internal/core/src/query/generated/ExecPlanNodeVisitor.h b/internal/core/src/query/generated/ExecPlanNodeVisitor.h index 2cd629edf5..08ac2f694e 100644 --- a/internal/core/src/query/generated/ExecPlanNodeVisitor.h +++ b/internal/core/src/query/generated/ExecPlanNodeVisitor.h @@ -46,6 +46,11 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor { return ret; } + private: + template + void + VectorVisitorImpl(VectorPlanNode& node); + private: // std::optional ret_; const segcore::SegmentGrowing& segment_; diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp index fbe99079c4..e43837a060 100644 --- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp @@ -16,7 +16,7 @@ #include "query/generated/ExecPlanNodeVisitor.h" #include "segcore/SegmentGrowingImpl.h" #include "query/generated/ExecExprVisitor.h" -#include "query/Search.h" +#include "query/SearchOnGrowing.h" #include "query/SearchOnSealed.h" namespace milvus::query { @@ -45,6 +45,11 @@ class ExecPlanNodeVisitor : PlanNodeVisitor { return ret; } + private: + template + void + VectorVisitorImpl(VectorPlanNode& node); + private: // std::optional ret_; const segcore::SegmentGrowing& segment_; @@ -56,15 +61,16 @@ class ExecPlanNodeVisitor : PlanNodeVisitor { } // namespace impl #endif +template void -ExecPlanNodeVisitor::visit(FloatVectorANNS& node) { +ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { // TODO: optimize here, remove the dynamic cast assert(!ret_.has_value()); auto segment = dynamic_cast(&segment_); AssertInfo(segment, "support SegmentSmallIndex Only"); RetType ret; auto& ph = placeholder_group_.at(0); - auto src_data = ph.get_blob(); + auto src_data = ph.get_blob>(); auto num_queries = ph.num_of_queries_; aligned_vector bitset_holder; @@ -80,39 +86,20 @@ ExecPlanNodeVisitor::visit(FloatVectorANNS& node) { SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, timestamp_, view, ret); } else { - FloatSearch(*segment, node.query_info_, src_data, num_queries, timestamp_, view, ret); + SearchOnGrowing(*segment, node.query_info_, src_data, num_queries, timestamp_, view, ret); } ret_ = ret; } +void +ExecPlanNodeVisitor::visit(FloatVectorANNS& node) { + VectorVisitorImpl(node); +} + void ExecPlanNodeVisitor::visit(BinaryVectorANNS& node) { - // TODO: optimize here, remove the dynamic cast - assert(!ret_.has_value()); - auto segment = dynamic_cast(&segment_); - AssertInfo(segment, "support SegmentSmallIndex Only"); - RetType ret; - auto& ph = placeholder_group_.at(0); - auto src_data = ph.get_blob(); - auto num_queries = ph.num_of_queries_; - - aligned_vector bitset_holder; - BitsetView view; - if (node.predicate_.has_value()) { - ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment).call_child(*node.predicate_.value()); - bitset_holder = AssembleNegBitmap(expr_ret); - view = BitsetView(bitset_holder.data(), bitset_holder.size() * 8); - } - - auto& sealed_indexing = segment->get_sealed_indexing_record(); - if (sealed_indexing.is_ready(node.query_info_.field_offset_)) { - SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, timestamp_, - view, ret); - } else { - BinarySearch(*segment, node.query_info_, src_data, num_queries, timestamp_, view, ret); - } - ret_ = ret; + VectorVisitorImpl(node); } } // namespace milvus::query diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index f883d0bc39..87cfd76ac8 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -39,7 +39,6 @@ class ThreadSafeVector { if (size <= size_) { return; } - // TODO: use multithread to speedup std::lock_guard lck(mutex_); while (vec_.size() < size) { vec_.emplace_back(std::forward(args...)); diff --git a/internal/core/src/segcore/IndexingEntry.cpp b/internal/core/src/segcore/IndexingEntry.cpp index 16545544cd..1cd6405955 100644 --- a/internal/core/src/segcore/IndexingEntry.cpp +++ b/internal/core/src/segcore/IndexingEntry.cpp @@ -17,8 +17,6 @@ namespace milvus::segcore { void VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { - // TODO - assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT); auto dim = field_meta_.get_dim(); @@ -31,7 +29,6 @@ VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const Vector for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) { const auto& chunk = source->get_chunk(chunk_id); // build index for chunk - // TODO auto indexing = std::make_unique(); auto dataset = knowhere::GenDataset(source->get_chunk_size(), dim, chunk.data()); indexing->Train(dataset, conf); diff --git a/internal/core/src/segcore/IndexingEntry.h b/internal/core/src/segcore/IndexingEntry.h index a4cde732a5..f790f5e1a1 100644 --- a/internal/core/src/segcore/IndexingEntry.h +++ b/internal/core/src/segcore/IndexingEntry.h @@ -47,6 +47,9 @@ class IndexingEntry { return chunk_size_; } + virtual knowhere::Index* + get_indexing(int64_t chunk_id) const = 0; + protected: // additional info const FieldMeta& field_meta_; @@ -62,7 +65,7 @@ class ScalarIndexingEntry : public IndexingEntry { // concurrent knowhere::scalar::StructuredIndex* - get_indexing(int64_t chunk_id) const { + get_indexing(int64_t chunk_id) const override { Assert(!field_meta_.is_vector()); return data_.at(chunk_id).get(); } @@ -80,7 +83,7 @@ class VecIndexingEntry : public IndexingEntry { // concurrent knowhere::VecIndex* - get_vec_indexing(int64_t chunk_id) const { + get_indexing(int64_t chunk_id) const override { Assert(field_meta_.is_vector()); return data_.at(chunk_id).get(); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index d3f05972ba..15d129c355 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -39,8 +39,6 @@ class SegmentGrowingImpl : public SegmentGrowing { int64_t PreInsert(int64_t size) override; - // TODO: originally, id should be put into data_chunk - // TODO: Is it ok to put them the other side? Status Insert(int64_t reserved_offset, int64_t size, @@ -95,6 +93,22 @@ class SegmentGrowingImpl : public SegmentGrowing { return *schema_; } + // return count of index that has index, i.e., [0, num_chunk_index) have built index + int64_t + num_chunk_index_safe(FieldOffset field_offset) const final { + return indexing_record_.get_finished_ack(); + } + + const knowhere::Index* + chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const final { + return indexing_record_.get_entry(field_offset).get_indexing(chunk_id); + } + + int64_t + chunk_size() const final { + return chunk_size_; + } + public: ssize_t get_row_count() const override { diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 4ad2177e34..34c18b53ee 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -14,6 +14,8 @@ #include "common/Schema.h" #include "query/Plan.h" #include "common/Span.h" +#include "IndexingEntry.h" +#include namespace milvus::segcore { @@ -52,10 +54,30 @@ class SegmentInternalInterface : public SegmentInterface { return static_cast>(chunk_data_impl(field_offset, chunk_id)); } + virtual int64_t + num_chunk_index_safe(FieldOffset field_offset) const = 0; + + template + const knowhere::scalar::StructuredIndex& + chunk_scalar_index(FieldOffset field_offset, int64_t chunk_id) const { + static_assert(IsScalar); + using IndexType = knowhere::scalar::StructuredIndex; + auto base_ptr = chunk_index_impl(field_offset, chunk_id); + auto ptr = dynamic_cast(base_ptr); + AssertInfo(ptr, "entry mismatch"); + return *ptr; + } + + virtual int64_t + chunk_size() const = 0; + protected: // blob and row_count virtual SpanBase chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const = 0; + + virtual const knowhere::Index* + chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const = 0; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/collection_c.cpp b/internal/core/src/segcore/collection_c.cpp index feb5fc7bf1..34476504cf 100644 --- a/internal/core/src/segcore/collection_c.cpp +++ b/internal/core/src/segcore/collection_c.cpp @@ -19,7 +19,6 @@ NewCollection(const char* schema_proto_blob) { auto collection = std::make_unique(proto); - // TODO: delete print std::cout << "create collection " << collection->get_collection_name() << std::endl; return (void*)collection.release(); @@ -29,8 +28,8 @@ void DeleteCollection(CCollection collection) { auto col = (milvus::segcore::Collection*)collection; - // TODO: delete print std::cout << "delete collection " << col->get_collection_name() << std::endl; + delete col; } diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index d7e6d36638..523af18427 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -27,7 +27,6 @@ NewSegment(CCollection collection, uint64_t segment_id) { auto segment = milvus::segcore::CreateGrowingSegment(col->get_schema()); - // TODO: delete print std::cout << "create segment " << segment_id << std::endl; return (void*)segment.release(); } @@ -36,7 +35,6 @@ void DeleteSegment(CSegmentBase segment) { auto s = (milvus::segcore::SegmentGrowing*)segment; - // TODO: delete print std::cout << "delete segment " << std::endl; delete s; } @@ -78,17 +76,12 @@ Insert(CSegmentBase c_segment, status.error_msg = strdup(e.what()); return status; } - - // TODO: delete print - // std::cout << "do segment insert, sizeof_per_row = " << sizeof_per_row << std::endl; } int64_t PreInsert(CSegmentBase c_segment, int64_t size) { auto segment = (milvus::segcore::SegmentGrowing*)c_segment; - // TODO: delete print - // std::cout << "PreInsert segment " << std::endl; return segment->PreInsert(size); } @@ -116,8 +109,6 @@ int64_t PreDelete(CSegmentBase c_segment, int64_t size) { auto segment = (milvus::segcore::SegmentGrowing*)c_segment; - // TODO: delete print - // std::cout << "PreDelete segment " << std::endl; return segment->PreDelete(size); } diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 93026851c1..f9966b97b6 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -8,17 +8,17 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" - querynodeimp "github.com/zilliztech/milvus-distributed/internal/querynode" + "github.com/zilliztech/milvus-distributed/internal/querynode" ) type Server struct { grpcServer *grpc.Server - node querynodeimp.Node + node querynode.Node } func NewServer(ctx context.Context, queryNodeID uint64) *Server { return &Server{ - node: querynodeimp.NewQueryNode(ctx, queryNodeID), + node: querynode.NewQueryNode(ctx, queryNodeID), } } diff --git a/internal/querynode/api.go b/internal/querynode/api.go index 905d3421c3..6d5afb2329 100644 --- a/internal/querynode/api.go +++ b/internal/querynode/api.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" @@ -20,6 +20,16 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery return status, errors.New(errMsg) default: + if node.searchService == nil || node.searchService.searchMsgStream == nil { + errMsg := "null search service or null search message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream) if !ok { errMsg := "type assertion failed for search message stream" @@ -71,6 +81,16 @@ func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.Remov return status, errors.New(errMsg) default: + if node.searchService == nil || node.searchService.searchMsgStream == nil { + errMsg := "null search service or null search result message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream) if !ok { errMsg := "type assertion failed for search message stream" @@ -124,30 +144,38 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC return status, errors.New(errMsg) default: - // TODO: add dmMsgStream reference to dataSyncService - //fgDMMsgStream, ok := node.dataSyncService.dmMsgStream.(*msgstream.PulsarMsgStream) - //if !ok { - // errMsg := "type assertion failed for dm message stream" - // status := &commonpb.Status{ - // ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - // Reason: errMsg, - // } - // - // return status, errors.New(errMsg) - //} - // - //// add request channel - //pulsarBufSize := Params.SearchPulsarBufSize - //consumeChannels := in.ChannelIDs - //consumeSubName := Params.MsgChannelSubName - //unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() - //fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) - // - //status := &commonpb.Status{ - // ErrorCode: commonpb.ErrorCode_SUCCESS, - //} - //return status, nil - return nil, nil + if node.dataSyncService == nil || node.dataSyncService.dmStream == nil { + errMsg := "null data sync service or null data manipulation stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for dm message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + // add request channel + pulsarBufSize := Params.SearchPulsarBufSize + consumeChannels := in.ChannelIDs + consumeSubName := Params.MsgChannelSubName + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } + return status, nil } } diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go index 646fe33926..638afffa4d 100644 --- a/internal/querynode/collection.go +++ b/internal/querynode/collection.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 115daf00ff..c035069146 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/collection_replica_test.go index fe328fea5b..57af33969b 100644 --- a/internal/querynode/collection_replica_test.go +++ b/internal/querynode/collection_replica_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "testing" diff --git a/internal/querynode/collection_test.go b/internal/querynode/collection_test.go index 2c4c5e9601..ac757da790 100644 --- a/internal/querynode/collection_test.go +++ b/internal/querynode/collection_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "testing" diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 3cdf6fa562..ebc4cb1541 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -1,9 +1,10 @@ -package querynodeimp +package querynode import ( "context" "log" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) @@ -11,6 +12,9 @@ type dataSyncService struct { ctx context.Context fg *flowgraph.TimeTickedFlowGraph + dmStream msgstream.MsgStream + ddStream msgstream.MsgStream + replica collectionReplica } @@ -40,8 +44,8 @@ func (dsService *dataSyncService) initNodes() { dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) - var dmStreamNode node = newDmInputNode(dsService.ctx) - var ddStreamNode node = newDDInputNode(dsService.ctx) + var dmStreamNode node = dsService.newDmInputNode(dsService.ctx) + var ddStreamNode node = dsService.newDDInputNode(dsService.ctx) var filterDmNode node = newFilteredDmNode() var ddNode node = newDDNode(dsService.replica) diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index c5f429abe3..79e3015b14 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "encoding/binary" diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index 880137dc6d..c264ac9828 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "log" diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index d873c487fb..77b26d2f14 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode type deleteNode struct { baseNode diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 26d4658e82..db9f60a62b 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go index aae7ddea99..076f44fcd9 100644 --- a/internal/querynode/flow_graph_gc_node.go +++ b/internal/querynode/flow_graph_gc_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "log" diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 57f6500b1c..2df6baa985 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" diff --git a/internal/querynode/flow_graph_key2seg_node.go b/internal/querynode/flow_graph_key2seg_node.go index ff3b966a5b..c4f345a5ec 100644 --- a/internal/querynode/flow_graph_key2seg_node.go +++ b/internal/querynode/flow_graph_key2seg_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode type key2SegNode struct { baseNode diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index 58d4eb5283..451f9b6952 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" diff --git a/internal/querynode/flow_graph_msg_stream_input_nodes.go b/internal/querynode/flow_graph_msg_stream_input_nodes.go index d71902bd57..59db032b99 100644 --- a/internal/querynode/flow_graph_msg_stream_input_nodes.go +++ b/internal/querynode/flow_graph_msg_stream_input_nodes.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" @@ -7,7 +7,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) -func newDmInputNode(ctx context.Context) *flowgraph.InputNode { +func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode { receiveBufSize := Params.InsertReceiveBufSize pulsarBufSize := Params.InsertPulsarBufSize @@ -22,6 +22,7 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) var stream msgstream.MsgStream = insertStream + dsService.dmStream = stream maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -30,7 +31,7 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { return node } -func newDDInputNode(ctx context.Context) *flowgraph.InputNode { +func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph.InputNode { receiveBufSize := Params.DDReceiveBufSize pulsarBufSize := Params.DDPulsarBufSize @@ -45,6 +46,7 @@ func newDDInputNode(ctx context.Context) *flowgraph.InputNode { ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) var stream msgstream.MsgStream = ddStream + dsService.ddStream = stream maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism diff --git a/internal/querynode/flow_graph_node.go b/internal/querynode/flow_graph_node.go index d6e9f8b048..e8a10b3cb7 100644 --- a/internal/querynode/flow_graph_node.go +++ b/internal/querynode/flow_graph_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 8262909d98..a7830c4680 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "log" diff --git a/internal/querynode/index.go b/internal/querynode/index.go index fdea5e8366..8c8f84b17e 100644 --- a/internal/querynode/index.go +++ b/internal/querynode/index.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* diff --git a/internal/querynode/load_index_info.go b/internal/querynode/load_index_info.go index a654d9628c..d56cca4f21 100644 --- a/internal/querynode/load_index_info.go +++ b/internal/querynode/load_index_info.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* #cgo CFLAGS: -I${SRCDIR}/../core/output/include diff --git a/internal/querynode/load_index_info_test.go b/internal/querynode/load_index_info_test.go index 64bf1966a5..95261c7002 100644 --- a/internal/querynode/load_index_info_test.go +++ b/internal/querynode/load_index_info_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "testing" diff --git a/internal/querynode/load_index_service.go b/internal/querynode/load_index_service.go index e6c4453317..10857b4f3a 100644 --- a/internal/querynode/load_index_service.go +++ b/internal/querynode/load_index_service.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" diff --git a/internal/querynode/load_index_service_test.go b/internal/querynode/load_index_service_test.go index be6c6b0bbb..2b0471dc00 100644 --- a/internal/querynode/load_index_service_test.go +++ b/internal/querynode/load_index_service_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "encoding/binary" diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go index ea0880e486..f24dc1433a 100644 --- a/internal/querynode/meta_service.go +++ b/internal/querynode/meta_service.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" diff --git a/internal/querynode/meta_service_test.go b/internal/querynode/meta_service_test.go index 05aefca6b0..eebaf58636 100644 --- a/internal/querynode/meta_service_test.go +++ b/internal/querynode/meta_service_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "math" diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 24caf25d1e..663134dc15 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "log" diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index bceb0ff7d7..461073146f 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "fmt" diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go index 538e5732d8..e2dc4593d7 100644 --- a/internal/querynode/partition.go +++ b/internal/querynode/partition.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* diff --git a/internal/querynode/partition_test.go b/internal/querynode/partition_test.go index aeccc6cccd..7f22d51487 100644 --- a/internal/querynode/partition_test.go +++ b/internal/querynode/partition_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "testing" diff --git a/internal/querynode/plan.go b/internal/querynode/plan.go index b0d695576d..0909406189 100644 --- a/internal/querynode/plan.go +++ b/internal/querynode/plan.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* #cgo CFLAGS: -I${SRCDIR}/../core/output/include diff --git a/internal/querynode/plan_test.go b/internal/querynode/plan_test.go index 88913550d3..e707f404d1 100644 --- a/internal/querynode/plan_test.go +++ b/internal/querynode/plan_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "encoding/binary" diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index fa49db9597..f7d941097a 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index c7d57803f6..f69cb67360 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" diff --git a/internal/querynode/reduce.go b/internal/querynode/reduce.go index d6a9e858ea..8fa56bf7fb 100644 --- a/internal/querynode/reduce.go +++ b/internal/querynode/reduce.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* #cgo CFLAGS: -I${SRCDIR}/../core/output/include diff --git a/internal/querynode/reduce_test.go b/internal/querynode/reduce_test.go index 0dadbdc702..8667c68f4a 100644 --- a/internal/querynode/reduce_test.go +++ b/internal/querynode/reduce_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "encoding/binary" diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 689aa71350..5a6ce44a55 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import "C" import ( diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index 789b934838..773898af9a 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index bd9380b0ad..2e00200a2f 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode /* diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index 3de321fc77..52689704b7 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "encoding/binary" diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go index c5e27f9b88..17c8bd9473 100644 --- a/internal/querynode/stats_service.go +++ b/internal/querynode/stats_service.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "context" diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index 574d258bcc..648495b56f 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "testing" diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index eab7dd1966..60529a3c98 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "sync" diff --git a/internal/querynode/tsafe_test.go b/internal/querynode/tsafe_test.go index 88021d1036..1ae166f7f7 100644 --- a/internal/querynode/tsafe_test.go +++ b/internal/querynode/tsafe_test.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import ( "testing" diff --git a/internal/querynode/type_def.go b/internal/querynode/type_def.go index 7b6fb06efb..6cbd347791 100644 --- a/internal/querynode/type_def.go +++ b/internal/querynode/type_def.go @@ -1,4 +1,4 @@ -package querynodeimp +package querynode import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"