mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-02 08:55:56 +08:00
Fix storage get segments
Signed-off-by: shengjh <1572099106@qq.com>
This commit is contained in:
parent
e704667bc5
commit
7becfe1b99
@ -8,9 +8,7 @@
|
||||
namespace milvus::message_client {
|
||||
|
||||
MsgClientV2 &MsgClientV2::GetInstance() {
|
||||
// TODO: do not hardcode pulsar message configure and init
|
||||
std::string pulsar_server_addr(std::string {"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
|
||||
// "pulsar://localhost:6650"
|
||||
|
||||
int64_t client_id = 0;
|
||||
static MsgClientV2 msg_client(client_id, pulsar_server_addr);
|
||||
@ -144,11 +142,8 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uin
|
||||
milvus::grpc::InsertOrDeleteMsg mut_msg;
|
||||
for (auto i = 0; i < row_count; i++) {
|
||||
mut_msg.set_op(milvus::grpc::OpType::INSERT);
|
||||
mut_msg.set_uid(GetUniqueQId());
|
||||
mut_msg.set_uid(request.entity_id_array(i));
|
||||
mut_msg.set_client_id(client_id_);
|
||||
// TODO: add channel id
|
||||
auto channel_id = 0;
|
||||
mut_msg.set_channel_id(channel_id);
|
||||
mut_msg.set_timestamp(timestamp);
|
||||
mut_msg.set_collection_name(request.collection_name());
|
||||
mut_msg.set_partition_tag(request.partition_tag());
|
||||
|
||||
@ -34,7 +34,7 @@ namespace message_client {
|
||||
|
||||
Result MsgProducer::send(milvus::grpc::InsertOrDeleteMsg &msg) {
|
||||
int32_t channel_id = makeHash(std::to_string(msg.uid())) % 1024;
|
||||
std::cout << "partition id := " << channel_id <<std::endl;
|
||||
// std::cout << "partition id := " << channel_id <<std::endl;
|
||||
msg.set_channel_id(channel_id);
|
||||
auto msg_str = msg.SerializeAsString();
|
||||
return send(msg_str, msg.uid());
|
||||
|
||||
@ -225,7 +225,9 @@ func (s *minioDriver) GetSegments(ctx context.Context, key Key, timestamp Timest
|
||||
if err != nil {
|
||||
panic("must no error")
|
||||
}
|
||||
segmentsSet[segment] = true
|
||||
if segment != "delete" {
|
||||
segmentsSet[segment] = true
|
||||
}
|
||||
}
|
||||
|
||||
var segments []string
|
||||
|
||||
@ -363,7 +363,7 @@ func (s *TikvStore) GetSegments(ctx context.Context, key Key, timestamp Timestam
|
||||
if err != nil {
|
||||
panic("must no error")
|
||||
}
|
||||
if ts <= timestamp {
|
||||
if ts <= timestamp && segment != string(DeleteMark){
|
||||
segmentsSet[segment] = true
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user