mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-02 00:45:30 +08:00
Add config for write node and query node
Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
parent
a2ae3044b0
commit
d8b1a7cf8d
conf/conf.go | 33
@@ -12,13 +12,11 @@ import (
// yaml.MapSlice

type MasterConfig struct {
PulsarURL string
Address string
Port int32
PulsarMoniterInterval int32
PulsarTopic string
EtcdRootPath string
SegmentThreshole float32
DefaultGRPCPort string
EtcdEndPoints []string
}

type EtcdConfig struct {
@@ -41,8 +39,10 @@ type StorageConfig struct {
}

type PulsarConfig struct {
Address string
Port int32
Address string
Port int32
TopicNum int
NodeNum int
}

//type ProxyConfig struct {
@@ -51,12 +51,33 @@ type PulsarConfig struct {
// Port int32
//}

type Reader struct {
ClientId int
StopFlag int64
ReaderQueueSize int
SearchChanSize int
Key2SegChanSize int
InsertTopicStart int
InsertTopicEnd int
}

type Writer struct {
ClientId int
StopFlag int64
ReaderQueueSize int
SearchByIdChanSize int
InsertTopicStart int
InsertTopicEnd int
}

type ServerConfig struct {
Master MasterConfig
Etcd EtcdConfig
Timesync TimeSyncConfig
Storage StorageConfig
Pulsar PulsarConfig
Writer Writer
Reader Reader
//Proxy ProxyConfig
}
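The new Reader and Writer sections are plain exported structs, so the lowercase keys in config.yaml can map onto them without field tags. A minimal sketch of how the package-level conf.Config might be populated, assuming gopkg.in/yaml.v2 (whose default is to lowercase untagged field names, e.g. "ClientId" -> "clientid") and a conf/config.yaml path; the actual loader in the repository is not shown in this patch and may differ:

package conf

import (
	"io/ioutil"
	"log"

	yaml "gopkg.in/yaml.v2"
)

// Config is the package-level configuration the rest of the code reads
// (conf.Config.Reader.ClientId, conf.Config.Pulsar.TopicNum, ...).
var Config ServerConfig

// loadConfig is a sketch only: it assumes the file lives at the given path and
// relies on yaml.v2's lowercasing of field names to match the keys in the yaml.
func loadConfig(path string) {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		log.Fatal(err)
	}
	if err := yaml.Unmarshal(data, &Config); err != nil {
		log.Fatal(err)
	}
}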
@@ -10,25 +10,23 @@
# or implied. See the License for the specific language governing permissions and limitations under the License.

master:
  pulsarurl: "pulsar://localhost:6650"
  adress: localhost
  port: 53100
  pulsarmoniterinterval: 1
  pulsartopic: "monitor-topic"
  etcdrootpath: "by-dev"
  segmentthreshole: 10000
  defaultgrpcport: ":53100"
  etcdendpoints: ["127.0.0.1:12379"]

etcd:
  address: localhost
  port: 2379
  port: 12379
  rootpath: by-dev
  segthreshold: 10000

timesync:
  interval: 10
  interval: 400

storage:
  driver: MinIO
  driver: TIKV
  address: localhost
  port: 0
  accesskey: ab
@@ -37,6 +35,25 @@ storage:
pulsar:
  address: localhost
  port: 6650
  topicnum: 128
  nodenum: 1

reader:
  clientid: 1
  stopflag: -1
  readerqueuesize: 1024
  searchchansize: 10000
  key2segchansize: 10000
  inserttopicstart: 0
  inserttopicend: 128

writer:
  clientid: 1
  stopflag: -2
  readerqueuesize: 1024
  searchbyidchansize: 10000
  insertopicstart: 0
  inserttopicend: 128

proxy:
  timezone: UTC+8
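The reader's inserttopicstart/inserttopicend pair replaces the old in-code split of 1024 channels across numOfQueryNode readers: each query node now consumes only the insert/delete topics in its configured half-open range. A hypothetical Go helper, not part of the patch, computing such ranges when pulsar.topicnum topics are divided evenly over pulsar.nodenum nodes:

package main

import "fmt"

// splitTopics returns, for each of nodeNum query nodes, the half-open
// [inserttopicstart, inserttopicend) range it should consume when topicNum
// insert/delete topics are divided evenly. It mirrors the old in-code split
// (ChannelNum / numOfQueryNode * clientID) that this patch moves into config.
func splitTopics(topicNum, nodeNum int) [][2]int {
	perNode := topicNum / nodeNum
	ranges := make([][2]int, 0, nodeNum)
	for k := 0; k < nodeNum; k++ {
		ranges = append(ranges, [2]int{k * perNode, (k + 1) * perNode})
	}
	return ranges
}

func main() {
	// With topicnum: 128 and nodenum: 2, node 0 reads [0,64) and node 1 reads [64,128).
	for k, r := range splitTopics(128, 2) {
		fmt.Printf("node %d: inserttopicstart=%d inserttopicend=%d\n", k, r[0], r[1])
	}
}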
@@ -1,12 +1,10 @@
package common

import "time"

const (
PULSAR_URL = "pulsar://localhost:6650"
PULSAR_MONITER_INTERVAL = 1 * time.Second
PULSAR_TOPIC = "monitor-topic"
ETCD_ROOT_PATH = "by-dev"
SEGMENT_THRESHOLE = 10000
DEFAULT_GRPC_PORT = ":53100"
)
//const (
// PULSAR_URL = "pulsar://localhost:6650"
// PULSAR_MONITER_INTERVAL = 1 * time.Second
// PULSAR_TOPIC = "monitor-topic"
// ETCD_ROOT_PATH = "by-dev"
// SEGMENT_THRESHOLE = 10000
// DEFAULT_GRPC_PORT = ":53100"
//)

@@ -3,17 +3,22 @@ package informer
import (
"context"
"fmt"
"github.com/czs007/suvlim/conf"
"log"
"strconv"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/pkg/master/common"
"github.com/czs007/suvlim/pkg/master/mock"
)

func NewPulsarClient() PulsarClient {
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: common.PULSAR_URL,
URL: pulsarAddr,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
@@ -32,7 +37,7 @@ type PulsarClient struct {

func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{
Topic: common.PULSAR_TOPIC,
Topic: conf.Config.Master.PulsarTopic,
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})

@@ -1,3 +1,4 @@

package mock

import (

@@ -3,16 +3,21 @@ package mock
import (
"context"
"fmt"
"github.com/czs007/suvlim/conf"
"log"
"strconv"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/pkg/master/common"
)

func FakePulsarProducer() {
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: common.PULSAR_URL,
URL: pulsarAddr,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
@@ -21,7 +26,7 @@ func FakePulsarProducer() {
}

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: common.PULSAR_TOPIC,
Topic: conf.Config.Master.PulsarTopic,
})
testSegmentStats, _ := SegmentMarshal(SegmentStats{
SegementID: uint64(1111),

@@ -32,12 +32,15 @@ func Run() {
}

func SegmentStatsController() {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: conf.Config.Master.EtcdEndPoints,
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath)
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)

ssChan := make(chan mock.SegmentStats, 10)
defer close(ssChan)
@@ -138,7 +141,9 @@ func UpdateSegmentStatus(ss mock.SegmentStats, kvbase kv.Base) error {
}

func GRPCServer(ch chan *messagepb.Mapping) error {
lis, err := net.Listen("tcp", conf.Config.Master.DefaultGRPCPort)
defaultGRPCPort := ":"
defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
return err
}
@@ -193,12 +198,15 @@ func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexP
// }

func CollectionController(ch chan *messagepb.Mapping) {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: conf.Config.Master.EtcdEndPoints,
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath)
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
for collection := range ch {
sID := id.New().Uint64()
cID := id.New().Uint64()
@@ -238,12 +246,15 @@ func CollectionController(ch chan *messagepb.Mapping) {
}

func WriteCollection2Datastore(collection *messagepb.Mapping) error {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:12379"},
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath)
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
sID := id.New().Uint64()
cID := id.New().Uint64()
fieldMetas := []*messagepb.FieldMeta{}
@@ -254,7 +265,7 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error {
time.Now(), fieldMetas, []uint64{sID},
[]string{"default"})
cm := mock.GrpcMarshal(&c)
s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 100, time.Now(), time.Unix(1<<36-1, 0))
s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0))
collectionData, err := mock.Collection2JSON(*cm)
if err != nil {
log.Fatal(err)
@@ -280,12 +291,15 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error {
}

func UpdateCollectionIndex(index *messagepb.IndexParam) error {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: conf.Config.Master.EtcdEndPoints,
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath)
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
collectionName := index.CollectionName
c, err := kvbase.Load("collection/" + collectionName)
if err != nil {
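The same "pulsar://host:port", "host:port" and ":port" strings are assembled by repeated concatenation in the master, informer, mock, reader and writer paths above. A small set of helpers, sketched here and not part of the patch, would express the same thing with fmt.Sprintf; the conf fields are the ones introduced by this commit:

package conf

import "fmt"

// PulsarAddress builds the "pulsar://<address>:<port>" string the patch
// assembles by concatenation from conf.Config.Pulsar.
func PulsarAddress() string {
	return fmt.Sprintf("pulsar://%s:%d", Config.Pulsar.Address, Config.Pulsar.Port)
}

// EtcdAddress builds "<address>:<port>" from conf.Config.Etcd, as used for the
// clientv3 endpoints.
func EtcdAddress() string {
	return fmt.Sprintf("%s:%d", Config.Etcd.Address, Config.Etcd.Port)
}

// MasterGRPCAddress builds the ":<port>" listen address derived from
// conf.Config.Master.Port in GRPCServer.
func MasterGRPCAddress() string {
	return fmt.Sprintf(":%d", Config.Master.Port)
}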
@@ -172,6 +172,7 @@ config_summary()

add_subdirectory( thirdparty )
add_subdirectory( src )
add_subdirectory(unittest)

# Unittest lib
if ( BUILD_UNIT_TEST STREQUAL "ON" )

@@ -88,11 +88,15 @@ ConfigMgr::ConfigMgr() {
"localhost", nullptr, nullptr)},
{"pulsar.port", CreateIntegerConfig("pulsar.port", false, 0, 65535, &config.pulsar.port.value,
6650, nullptr, nullptr)},
{"pulsar.topicnum", CreateIntegerConfig("pulsar.topicnum", false, 0, 1024, &config.pulsar.topicnum.value,
1024, nullptr, nullptr)},
{"pulsar.nodenum", CreateIntegerConfig("pulsar.nodenum", false, 0, 1024, &config.pulsar.nodenum.value,
1, nullptr, nullptr)},
/* master */
{"master.address", CreateStringConfig("master.address", false, &config.master.address.value,
"localhost", nullptr, nullptr)},
{"master.port", CreateIntegerConfig("master.port", false, 0, 65535, &config.master.port.value,
6000, nullptr, nullptr)},
53100, nullptr, nullptr)},

/* etcd */
{"etcd.address", CreateStringConfig("etcd.address", false, &config.etcd.address.value, "localhost", nullptr,

@@ -75,11 +75,13 @@ struct ServerConfig {
struct Pulsar{
String address{"localhost"};
Integer port{6650};
Integer topicnum{1024};
Integer nodenum{1};
}pulsar;

struct Master{
String address{"localhost"};
Integer port{6000};
Integer port{53100};
}master;

struct Etcd{

@@ -35,6 +35,7 @@ Status MsgClientV2::Init(const std::string &insert_delete,
time_sync_producer_ = std::make_shared<MsgProducer>(pulsar_client, time_sync);

for (auto i = 0; i < mut_parallelism_; i++) {
// std::string topic = insert_delete + "-" + std::to_string(i);
paralle_mut_producers_.emplace_back(std::make_shared<MsgProducer>(pulsar_client,
insert_delete,
producerConfiguration));
@@ -162,8 +163,8 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
auto row_count = request.rows_data_size();
auto stats = std::vector<Status>(ParallelNum);
std::atomic_uint64_t msg_sended = 0;

#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id, msg_sended), num_threads(ParallelNum)
auto topic_num = config.pulsar.topicnum();
#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id, msg_sended, topic_num), num_threads(ParallelNum)
for (auto i = 0; i < row_count; i++) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
int this_thread = omp_get_thread_num();
@@ -174,7 +175,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_partition_tag(request.partition_tag());
uint64_t uid = request.entity_id_array(i);
auto channel_id = makeHash(&uid, sizeof(uint64_t)) % 1024;
auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topic_num;
try {
mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
printf("%ld \n", mut_msg.segment_id());
@@ -190,6 +191,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback);
}
catch (const std::exception &e) {
msg_sended += 1;
stats[this_thread] = Status(DB_ERROR, e.what());
}
}
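The insert path now routes each entity to one of pulsar.topicnum channels by hashing its id instead of a hard-coded 1024. An illustrative Go version of that routing step; the repository's makeHash is a C++ helper whose algorithm is not shown in this patch, so FNV-1a is used purely as a stand-in:

package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// channelID hashes an entity id onto one of topicNum insert/delete channels,
// the same role played by makeHash(&uid, sizeof(uint64_t)) % topic_num in the
// C++ writer path. FNV-1a here is only a stand-in for the real hash.
func channelID(uid uint64, topicNum uint32) uint32 {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], uid)
	h := fnv.New32a()
	h.Write(buf[:])
	return h.Sum32() % topicNum
}

func main() {
	// With topicnum: 128, consecutive ids spread across the channel range.
	for uid := uint64(0); uid < 4; uid++ {
		fmt.Println(uid, "->", channelID(uid, 128))
	}
}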
@@ -46,7 +46,6 @@ namespace message_client {

Result MsgProducer::send(milvus::grpc::InsertOrDeleteMsg &msg) {
int32_t channel_id = makeHash(std::to_string(msg.uid())) % 1024;
// std::cout << "partition id := " << channel_id <<std::endl;
msg.set_channel_id(channel_id);
auto msg_str = msg.SerializeAsString();
return send(msg_str, msg.uid());

@@ -52,8 +52,8 @@ MetaWrapper &MetaWrapper::GetInstance() {

Status MetaWrapper::Init() {
try {
etcd_root_path_ = config.etcd.rootpath();
segment_path_ = (boost::filesystem::path(etcd_root_path_) / "segment/").string();
etcd_root_path_ = config.etcd.rootpath() + "/";
segment_path_ = (boost::filesystem::path(etcd_root_path_) / "segment/").string();
collection_path_ = (boost::filesystem::path(etcd_root_path_) / "collection/").string();

auto master_addr = config.master.address() + ":" + std::to_string(config.master.port());
@@ -64,10 +64,10 @@ Status MetaWrapper::Init() {

// init etcd watcher
auto f = [&](const etcdserverpb::WatchResponse &res) {
UpdateMeta(res);
UpdateMeta(res);
};
watcher_ = std::make_shared<milvus::master::Watcher>(etcd_addr, segment_path_, f, true);
SyncMeta();
return SyncMeta();
}
catch (const std::exception &e) {
return Status(DB_ERROR, "Init meta error");
@@ -115,11 +115,10 @@ uint64_t MetaWrapper::AskSegmentId(const std::string &collection_name, uint64_t
uint64_t open_ts = segment_info.open_timestamp();
uint64_t close_ts = segment_info.close_timestamp();
if (channel_id >= segment_info.channel_start() && channel_id < segment_info.channel_end()
&& timestamp >= open_ts << 18 && timestamp < close_ts << 18
&& segment_info.collection_name() == collection_name) {
&& timestamp >= (open_ts << 18) && timestamp < (close_ts << 18)
&& std::string(segment_info.collection_name()) == collection_name) {
return segment_info.segment_id();
}
return 0;
}
throw std::runtime_error("Can't find eligible segment");
}
@@ -166,8 +165,8 @@ Status MetaWrapper::SyncMeta() {
int64_t MetaWrapper::CountCollection(const std::string &collection_name) {
uint64_t count = 0;
// TODO: index to speed up
for (const auto& segment_info : segment_infos_){
if (segment_info.second.collection_name() == collection_name){
for (const auto &segment_info : segment_infos_) {
if (segment_info.second.collection_name() == collection_name) {
count += segment_info.second.rows();
}
}
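AskSegmentId matches a message against a segment by channel range and by time window, and the segment's open/close timestamps are shifted left by 18 bits before the comparison. This suggests the message timestamp carries an 18-bit logical counter in its low bits; that reading is an assumption drawn from the shifts in this hunk (and from the msg.Timestamp>>18 print elsewhere in the patch), not stated explicitly anywhere. A small Go sketch of the check:

package main

import "fmt"

// inSegment mirrors the eligibility test in MetaWrapper::AskSegmentId: the
// channel must fall in [channelStart, channelEnd) and the message timestamp in
// [openTS<<18, closeTS<<18). The 18-bit shift is assumed to separate a
// physical-time part (high bits) from a logical counter (low bits).
func inSegment(channelID, channelStart, channelEnd int32, ts, openTS, closeTS uint64) bool {
	return channelID >= channelStart && channelID < channelEnd &&
		ts >= openTS<<18 && ts < closeTS<<18
}

func main() {
	// A timestamp built from physical part 100 and logical counter 5.
	ts := uint64(100)<<18 | 5
	fmt.Println(inSegment(3, 0, 128, ts, 100, 200)) // true
}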
@@ -7,8 +7,10 @@ set( GRPC_SERVICE_FILES

set(unittest_srcs
unittest_entry.cpp
# consumer_test.cpp producer_test.cpp
get_result_test.cpp)
consumer_test.cpp
producer_test.cpp
get_result_test.cpp
test_pulsar.cpp)

add_executable(test_pulsar ${unittest_srcs} ${GRPC_SERVICE_FILES})
@@ -32,6 +34,7 @@ target_link_libraries(test_pulsar
grpc++
pthread
stdc++
config
)

install(TARGETS test_pulsar DESTINATION unittest)

@@ -81,7 +81,7 @@ TEST(CLIENT_CPP, GetResult) {
auto status_send = client_v2.SendQueryMessage(request, 10, query_id);

milvus::grpc::QueryResult result;
auto status = client_v2.GetQueryResult(query_id, result);
auto status = client_v2.GetQueryResult(query_id, &result);

std::cout << result.client_id() << std::endl;
for (int k = 0; k < result.distances_size(); ++k) {

@@ -1,69 +1,36 @@
#include "thread"
#include "pulsar/Client.h"
#include <gtest/gtest.h>
#include <omp.h>
#include "message_client/Producer.h"
#include "grpc/message.pb.h"

using namespace pulsar;
using MyData = milvus::grpc::PMessage;

static const std::string exampleSchema =
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
"\"fields\":[{\"name\":\"id\",\"type\":\"string\"}, {\"name\":\"reason\",\"type\":\"string\"}]}";

int consumer() {

Client client("pulsar://localhost:6650");

ConsumerConfiguration consumerConf;
Consumer consumer;
consumerConf.setSchema(SchemaInfo(PROTOBUF, "Protobuf", exampleSchema));
Result result = client.subscribe("topic-proto", "sub-2", consumerConf, consumer);

if (result != ResultOk) {
std::cout << "Failed to subscribe: " << result << std::endl;
return -1;
}

Message msg;
for (int i = 0; i < 10; i++) {
consumer.receive(msg);
MyData data;
data.ParseFromString(msg.getDataAsString());
std::cout << " Received: " << msg
<< " with payload '" << data.id() << " " << data.reason() << "'"
<< std::endl;

consumer.acknowledge(msg);
}

client.close();
return 0;
}

int main() {
Client client("pulsar://localhost:6650");

Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(PROTOBUF, "Protobuf", exampleSchema));
Result result = client.createProducer("pro", producerConf, producer);

if (result != ResultOk) {
std::cout << "Error creating producer: " << result << std::endl;
return -1;
}

// std::thread t(consumer);
// Publish 10 messages to the topic
for (int i = 0; i < 1; i++) {
auto data = MyData();
auto a = new milvus::grpc::A();
a->set_a(9999);
data.set_allocated_a(a);
data.set_id("999");
data.set_reason("*****test****");
Message msg = MessageBuilder().setContent(data.SerializeAsString()).build();
Result res = producer.send(msg);
std::cout << " Message sent: " << res << std::endl;
}
client.close();
// t.join();
TEST(CLIENT_CPP, MultiTopic) {
auto row_count = 10000;
int ParallelNum = 128;
uint64_t timestamp = 0;
// TODO: Get the segment from master
int64_t segment = 0;
auto stats = std::vector<pulsar::Result>(ParallelNum);
std::chrono::milliseconds start = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch());
std::string blob = new char[2000];
milvus::grpc::RowData data;
data.set_blob(blob);
#pragma omp parallel for default(none), shared(row_count, timestamp, segment, stats, data), num_threads(ParallelNum)
for (auto i = 0; i < row_count; i++) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
int this_thread = omp_get_thread_num();
mut_msg.set_op(milvus::grpc::OpType::INSERT);
mut_msg.set_uid(i);
mut_msg.set_client_id(0);
mut_msg.set_timestamp(timestamp);
mut_msg.set_collection_name("collection0");
mut_msg.set_partition_tag("partition0");
mut_msg.set_segment_id(segment);
// mut_msg.mutable_rows_data()->CopyFrom(&data);
// auto result = paralle_mut_producers_[this_thread]->send(mut_msg);
// if (result != pulsar::ResultOk) {
// stats[this_thread] = result;
// }
}
}

@@ -1,26 +1,16 @@
package main

import (
"github.com/czs007/suvlim/conf"
reader "github.com/czs007/suvlim/reader/read_node"
"sync"
"strconv"
)

func main() {
pulsarURL := "pulsar://localhost:6650"

numOfQueryNode := 2

go reader.StartQueryNode(pulsarURL, numOfQueryNode, 0)
reader.StartQueryNode(pulsarURL, numOfQueryNode, 1)

pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
reader.StartQueryNode(pulsarAddr)
}

func main2() {
wg := sync.WaitGroup{}
//ctx, cancel := context.WithCancel(context.Background())
//defer cancel()
wg.Add(1)
reader.StartQueryNode2()
wg.Wait()
}

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/conf"
masterPb "github.com/czs007/suvlim/pkg/master/grpc/master"
msgpb "github.com/czs007/suvlim/pkg/master/grpc/message"
timesync "github.com/czs007/suvlim/timesync"
@@ -35,7 +36,7 @@ type MessageClient struct {
timestampBatchEnd uint64
batchIDLen int

//
//client id
MessageClientID int
}

@@ -151,11 +152,10 @@ func (mc *MessageClient) createClient(url string) pulsar.Client {
return client
}

func (mc *MessageClient) InitClient(url string, numOfQueryNode int) {
const ChannelNum = 1024

func (mc *MessageClient) InitClient(url string) {
//create client
mc.client = mc.createClient(url)
mc.MessageClientID = conf.Config.Reader.ClientId

//create producer
mc.searchResultProducer = mc.creatProducer("SearchResult")
@@ -166,25 +166,33 @@ func (mc *MessageClient) InitClient(url string, numOfQueryNode int) {
mc.key2segConsumer = mc.createConsumer("Key2Seg")

// init channel
mc.searchChan = make(chan *msgpb.SearchMsg, 10000)
mc.key2SegChan = make(chan *msgpb.Key2SegMsg, 10000)
mc.searchChan = make(chan *msgpb.SearchMsg, conf.Config.Reader.SearchChanSize)
mc.key2SegChan = make(chan *msgpb.Key2SegMsg, conf.Config.Reader.Key2SegChanSize)

mc.InsertOrDeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 0)
mc.Key2SegMsg = make([]*msgpb.Key2SegMsg, 0)

//init timesync
URL := "pulsar://localhost:6650"
timeSyncTopic := "TimeSync"
timeSyncSubName := "reader" + strconv.Itoa(mc.MessageClientID)
readTopics := make([]string, 0, ChannelNum)
for i := ChannelNum / numOfQueryNode * mc.MessageClientID; i < ChannelNum/numOfQueryNode*(mc.MessageClientID+1); i++ {
readTopics := make([]string, 0)
for i := conf.Config.Reader.InsertTopicStart; i < conf.Config.Reader.InsertTopicEnd; i++ {
str := "InsertOrDelete-partition-"
str = str + strconv.Itoa(i)
readTopics = append(readTopics, str)
}

readSubName := "reader" + strconv.Itoa(mc.MessageClientID)
// TODO::read proxy conf from config.yaml
proxyIdList := []int64{0}
timeSync, err := timesync.NewReaderTimeSync(URL, timeSyncTopic, timeSyncSubName, readTopics, readSubName, proxyIdList, 400, -2, timesync.WithReaderQueueSize(ChannelNum))
readerQueueSize := timesync.WithReaderQueueSize(conf.Config.Reader.ReaderQueueSize)
timeSync, err := timesync.NewReaderTimeSync(timeSyncTopic,
timeSyncSubName,
readTopics,
readSubName,
proxyIdList,
conf.Config.Reader.StopFlag,
readerQueueSize)
if err != nil {
log.Fatal(err)
}
@@ -104,9 +104,9 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
}
printSegmentStruct(segment)

if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
return
}
//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
// return
//}

collection := node.GetCollectionByID(segment.CollectionID)
if collection != nil {
@@ -147,9 +147,9 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
}
printSegmentStruct(segment)

if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
return
}
//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
// return
//}

seg, err := node.GetSegmentBySegmentID(int64(segment.SegmentID)) // todo change to uint64
if seg != nil {
@@ -269,7 +269,7 @@ func (node *QueryNode) InitFromMeta() error {
}

func (node *QueryNode) RunMetaService(ctx context.Context, wg *sync.WaitGroup) {
node.InitFromMeta()
//node.InitFromMeta()
metaChan := node.kvBase.WatchWithPrefix("")
for {
select {

@@ -16,6 +16,7 @@ import "C"
import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"log"
"sort"
"sync"
@@ -225,6 +226,8 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
// TODO: get timeRange from message client
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
assert.NotEqual(nil, 0, timeRange.timestampMin)
assert.NotEqual(nil, 0, timeRange.timestampMax)

if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
continue
@@ -232,12 +235,12 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {

node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
fmt.Println("MessagesPreprocess Done")
//fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
node.PreInsertAndDelete()
fmt.Println("PreInsertAndDelete Done")
//fmt.Println("PreInsertAndDelete Done")
node.DoInsertAndDelete()
fmt.Println("DoInsertAndDelete Done")
//fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
}
wg.Done()
@@ -337,7 +340,6 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr
atomic.AddInt32(&node.deletePreprocessData.count, 1)
}
} else {
fmt.Println("msg timestamp:= ", msg.Timestamp>>18)
node.buffer.InsertDeleteBuffer = append(node.buffer.InsertDeleteBuffer, msg)
node.buffer.validInsertDeleteBuffer = append(node.buffer.validInsertDeleteBuffer, true)
}

@@ -2,48 +2,28 @@ package reader

import (
"context"
"log"
"sync"

"github.com/czs007/suvlim/reader/message_client"
"sync"
)

func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) {
if messageClientID >= numOfQueryNode {
log.Printf("Illegal channel id")
return
}

mc := message_client.MessageClient{
MessageClientID: messageClientID,
}
mc.InitClient(pulsarURL, numOfQueryNode)
func StartQueryNode(pulsarURL string) {
mc := message_client.MessageClient{}
mc.InitClient(pulsarURL)

mc.ReceiveMessage()
qn := CreateQueryNode(0, 0, &mc)
qn.InitQueryNodeCollection()
ctx := context.Background()

// Segments Services
// go qn.SegmentManagementService()
//go qn.SegmentManagementService()
go qn.SegmentStatisticService()

wg := sync.WaitGroup{}
wg.Add(2)
qn.InitFromMeta()
wg.Add(3)
go qn.RunMetaService(ctx, &wg)
go qn.RunInsertDelete(&wg)
go qn.RunSearch(&wg)
wg.Wait()
qn.Close()
}

func StartQueryNode2() {
ctx := context.Background()
qn := CreateQueryNode(0, 0, nil)
//qn.InitQueryNodeCollection()
wg := sync.WaitGroup{}
wg.Add(1)
//go qn.RunInsertDelete(&wg)
//go qn.RunSearch(&wg)
go qn.RunMetaService(ctx, &wg)
wg.Wait()
qn.Close()
}
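StartQueryNode no longer takes numOfQueryNode and messageClientID; each query node process reads its identity and topic range from the reader section of config.yaml. A hypothetical launcher that starts a single node this way; it reuses the conf and read_node packages named in the patch, and the idea of one config file per node is a deployment assumption, not something the patch prescribes:

package main

import (
	"fmt"

	"github.com/czs007/suvlim/conf"
	reader "github.com/czs007/suvlim/reader/read_node"
)

// Each query node is now its own process: clientid, inserttopicstart and
// inserttopicend in config.yaml decide which slice of the insert topics it
// consumes, so running two nodes means running this binary twice with two
// different config files.
func main() {
	pulsarAddr := fmt.Sprintf("pulsar://%s:%d",
		conf.Config.Pulsar.Address, conf.Config.Pulsar.Port)
	reader.StartQueryNode(pulsarAddr)
}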
@@ -1,15 +1,22 @@
package reader

import (
"github.com/czs007/suvlim/conf"
"strconv"
"testing"
)

func TestReader_startQueryNode(t *testing.T) {
//pulsarURL := "pulsar://localhost:6650"
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
println(pulsarAddr)
StartQueryNode(pulsarAddr)

pulsarURL := "pulsar://localhost:6650"
//go StartQueryNode(pulsarAddr, 0)
//StartQueryNode(pulsarAddr, 1)

numOfQueryNode := 2

go StartQueryNode(pulsarURL, numOfQueryNode, 0)
StartQueryNode(pulsarURL, numOfQueryNode, 1)
}

@@ -34,7 +34,7 @@ func (node *QueryNode) SegmentsManagement() {
}

func (node *QueryNode) SegmentManagementService() {
sleepMillisecondTime := 200
sleepMillisecondTime := 1000
fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)

@@ -354,10 +354,11 @@ func TestSegment_GetMemSize(t *testing.T) {

func TestSegment_RealSchemaTest(t *testing.T) {
// 1. Construct node, collection, partition and segment
var schemaString = "id: 6873737669791618215\nname: \"collection0\"\nschema: \u003c\n " +
"field_metas: \u003c\n field_name: \"age\"\n type: INT32\n dim: 1\n \u003e\n " +
"field_metas: \u003c\n field_name: \"field_1\"\n type: VECTOR_FLOAT\n dim: 16\n \u003e\n" +
"\u003e\ncreate_time: 1600416765\nsegment_ids: 6873737669791618215\npartition_tags: \"default\"\n"
//var schemaString = "id: 6873737669791618215\nname: \"collection0\"\nschema: \u003c\n " +
// "field_metas: \u003c\n field_name: \"age\"\n type: INT32\n dim: 1\n \u003e\n " +
// "field_metas: \u003c\n field_name: \"field_1\"\n type: VECTOR_FLOAT\n dim: 16\n \u003e\n" +
// "\u003e\ncreate_time: 1600416765\nsegment_ids: 6873737669791618215\npartition_tags: \"default\"\n"
var schemaString = "id: 6875229265736357360\nname: \"collection0\"\nschema: \u003c\n field_metas: \u003c\n field_name: \"field_3\"\n type: INT32\n \u003e\n field_metas: \u003c\n field_name: \"field_vec\"\n type: VECTOR_FLOAT\n \u003e\n\u003e\ncreate_time: 1600764055\nsegment_ids: 6875229265736357360\npartition_tags: \"default\"\n"
node := NewQueryNode(0, 0)
var collection = node.NewCollection(0, "collection0", schemaString)
var partition = collection.NewPartition("partition0")

@@ -18,27 +18,30 @@ int main(int argc , char**argv) {
client.Connect(connect_param);

milvus::Status stat;
const std::string collectin_name = "collection1";
const std::string collectin_name = "collection0";

// Create
milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
milvus::FieldPtr field_ptr2 = std::make_shared<milvus::Field>();
// milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
// milvus::FieldPtr field_ptr2 = std::make_shared<milvus::Field>();
milvus::FieldPtr field_ptr3 = std::make_shared<milvus::Field>();
milvus::FieldPtr field_ptr4 = std::make_shared<milvus::Field>();

field_ptr1->field_name = "field_1";
field_ptr1->field_type = milvus::DataType::INT64;

field_ptr2->field_name = "field_2";
field_ptr2->field_type = milvus::DataType::FLOAT;
// field_ptr1->field_name = "field_1";
// field_ptr1->field_type = milvus::DataType::INT64;
//
// field_ptr2->field_name = "field_2";
// field_ptr2->field_type = milvus::DataType::FLOAT;

field_ptr3->field_name = "field_3";
field_ptr3->field_type = milvus::DataType::INT32;
field_ptr3->dim = 1;

field_ptr4->field_name = "field_vec";
field_ptr4->field_type = milvus::DataType::VECTOR_FLOAT;
field_ptr4->dim = 16;

milvus::Mapping mapping = {collectin_name, {field_ptr1, field_ptr2, field_ptr3, field_ptr4}};
// milvus::Mapping mapping = {collectin_name, {field_ptr1, field_ptr2, field_ptr3, field_ptr4}};
milvus::Mapping mapping = {collectin_name, {field_ptr3, field_ptr4}};

stat = client.CreateCollection(mapping, "test_extra_params");

@@ -50,7 +50,7 @@ int
main(int argc, char* argv[]) {
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
if (!parameters.is_valid){
return 0;
return 0;
}

auto client = milvus::ConnectionImpl();
@@ -66,7 +66,7 @@ main(int argc, char* argv[]) {

milvus_sdk::TimeRecorder insert("insert");
for (int j = 0; j < LOOP; ++j) {
auto status = client.Insert("collection1", "tag01", data, ids_array);
auto status = client.Insert("collection0", "tag01", data, ids_array);
if (!status.ok()){
return -1;
}

@@ -54,7 +54,7 @@ int main(int argc , char**argv) {
nlohmann::json vector_param_json;
vector_param_json["num_queries"] = 1;
vector_param_json["topK"] = TOP_K;
vector_param_json["field_name"] = "fakevec";
vector_param_json["field_name"] = "field_vec";
std::string vector_param_json_string = vector_param_json.dump();

vectorParam.json_param = vector_param_json_string;
@@ -63,7 +63,7 @@ int main(int argc , char**argv) {
milvus::TopKQueryResult result;

milvus_sdk::TimeRecorder test_search("search");
auto status = client.Search("collection1", partition_list, "dsl", vectorParam, result);
auto status = client.Search("collection0", partition_list, "dsl", vectorParam, result);

return 0;
}

@@ -4,10 +4,12 @@ import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/conf"
pb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/golang/protobuf/proto"
"log"
"sort"
"strconv"
"sync"
)

@@ -39,10 +41,9 @@ type ReaderTimeSyncCfg struct {
insertOrDeleteChan chan *pb.InsertOrDeleteMsg //output insert or delete msg

readStopFlagClientId int64
interval int
interval int64
proxyIdList []int64
readerQueueSize int
TestData int

revTimesyncFromReader map[uint64]int

@@ -62,16 +63,20 @@ func toMillisecond(ts *pb.TimeSyncMsg) int {
}

func NewReaderTimeSync(
pulsarAddr string,
timeSyncTopic string,
timeSyncSubName string,
readTopics []string,
readSubName string,
proxyIdList []int64,
interval int,
readStopFlagClientId int64,
opts ...ReaderTimeSyncOption,
) (ReaderTimeSync, error) {
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
interval := int64(conf.Config.Timesync.Interval)

//check if proxyId has duplication
if len(proxyIdList) == 0 {
return nil, fmt.Errorf("proxy id list is empty")
@@ -189,7 +194,7 @@ func (r *ReaderTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) []*pb.TimeSyncMs
curIdx := len(ts) - 1 - i
preIdx := len(ts) - i
timeGap := toMillisecond(ts[curIdx]) - toMillisecond(ts[preIdx])
if timeGap >= (r.interval/2) || timeGap <= (-r.interval/2) {
if int64(timeGap) >= (r.interval/2) || int64(timeGap) <= (-r.interval/2) {
ts = ts[preIdx:]
return ts
}

@@ -206,14 +206,12 @@ func TestAlignTimeSync5(t *testing.T) {
}

func TestNewReaderTimeSync(t *testing.T) {
r, err := NewReaderTimeSync(pulsarAddr,
timeSyncTopic,
r, err := NewReaderTimeSync(timeSyncTopic,
timeSyncSubName,
[]string{readerTopic1, readerTopic2, readerTopic3, readerTopic4},
readerSubName,
[]int64{2, 1},
interval,
readStopFlag,
WithReaderQueueSize(8),
)
if err != nil {
@@ -290,14 +288,12 @@ func TestPulsarClient(t *testing.T) {
}

func TestReaderTimesync(t *testing.T) {
r, err := NewReaderTimeSync(pulsarAddr,
timeSyncTopic,
r, err := NewReaderTimeSync(timeSyncTopic,
timeSyncSubName,
[]string{readerTopic1, readerTopic2, readerTopic3, readerTopic4},
readerSubName,
[]int64{2, 1},
interval,
readStopFlag,
WithReaderQueueSize(1024),
)
if err != nil {
@@ -392,25 +388,21 @@ func TestReaderTimesync2(t *testing.T) {
go startProxy(pt1, 1, pr1, 1, pr2, 2, 2*time.Second, t)
go startProxy(pt2, 2, pr3, 3, pr4, 4, 2*time.Second, t)

r1, _ := NewReaderTimeSync(pulsarAddr,
timeSyncTopic2,
r1, _ := NewReaderTimeSync(timeSyncTopic2,
timeSyncSubName1,
[]string{readerTopic12, readerTopic22, readerTopic32, readerTopic42},
readerSubName1,
[]int64{2, 1},
interval,
readStopFlag1,
WithReaderQueueSize(1024),
)

r2, _ := NewReaderTimeSync(pulsarAddr,
timeSyncTopic2,
r2, _ := NewReaderTimeSync(timeSyncTopic2,
timeSyncSubName2,
[]string{readerTopic12, readerTopic22, readerTopic32, readerTopic42},
readerSubName2,
[]int64{2, 1},
interval,
readStopFlag2,
WithReaderQueueSize(1024),
)

@@ -514,14 +506,12 @@ func TestReaderTimesync3(t *testing.T) {
}
}()

r, err := NewReaderTimeSync(pulsarAddr,
timeSyncTopic3,
r, err := NewReaderTimeSync(timeSyncTopic3,
timeSyncSubName3,
[]string{readerTopic13, readerTopic23, readerTopic33, readerTopic43},
readerSubName3,
[]int64{1},
interval,
readStopFlag3,
WithReaderQueueSize(1024))
if err != nil {
t.Fatal(err)

@@ -17,7 +17,6 @@ func main() {
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
println(pulsarAddr)
mc := message_client.MessageClient{}

mc.InitClient(pulsarAddr)
@@ -39,7 +38,6 @@ func main() {
}

//TODO:: start a gorouter for searchById

for {
if ctx.Err() != nil {
break

@@ -3,6 +3,7 @@ package message_client
import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/conf"
msgpb "github.com/czs007/suvlim/pkg/master/grpc/message"
timesync "github.com/czs007/suvlim/timesync"
"github.com/golang/protobuf/proto"
@@ -28,6 +29,9 @@ type MessageClient struct {
timestampBatchStart uint64
timestampBatchEnd uint64
batchIDLen int

//client id
MessageClientID int
}

func (mc *MessageClient) Send(ctx context.Context, msg msgpb.Key2SegMsg) {
@@ -107,6 +111,7 @@ func (mc *MessageClient) createClient(url string) pulsar.Client {
func (mc *MessageClient) InitClient(url string) {
//create client
mc.client = mc.createClient(url)
mc.MessageClientID = conf.Config.Writer.ClientId

//create producer
mc.key2segProducer = mc.creatProducer("Key2Seg")
@@ -115,25 +120,32 @@ func (mc *MessageClient) InitClient(url string) {
mc.searchByIdConsumer = mc.createConsumer("SearchById")

//init channel
mc.searchByIdChan = make(chan *msgpb.EntityIdentity, 10000)
mc.searchByIdChan = make(chan *msgpb.EntityIdentity, conf.Config.Writer.SearchByIdChanSize)

//init msg slice
mc.InsertMsg = make([]*msgpb.InsertOrDeleteMsg, 0)
mc.DeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 0)

//init timesync
URL := "pulsar://localhost:6650"
timeSyncTopic := "TimeSync"
timeSyncSubName := "writer"
readTopics := make([]string, 0, 1024)
for i := 0; i < 1024; i++ {
timeSyncSubName := "writer" + strconv.Itoa(mc.MessageClientID)
readTopics := make([]string, 0)
for i := conf.Config.Writer.InsertTopicStart; i < conf.Config.Writer.InsertTopicEnd; i++ {
str := "InsertOrDelete-partition-"
str = str + strconv.Itoa(i)
readTopics = append(readTopics, str)
}
readSubName := "writer"
readSubName := "writer" + strconv.Itoa(mc.MessageClientID)
// TODO::read proxy conf from config.yaml
proxyIdList := []int64{0}
timeSync, err := timesync.NewReaderTimeSync(URL, timeSyncTopic, timeSyncSubName, readTopics, readSubName, proxyIdList, 400, -1, timesync.WithReaderQueueSize(1024))
readerQueueSize := timesync.WithReaderQueueSize(conf.Config.Reader.ReaderQueueSize)
timeSync, err := timesync.NewReaderTimeSync(timeSyncTopic,
timeSyncSubName,
readTopics,
readSubName,
proxyIdList,
conf.Config.Writer.StopFlag,
readerQueueSize)
if err != nil {
log.Fatal(err)
}

@@ -78,6 +78,7 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOr
prefixKeys = append(prefixKeys, []byte(prefixKey))
timeStamps = append(timeStamps, uint64(data[i].Timestamp))
segmentString, _ := (*wn.KvStore).GetSegments(ctx, []byte(prefixKey), uint64(data[i].Timestamp))

var segmentIds []int64
for _, str := range segmentString {
id, err := strconv.ParseInt(str, 10, 64)