mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add consume function in rocksmq
Signed-off-by: yukun <kun.yu@zilliz.com>
This commit is contained in:
parent
c0a3a509f7
commit
44c75bdd2a
@ -1,3 +1,4 @@
|
||||
# TODO
|
||||
set(MILVUS_QUERY_SRCS
|
||||
deprecated/BinaryQuery.cpp
|
||||
generated/PlanNode.cpp
|
||||
@ -9,7 +10,7 @@ set(MILVUS_QUERY_SRCS
|
||||
visitors/VerifyPlanNodeVisitor.cpp
|
||||
visitors/VerifyExprVisitor.cpp
|
||||
Plan.cpp
|
||||
SearchOnGrowing.cpp
|
||||
Search.cpp
|
||||
SearchOnSealed.cpp
|
||||
SearchOnIndex.cpp
|
||||
SearchBruteForce.cpp
|
||||
|
||||
@ -40,6 +40,7 @@ struct UnaryExpr : Expr {
|
||||
ExprPtr child_;
|
||||
};
|
||||
|
||||
// TODO: not enabled in sprint 1
|
||||
struct BoolUnaryExpr : UnaryExpr {
|
||||
enum class OpType { LogicalNot };
|
||||
OpType op_type_;
|
||||
@ -49,6 +50,7 @@ struct BoolUnaryExpr : UnaryExpr {
|
||||
accept(ExprVisitor&) override;
|
||||
};
|
||||
|
||||
// TODO: not enabled in sprint 1
|
||||
struct BoolBinaryExpr : BinaryExpr {
|
||||
// Note: bitA - bitB == bitA & ~bitB, alias to LogicalMinus
|
||||
enum class OpType { LogicalAnd, LogicalOr, LogicalXor, LogicalMinus };
|
||||
|
||||
@ -187,6 +187,7 @@ Parser::ParseTermNode(const Json& out_body) {
|
||||
std::unique_ptr<VectorPlanNode>
|
||||
Parser::ParseVecNode(const Json& out_body) {
|
||||
Assert(out_body.is_object());
|
||||
// TODO add binary info
|
||||
Assert(out_body.size() == 1);
|
||||
auto iter = out_body.begin();
|
||||
auto field_name = FieldName(iter.key());
|
||||
|
||||
@ -9,7 +9,7 @@
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include "SearchOnGrowing.h"
|
||||
#include "Search.h"
|
||||
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
|
||||
#include <knowhere/index/vector_index/VecIndexFactory.h>
|
||||
#include "segcore/Reduce.h"
|
||||
@ -65,6 +65,7 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment,
|
||||
auto topK = info.topK_;
|
||||
auto total_count = topK * num_queries;
|
||||
auto metric_type = GetMetricType(info.metric_type_);
|
||||
// TODO: optimize
|
||||
|
||||
// step 3: small indexing search
|
||||
// std::vector<int64_t> final_uids(total_count, -1);
|
||||
@ -76,9 +77,10 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment,
|
||||
const auto& indexing_entry = indexing_record.get_vec_entry(vecfield_offset);
|
||||
auto search_conf = indexing_entry.get_search_conf(topK);
|
||||
|
||||
// TODO: use sub_qr
|
||||
for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) {
|
||||
auto chunk_size = indexing_entry.get_chunk_size();
|
||||
auto indexing = indexing_entry.get_indexing(chunk_id);
|
||||
auto indexing = indexing_entry.get_vec_indexing(chunk_id);
|
||||
|
||||
auto sub_view = BitsetSubView(bitset, chunk_id * chunk_size, chunk_size);
|
||||
auto sub_qr = SearchOnIndex(query_dataset, *indexing, search_conf, sub_view);
|
||||
@ -195,38 +197,4 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// TODO: refactor and merge this into one
|
||||
template <typename VectorType>
|
||||
void
|
||||
SearchOnGrowing(const segcore::SegmentGrowingImpl& segment,
|
||||
const query::QueryInfo& info,
|
||||
const EmbeddedType<VectorType>* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results) {
|
||||
static_assert(IsVector<VectorType>);
|
||||
if constexpr (std::is_same_v<VectorType, FloatVector>) {
|
||||
FloatSearch(segment, info, query_data, num_queries, timestamp, bitset, results);
|
||||
} else {
|
||||
BinarySearch(segment, info, query_data, num_queries, timestamp, bitset, results);
|
||||
}
|
||||
}
|
||||
template void
|
||||
SearchOnGrowing<FloatVector>(const segcore::SegmentGrowingImpl& segment,
|
||||
const query::QueryInfo& info,
|
||||
const EmbeddedType<FloatVector>* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results);
|
||||
template void
|
||||
SearchOnGrowing<BinaryVector>(const segcore::SegmentGrowingImpl& segment,
|
||||
const query::QueryInfo& info,
|
||||
const EmbeddedType<BinaryVector>* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results);
|
||||
|
||||
} // namespace milvus::query
|
||||
@ -20,13 +20,23 @@ namespace milvus::query {
|
||||
using BitmapChunk = boost::dynamic_bitset<>;
|
||||
using BitmapSimple = std::deque<BitmapChunk>;
|
||||
|
||||
template <typename VectorType>
|
||||
void
|
||||
SearchOnGrowing(const segcore::SegmentGrowingImpl& segment,
|
||||
const query::QueryInfo& info,
|
||||
const EmbeddedType<VectorType>* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results);
|
||||
// TODO: merge these two search into one
|
||||
// note: c++17 don't support optional ref
|
||||
Status
|
||||
FloatSearch(const segcore::SegmentGrowingImpl& segment,
|
||||
const QueryInfo& info,
|
||||
const float* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results);
|
||||
|
||||
Status
|
||||
BinarySearch(const segcore::SegmentGrowingImpl& segment,
|
||||
const query::QueryInfo& info,
|
||||
const uint8_t* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results);
|
||||
} // namespace milvus::query
|
||||
@ -13,7 +13,7 @@
|
||||
|
||||
#include "segcore/SealedIndexingRecord.h"
|
||||
#include "query/PlanNode.h"
|
||||
#include "query/SearchOnGrowing.h"
|
||||
#include "query/Search.h"
|
||||
|
||||
namespace milvus::query {
|
||||
|
||||
|
||||
@ -33,7 +33,7 @@ class SubQueryResult {
|
||||
|
||||
static constexpr bool
|
||||
is_descending(MetricType metric_type) {
|
||||
// TODO(dog): more types
|
||||
// TODO
|
||||
if (metric_type == MetricType::METRIC_INNER_PRODUCT) {
|
||||
return true;
|
||||
} else {
|
||||
|
||||
@ -46,11 +46,6 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename VectorType>
|
||||
void
|
||||
VectorVisitorImpl(VectorPlanNode& node);
|
||||
|
||||
private:
|
||||
// std::optional<RetType> ret_;
|
||||
const segcore::SegmentGrowing& segment_;
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
#include "query/generated/ExecPlanNodeVisitor.h"
|
||||
#include "segcore/SegmentGrowingImpl.h"
|
||||
#include "query/generated/ExecExprVisitor.h"
|
||||
#include "query/SearchOnGrowing.h"
|
||||
#include "query/Search.h"
|
||||
#include "query/SearchOnSealed.h"
|
||||
|
||||
namespace milvus::query {
|
||||
@ -45,11 +45,6 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename VectorType>
|
||||
void
|
||||
VectorVisitorImpl(VectorPlanNode& node);
|
||||
|
||||
private:
|
||||
// std::optional<RetType> ret_;
|
||||
const segcore::SegmentGrowing& segment_;
|
||||
@ -61,16 +56,15 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
|
||||
} // namespace impl
|
||||
#endif
|
||||
|
||||
template <typename VectorType>
|
||||
void
|
||||
ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
|
||||
ExecPlanNodeVisitor::visit(FloatVectorANNS& node) {
|
||||
// TODO: optimize here, remove the dynamic cast
|
||||
assert(!ret_.has_value());
|
||||
auto segment = dynamic_cast<const segcore::SegmentGrowingImpl*>(&segment_);
|
||||
AssertInfo(segment, "support SegmentSmallIndex Only");
|
||||
RetType ret;
|
||||
auto& ph = placeholder_group_.at(0);
|
||||
auto src_data = ph.get_blob<EmbeddedType<VectorType>>();
|
||||
auto src_data = ph.get_blob<float>();
|
||||
auto num_queries = ph.num_of_queries_;
|
||||
|
||||
aligned_vector<uint8_t> bitset_holder;
|
||||
@ -86,20 +80,39 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
|
||||
SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, timestamp_,
|
||||
view, ret);
|
||||
} else {
|
||||
SearchOnGrowing<VectorType>(*segment, node.query_info_, src_data, num_queries, timestamp_, view, ret);
|
||||
FloatSearch(*segment, node.query_info_, src_data, num_queries, timestamp_, view, ret);
|
||||
}
|
||||
|
||||
ret_ = ret;
|
||||
}
|
||||
|
||||
void
|
||||
ExecPlanNodeVisitor::visit(FloatVectorANNS& node) {
|
||||
VectorVisitorImpl<FloatVector>(node);
|
||||
}
|
||||
|
||||
void
|
||||
ExecPlanNodeVisitor::visit(BinaryVectorANNS& node) {
|
||||
VectorVisitorImpl<BinaryVector>(node);
|
||||
// TODO: optimize here, remove the dynamic cast
|
||||
assert(!ret_.has_value());
|
||||
auto segment = dynamic_cast<const segcore::SegmentGrowingImpl*>(&segment_);
|
||||
AssertInfo(segment, "support SegmentSmallIndex Only");
|
||||
RetType ret;
|
||||
auto& ph = placeholder_group_.at(0);
|
||||
auto src_data = ph.get_blob<uint8_t>();
|
||||
auto num_queries = ph.num_of_queries_;
|
||||
|
||||
aligned_vector<uint8_t> bitset_holder;
|
||||
BitsetView view;
|
||||
if (node.predicate_.has_value()) {
|
||||
ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment).call_child(*node.predicate_.value());
|
||||
bitset_holder = AssembleNegBitmap(expr_ret);
|
||||
view = BitsetView(bitset_holder.data(), bitset_holder.size() * 8);
|
||||
}
|
||||
|
||||
auto& sealed_indexing = segment->get_sealed_indexing_record();
|
||||
if (sealed_indexing.is_ready(node.query_info_.field_offset_)) {
|
||||
SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, timestamp_,
|
||||
view, ret);
|
||||
} else {
|
||||
BinarySearch(*segment, node.query_info_, src_data, num_queries, timestamp_, view, ret);
|
||||
}
|
||||
ret_ = ret;
|
||||
}
|
||||
|
||||
} // namespace milvus::query
|
||||
|
||||
@ -39,6 +39,7 @@ class ThreadSafeVector {
|
||||
if (size <= size_) {
|
||||
return;
|
||||
}
|
||||
// TODO: use multithread to speedup
|
||||
std::lock_guard lck(mutex_);
|
||||
while (vec_.size() < size) {
|
||||
vec_.emplace_back(std::forward<Args...>(args...));
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
namespace milvus::segcore {
|
||||
void
|
||||
VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) {
|
||||
// TODO
|
||||
|
||||
assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT);
|
||||
auto dim = field_meta_.get_dim();
|
||||
|
||||
@ -29,6 +31,7 @@ VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const Vector
|
||||
for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) {
|
||||
const auto& chunk = source->get_chunk(chunk_id);
|
||||
// build index for chunk
|
||||
// TODO
|
||||
auto indexing = std::make_unique<knowhere::IVF>();
|
||||
auto dataset = knowhere::GenDataset(source->get_chunk_size(), dim, chunk.data());
|
||||
indexing->Train(dataset, conf);
|
||||
|
||||
@ -47,9 +47,6 @@ class IndexingEntry {
|
||||
return chunk_size_;
|
||||
}
|
||||
|
||||
virtual knowhere::Index*
|
||||
get_indexing(int64_t chunk_id) const = 0;
|
||||
|
||||
protected:
|
||||
// additional info
|
||||
const FieldMeta& field_meta_;
|
||||
@ -65,7 +62,7 @@ class ScalarIndexingEntry : public IndexingEntry {
|
||||
|
||||
// concurrent
|
||||
knowhere::scalar::StructuredIndex<T>*
|
||||
get_indexing(int64_t chunk_id) const override {
|
||||
get_indexing(int64_t chunk_id) const {
|
||||
Assert(!field_meta_.is_vector());
|
||||
return data_.at(chunk_id).get();
|
||||
}
|
||||
@ -83,7 +80,7 @@ class VecIndexingEntry : public IndexingEntry {
|
||||
|
||||
// concurrent
|
||||
knowhere::VecIndex*
|
||||
get_indexing(int64_t chunk_id) const override {
|
||||
get_vec_indexing(int64_t chunk_id) const {
|
||||
Assert(field_meta_.is_vector());
|
||||
return data_.at(chunk_id).get();
|
||||
}
|
||||
|
||||
@ -39,6 +39,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
int64_t
|
||||
PreInsert(int64_t size) override;
|
||||
|
||||
// TODO: originally, id should be put into data_chunk
|
||||
// TODO: Is it ok to put them the other side?
|
||||
Status
|
||||
Insert(int64_t reserved_offset,
|
||||
int64_t size,
|
||||
@ -93,22 +95,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
return *schema_;
|
||||
}
|
||||
|
||||
// return count of index that has index, i.e., [0, num_chunk_index) have built index
|
||||
int64_t
|
||||
num_chunk_index_safe(FieldOffset field_offset) const final {
|
||||
return indexing_record_.get_finished_ack();
|
||||
}
|
||||
|
||||
const knowhere::Index*
|
||||
chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const final {
|
||||
return indexing_record_.get_entry(field_offset).get_indexing(chunk_id);
|
||||
}
|
||||
|
||||
int64_t
|
||||
chunk_size() const final {
|
||||
return chunk_size_;
|
||||
}
|
||||
|
||||
public:
|
||||
ssize_t
|
||||
get_row_count() const override {
|
||||
|
||||
@ -14,8 +14,6 @@
|
||||
#include "common/Schema.h"
|
||||
#include "query/Plan.h"
|
||||
#include "common/Span.h"
|
||||
#include "IndexingEntry.h"
|
||||
#include <knowhere/index/vector_index/VecIndex.h>
|
||||
|
||||
namespace milvus::segcore {
|
||||
|
||||
@ -54,30 +52,10 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
return static_cast<Span<T>>(chunk_data_impl(field_offset, chunk_id));
|
||||
}
|
||||
|
||||
virtual int64_t
|
||||
num_chunk_index_safe(FieldOffset field_offset) const = 0;
|
||||
|
||||
template <typename T>
|
||||
const knowhere::scalar::StructuredIndex<T>&
|
||||
chunk_scalar_index(FieldOffset field_offset, int64_t chunk_id) const {
|
||||
static_assert(IsScalar<T>);
|
||||
using IndexType = knowhere::scalar::StructuredIndex<T>;
|
||||
auto base_ptr = chunk_index_impl(field_offset, chunk_id);
|
||||
auto ptr = dynamic_cast<const IndexType*>(base_ptr);
|
||||
AssertInfo(ptr, "entry mismatch");
|
||||
return *ptr;
|
||||
}
|
||||
|
||||
virtual int64_t
|
||||
chunk_size() const = 0;
|
||||
|
||||
protected:
|
||||
// blob and row_count
|
||||
virtual SpanBase
|
||||
chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const = 0;
|
||||
|
||||
virtual const knowhere::Index*
|
||||
chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const = 0;
|
||||
};
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -19,6 +19,7 @@ NewCollection(const char* schema_proto_blob) {
|
||||
|
||||
auto collection = std::make_unique<milvus::segcore::Collection>(proto);
|
||||
|
||||
// TODO: delete print
|
||||
std::cout << "create collection " << collection->get_collection_name() << std::endl;
|
||||
|
||||
return (void*)collection.release();
|
||||
@ -28,8 +29,8 @@ void
|
||||
DeleteCollection(CCollection collection) {
|
||||
auto col = (milvus::segcore::Collection*)collection;
|
||||
|
||||
// TODO: delete print
|
||||
std::cout << "delete collection " << col->get_collection_name() << std::endl;
|
||||
|
||||
delete col;
|
||||
}
|
||||
|
||||
|
||||
@ -27,6 +27,7 @@ NewSegment(CCollection collection, uint64_t segment_id) {
|
||||
|
||||
auto segment = milvus::segcore::CreateGrowingSegment(col->get_schema());
|
||||
|
||||
// TODO: delete print
|
||||
std::cout << "create segment " << segment_id << std::endl;
|
||||
return (void*)segment.release();
|
||||
}
|
||||
@ -35,6 +36,7 @@ void
|
||||
DeleteSegment(CSegmentBase segment) {
|
||||
auto s = (milvus::segcore::SegmentGrowing*)segment;
|
||||
|
||||
// TODO: delete print
|
||||
std::cout << "delete segment " << std::endl;
|
||||
delete s;
|
||||
}
|
||||
@ -76,12 +78,17 @@ Insert(CSegmentBase c_segment,
|
||||
status.error_msg = strdup(e.what());
|
||||
return status;
|
||||
}
|
||||
|
||||
// TODO: delete print
|
||||
// std::cout << "do segment insert, sizeof_per_row = " << sizeof_per_row << std::endl;
|
||||
}
|
||||
|
||||
int64_t
|
||||
PreInsert(CSegmentBase c_segment, int64_t size) {
|
||||
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
|
||||
|
||||
// TODO: delete print
|
||||
// std::cout << "PreInsert segment " << std::endl;
|
||||
return segment->PreInsert(size);
|
||||
}
|
||||
|
||||
@ -109,6 +116,8 @@ int64_t
|
||||
PreDelete(CSegmentBase c_segment, int64_t size) {
|
||||
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
|
||||
|
||||
// TODO: delete print
|
||||
// std::cout << "PreDelete segment " << std::endl;
|
||||
return segment->PreDelete(size);
|
||||
}
|
||||
|
||||
|
||||
@ -5,11 +5,13 @@ option go_package = "github.com/zilliztech/milvus-distributed/internal/proto/mil
|
||||
|
||||
import "common.proto";
|
||||
import "internal.proto";
|
||||
import "schema.proto";
|
||||
|
||||
message CreateCollectionRequest {
|
||||
internal.MsgBase base = 1;
|
||||
string db_name = 2;
|
||||
string collectionName = 3;
|
||||
// `schema` is the serialized `schema.CollectionSchema`
|
||||
bytes schema = 4;
|
||||
}
|
||||
|
||||
@ -26,6 +28,11 @@ message HasCollectionRequest {
|
||||
string collection_name = 3;
|
||||
}
|
||||
|
||||
message BoolResponse {
|
||||
common.Status status = 1;
|
||||
bool value = 2;
|
||||
}
|
||||
|
||||
message DescribeCollectionRequest {
|
||||
internal.MsgBase base = 1;
|
||||
string db_name = 2;
|
||||
@ -33,7 +40,8 @@ message DescribeCollectionRequest {
|
||||
}
|
||||
|
||||
message DescribeCollectionResponse {
|
||||
repeated bytes schema = 1;
|
||||
common.Status status = 1;
|
||||
schema.CollectionSchema schema = 2;
|
||||
}
|
||||
|
||||
message LoadCollectionRequest {
|
||||
@ -56,6 +64,7 @@ message CollectionStatsRequest {
|
||||
|
||||
message CollectionStatsResponse {
|
||||
repeated common.KeyValuePair stats = 1;
|
||||
common.Status status = 2;
|
||||
}
|
||||
|
||||
|
||||
@ -66,6 +75,7 @@ message ShowCollectionRequest {
|
||||
|
||||
message ShowCollectionResponse {
|
||||
repeated string collection_names = 1;
|
||||
common.Status status = 2;
|
||||
}
|
||||
|
||||
|
||||
@ -114,6 +124,7 @@ message PartitionStatsRequest {
|
||||
|
||||
message PartitionStatsResponse {
|
||||
repeated common.KeyValuePair stats = 1;
|
||||
common.Status status = 2;
|
||||
}
|
||||
|
||||
message ShowPartitionRequest {
|
||||
@ -124,6 +135,7 @@ message ShowPartitionRequest {
|
||||
|
||||
message ShowPartitionResponse {
|
||||
repeated string partition_names = 1;
|
||||
common.Status status = 2;
|
||||
}
|
||||
|
||||
|
||||
@ -149,6 +161,7 @@ message IndexDescription {
|
||||
|
||||
message DescribeIndexResponse {
|
||||
repeated IndexDescription index_descriptions = 1;
|
||||
common.Status status = 2;
|
||||
}
|
||||
|
||||
message InsertRequest {
|
||||
@ -163,6 +176,24 @@ message InsertRequest {
|
||||
message InsertResponse {
|
||||
int64 rowID_begin = 1;
|
||||
int64 rowID_end = 2;
|
||||
common.Status status = 3;
|
||||
}
|
||||
|
||||
enum PlaceholderType {
|
||||
NONE = 0;
|
||||
VECTOR_BINARY = 100;
|
||||
VECTOR_FLOAT = 101;
|
||||
}
|
||||
|
||||
message PlaceholderValue {
|
||||
string tag = 1;
|
||||
PlaceholderType type = 2;
|
||||
// values is a 2d-array, every array contains a vector
|
||||
repeated bytes values = 3;
|
||||
}
|
||||
|
||||
message PlaceholderGroup {
|
||||
repeated PlaceholderValue placeholders = 1;
|
||||
}
|
||||
|
||||
message SearchRequest {
|
||||
@ -171,11 +202,49 @@ message SearchRequest {
|
||||
string collection_name = 3;
|
||||
repeated string partition_names = 4;
|
||||
string dsl = 5;
|
||||
repeated bytes placeholder_group = 6;
|
||||
// serialized `PlaceholderGroup`
|
||||
bytes placeholder_group = 6;
|
||||
}
|
||||
|
||||
message Hits {
|
||||
repeated int64 IDs = 1;
|
||||
repeated bytes row_data = 2;
|
||||
repeated float scores = 3;
|
||||
}
|
||||
|
||||
message QueryResult {
|
||||
common.Status status = 1;
|
||||
repeated bytes hits = 2;
|
||||
}
|
||||
|
||||
message FlushRequest {
|
||||
internal.MsgBase base = 1;
|
||||
string db_name = 2;
|
||||
string collection_name = 3;
|
||||
}
|
||||
}
|
||||
|
||||
service MilvusService {
|
||||
rpc CreateCollection(CreateCollectionRequest) returns (common.Status) {}
|
||||
rpc DropCollection(DropCollectionRequest) returns (common.Status) {}
|
||||
rpc HasCollection(HasCollectionRequest) returns (BoolResponse) {}
|
||||
rpc LoadCollection(LoadCollectionRequest) returns (common.Status) {}
|
||||
rpc ReleaseCollection(ReleaseCollectionRequest) returns (common.Status) {}
|
||||
rpc DescribeCollection(DescribeCollectionRequest) returns (DescribeCollectionResponse) {}
|
||||
rpc GetCollectionStatistics(CollectionStatsRequest) returns (CollectionStatsResponse) {}
|
||||
rpc ShowCollections(ShowCollectionRequest) returns (ShowCollectionResponse) {}
|
||||
|
||||
rpc CreatePartition(CreatePartitionRequest) returns (common.Status) {}
|
||||
rpc DropPartition(DropPartitionRequest) returns (common.Status) {}
|
||||
rpc HasPartition(HasPartitionRequest) returns (BoolResponse) {}
|
||||
rpc LoadPartitions(LoadPartitonRequest) returns (common.Status) {}
|
||||
rpc ReleasePartitions(ReleasePartitionRequest) returns (common.Status) {}
|
||||
rpc GetPartitionStatistics(PartitionStatsRequest) returns (PartitionStatsResponse) {}
|
||||
rpc ShowPartitions(ShowPartitionRequest) returns (ShowPartitionResponse) {}
|
||||
|
||||
rpc CreateIndex(CreateIndexRequest) returns (common.Status) {}
|
||||
rpc DescribeIndex(DescribeIndexRequest) returns (DescribeIndexResponse) {}
|
||||
|
||||
rpc Insert(InsertRequest) returns (InsertResponse) {}
|
||||
rpc Search(SearchRequest) returns (QueryResult) {}
|
||||
rpc Flush(FlushRequest) returns (common.Status) {}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
16
internal/proxyservice/interface.go
Normal file
16
internal/proxyservice/interface.go
Normal file
@ -0,0 +1,16 @@
|
||||
package proxyservice
|
||||
|
||||
import (
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
||||
type ServiceBase = typeutil.Service
|
||||
|
||||
type Interface interface {
|
||||
ServiceBase
|
||||
RegisterLink() (proxypb.RegisterLinkResponse, error)
|
||||
RegisterNode(request proxypb.RegisterNodeRequest) (proxypb.RegisterNodeResponse, error)
|
||||
// TODO: i'm sure it's not a best way to keep consistency, fix me
|
||||
InvalidateCollectionMetaCache(request proxypb.InvalidateCollMetaCacheRequest) error
|
||||
}
|
||||
54
internal/proxyservice/proxyservice.go
Normal file
54
internal/proxyservice/proxyservice.go
Normal file
@ -0,0 +1,54 @@
|
||||
package proxyservice
|
||||
|
||||
import (
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
|
||||
)
|
||||
|
||||
type ProxyService struct {
|
||||
// implement Service
|
||||
|
||||
//nodeClients [] .Interface
|
||||
// factory method
|
||||
|
||||
}
|
||||
|
||||
func (s ProxyService) Init() {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s ProxyService) Start() {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s ProxyService) Stop() {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s ProxyService) GetServiceStates() (internalpb2.ServiceStates, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s ProxyService) GetTimeTickChannel() (string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s ProxyService) GetStatisticsChannel() (string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s ProxyService) RegisterLink() (proxypb.RegisterLinkResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s ProxyService) RegisterNode(request proxypb.RegisterNodeRequest) (proxypb.RegisterNodeResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s ProxyService) InvalidateCollectionMetaCache(request proxypb.InvalidateCollMetaCacheRequest) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func NewProxyServiceImpl() Interface {
|
||||
return &ProxyService{}
|
||||
}
|
||||
@ -83,14 +83,26 @@ type RocksMQ struct {
|
||||
//tsoTicker *time.Ticker
|
||||
}
|
||||
|
||||
func NewRocksMQ() *RocksMQ {
|
||||
mkv := memkv.NewMemoryKV()
|
||||
// mstore, _ :=
|
||||
rmq := &RocksMQ{
|
||||
// store: mstore,
|
||||
kv: mkv,
|
||||
func NewRocksMQ(name string) (*RocksMQ, error) {
|
||||
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
|
||||
bbto.SetBlockCache(gorocksdb.NewLRUCache(3 << 30))
|
||||
opts := gorocksdb.NewDefaultOptions()
|
||||
opts.SetBlockBasedTableFactory(bbto)
|
||||
opts.SetCreateIfMissing(true)
|
||||
opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(FixedChannelNameLen + 1))
|
||||
|
||||
db, err := gorocksdb.OpenDb(opts, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rmq
|
||||
|
||||
mkv := memkv.NewMemoryKV()
|
||||
|
||||
rmq := &RocksMQ{
|
||||
store: db,
|
||||
kv: mkv,
|
||||
}
|
||||
return rmq, nil
|
||||
}
|
||||
|
||||
//func (rmq *RocksMQ) startServerLoop(ctx context.Context) error {
|
||||
@ -217,7 +229,7 @@ func (rmq *RocksMQ) CreateConsumerGroup(groupName string, channelName string) er
|
||||
if rmq.checkKeyExist(key) {
|
||||
return errors.New("ConsumerGroup " + groupName + " already exists.")
|
||||
}
|
||||
err := rmq.kv.Save(key, "0")
|
||||
err := rmq.kv.Save(key, "-1")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -285,7 +297,60 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro
|
||||
}
|
||||
|
||||
func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) {
|
||||
return nil, nil
|
||||
metaKey := groupName + "/" + channelName + "/current_id"
|
||||
currentID, err := rmq.kv.Load(metaKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
readOpts.SetPrefixSameAsStart(true)
|
||||
iter := rmq.store.NewIterator(readOpts)
|
||||
defer iter.Close()
|
||||
|
||||
consumerMessage := make([]ConsumerMessage, 0, n)
|
||||
|
||||
fixChanName, err := fixChannelName(channelName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dataKey := fixChanName + "/" + currentID
|
||||
|
||||
// msgID is "-1" means this is the first consume operation
|
||||
if currentID == "-1" {
|
||||
iter.SeekToFirst()
|
||||
} else {
|
||||
iter.Seek([]byte(dataKey))
|
||||
}
|
||||
|
||||
offset := 0
|
||||
for ; iter.Valid() && offset < n; iter.Next() {
|
||||
key := iter.Key()
|
||||
val := iter.Value()
|
||||
offset++
|
||||
msgID, err := strconv.ParseInt(string(key.Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msg := ConsumerMessage{
|
||||
msgID: msgID,
|
||||
payload: val.Data(),
|
||||
}
|
||||
consumerMessage = append(consumerMessage, msg)
|
||||
key.Free()
|
||||
val.Free()
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newID := consumerMessage[len(consumerMessage)-1].msgID
|
||||
err = rmq.Seek(groupName, channelName, newID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return consumerMessage, nil
|
||||
}
|
||||
|
||||
func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID UniqueID) error {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user