diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index 27ce7c37cf..a82c198df2 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -197,6 +197,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp)); mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i)); mut_msg.mutable_extra_params()->CopyFrom(request.extra_params()); + mut_msg.set_channel_id(channel_id); auto callback = [&stats, &msg_sended, this_thread](Result result, const pulsar::MessageId &messageId) { msg_sended += 1; @@ -204,7 +205,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result)); } }; - paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback); + paralle_mut_producers_[channel_id]->sendAsync(mut_msg, callback); } catch (const std::exception &e) { msg_sended += 1; diff --git a/proxy/src/message_client/Producer.cpp b/proxy/src/message_client/Producer.cpp index d49a13b2c7..ad5e0b3d2b 100644 --- a/proxy/src/message_client/Producer.cpp +++ b/proxy/src/message_client/Producer.cpp @@ -45,15 +45,11 @@ namespace message_client { } Result MsgProducer::send(milvus::grpc::InsertOrDeleteMsg &msg) { - int32_t channel_id = makeHash(std::to_string(msg.uid())) % 1024; - msg.set_channel_id(channel_id); auto msg_str = msg.SerializeAsString(); return send(msg_str, msg.uid()); } void MsgProducer::sendAsync(milvus::grpc::InsertOrDeleteMsg &msg, pulsar::SendCallback callback) { - int32_t channel_id = makeHash(std::to_string(msg.uid())) % 1024; - msg.set_channel_id(channel_id); auto msg_str = msg.SerializeAsString(); return sendAsync(msg_str, msg.uid(), callback); }