From e704667bc55e93c3c990e814f69e948bf9020dd9 Mon Sep 17 00:00:00 2001 From: rain Date: Tue, 8 Sep 2020 16:57:48 +0800 Subject: [PATCH] Finish all feature and update the README of master Signed-off-by: rain --- cmd/master.go | 7 +- pkg/master/README.md | 10 +- pkg/master/common/config.go | 2 + pkg/master/grpc/master.pb.go | 235 ++++++++++++++++++ pkg/master/grpc/master.proto | 20 ++ pkg/master/kv/etcd_kv.go | 6 + pkg/master/kv/kv.go | 3 + pkg/master/mock/collection.go | 6 +- pkg/master/mock/grpc_client.go | 59 +++++ pkg/master/mock/grpc_client_test.go | 15 ++ pkg/master/mock/segment.go | 6 +- pkg/master/server.go | 73 +++++- proxy/src/message_client/ClientV2.cpp | 15 +- proxy/src/message_client/ClientV2.h | 4 +- proxy/src/server/delivery/ReqScheduler.cpp | 26 +- proxy/src/server/delivery/ReqScheduler.h | 13 +- .../delivery/request/DeleteEntityByIDReq.cpp | 2 +- .../delivery/request/DeleteEntityByIDReq.h | 5 +- .../src/server/delivery/request/InsertReq.cpp | 2 +- proxy/src/server/grpc_impl/GrpcServer.cpp | 3 +- proxy/src/server/tso/TSO.cpp | 4 +- 21 files changed, 469 insertions(+), 47 deletions(-) create mode 100644 pkg/master/grpc/master.pb.go create mode 100644 pkg/master/grpc/master.proto create mode 100644 pkg/master/mock/grpc_client.go create mode 100644 pkg/master/mock/grpc_client_test.go diff --git a/cmd/master.go b/cmd/master.go index 8af21a7b58..ebfaee66ac 100644 --- a/cmd/master.go +++ b/cmd/master.go @@ -1,8 +1,6 @@ package main -import ( - "github.com/czs007/suvlim/pkg/master" -) +import "github.com/czs007/suvlim/pkg/master" // func main() { // ctx, cancel := context.WithCancel(context.Background()) @@ -22,6 +20,7 @@ func init() { // go mock.FakePulsarProducer() } func main() { + master.Run() //master.SegmentStatsController() - master.CollectionController() + //master.CollectionController() } diff --git a/pkg/master/README.md b/pkg/master/README.md index 9daa0ed229..01c1caeb84 100644 --- a/pkg/master/README.md +++ b/pkg/master/README.md @@ -1,9 +1,13 @@ # How to start a master ## Requirements - - +### Start a etcdv3 +``` +./etcd -listen-peer-urls=http://192.168.1.10:12380 -advertise-client-urls=http://192.168.1.10:12379 -listen-client-urls http://0.0.0.0:12379,http://0.0.0.0:14001 -initial-advertise-peer-urls=http://192.168.1.10:12380 +``` ## Start from code - +``` +go run cmd/master.go +``` ## Start with docker diff --git a/pkg/master/common/config.go b/pkg/master/common/config.go index 774073215c..219d561d4c 100644 --- a/pkg/master/common/config.go +++ b/pkg/master/common/config.go @@ -7,4 +7,6 @@ const ( PULSAR_MONITER_INTERVAL = 1 * time.Second PULSAR_TOPIC = "monitor-topic" ETCD_ROOT_PATH = "by-dev" + SEGMENT_THRESHOLE = 10000 + DEFAULT_GRPC_PORT = ":53100" ) diff --git a/pkg/master/grpc/master.pb.go b/pkg/master/grpc/master.pb.go new file mode 100644 index 0000000000..4dcbdaa3a9 --- /dev/null +++ b/pkg/master/grpc/master.pb.go @@ -0,0 +1,235 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: master.proto + +//option go_package = "github.com/czs007/suvilm/pkg/master/grpc"; + +package masterpb + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type CreateCollectionRequest struct { + CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName,proto3" json:"collection_name,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CreateCollectionRequest) Reset() { *m = CreateCollectionRequest{} } +func (m *CreateCollectionRequest) String() string { return proto.CompactTextString(m) } +func (*CreateCollectionRequest) ProtoMessage() {} +func (*CreateCollectionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f9c348dec43a6705, []int{0} +} + +func (m *CreateCollectionRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CreateCollectionRequest.Unmarshal(m, b) +} +func (m *CreateCollectionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CreateCollectionRequest.Marshal(b, m, deterministic) +} +func (m *CreateCollectionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CreateCollectionRequest.Merge(m, src) +} +func (m *CreateCollectionRequest) XXX_Size() int { + return xxx_messageInfo_CreateCollectionRequest.Size(m) +} +func (m *CreateCollectionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CreateCollectionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CreateCollectionRequest proto.InternalMessageInfo + +func (m *CreateCollectionRequest) GetCollectionName() string { + if m != nil { + return m.CollectionName + } + return "" +} + +type CreateCollectionResponse struct { + CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName,proto3" json:"collection_name,omitempty"` + CollectionId uint64 `protobuf:"varint,2,opt,name=collection_id,json=collectionId,proto3" json:"collection_id,omitempty"` + SegmentIds []uint64 `protobuf:"varint,3,rep,packed,name=segment_ids,json=segmentIds,proto3" json:"segment_ids,omitempty"` + PartitionTags []string `protobuf:"bytes,4,rep,name=partition_tags,json=partitionTags,proto3" json:"partition_tags,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CreateCollectionResponse) Reset() { *m = CreateCollectionResponse{} } +func (m *CreateCollectionResponse) String() string { return proto.CompactTextString(m) } +func (*CreateCollectionResponse) ProtoMessage() {} +func (*CreateCollectionResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f9c348dec43a6705, []int{1} +} + +func (m *CreateCollectionResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CreateCollectionResponse.Unmarshal(m, b) +} +func (m *CreateCollectionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CreateCollectionResponse.Marshal(b, m, deterministic) +} +func (m *CreateCollectionResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_CreateCollectionResponse.Merge(m, src) +} +func (m *CreateCollectionResponse) XXX_Size() int { + return xxx_messageInfo_CreateCollectionResponse.Size(m) +} +func (m *CreateCollectionResponse) XXX_DiscardUnknown() { + xxx_messageInfo_CreateCollectionResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_CreateCollectionResponse proto.InternalMessageInfo + +func (m *CreateCollectionResponse) GetCollectionName() string { + if m != nil { + return m.CollectionName + } + return "" +} + +func (m *CreateCollectionResponse) GetCollectionId() uint64 { + if m != nil { + return m.CollectionId + } + return 0 +} + +func (m *CreateCollectionResponse) GetSegmentIds() []uint64 { + if m != nil { + return m.SegmentIds + } + return nil +} + +func (m *CreateCollectionResponse) GetPartitionTags() []string { + if m != nil { + return m.PartitionTags + } + return nil +} + +func init() { + proto.RegisterType((*CreateCollectionRequest)(nil), "masterpb.CreateCollectionRequest") + proto.RegisterType((*CreateCollectionResponse)(nil), "masterpb.CreateCollectionResponse") +} + +func init() { proto.RegisterFile("master.proto", fileDescriptor_f9c348dec43a6705) } + +var fileDescriptor_f9c348dec43a6705 = []byte{ + // 225 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0xcf, 0x4a, 0x03, 0x31, + 0x10, 0x87, 0x5d, 0x77, 0x29, 0x76, 0x6c, 0xab, 0xe4, 0x62, 0xf0, 0xe2, 0x1a, 0x11, 0x73, 0xda, + 0x83, 0xbe, 0x81, 0x3d, 0xf5, 0xa0, 0x87, 0xe0, 0xcd, 0x43, 0x49, 0x9b, 0x61, 0x09, 0x34, 0x7f, + 0xcc, 0x8c, 0xef, 0xe5, 0x23, 0x0a, 0x5b, 0x6d, 0x45, 0x51, 0x3c, 0xe6, 0xe3, 0xc7, 0x47, 0xe6, + 0x83, 0x49, 0xb0, 0xc4, 0x58, 0xba, 0x5c, 0x12, 0x27, 0x71, 0xb4, 0x7d, 0xe5, 0x95, 0xba, 0x87, + 0xb3, 0x79, 0x41, 0xcb, 0x38, 0x4f, 0x9b, 0x0d, 0xae, 0xd9, 0xa7, 0x68, 0xf0, 0xe5, 0x15, 0x89, + 0xc5, 0x0d, 0x9c, 0xac, 0x77, 0x70, 0x19, 0x6d, 0x40, 0x59, 0xb5, 0x95, 0x1e, 0x9b, 0xd9, 0x1e, + 0x3f, 0xda, 0x80, 0xea, 0xad, 0x02, 0xf9, 0x53, 0x42, 0x39, 0x45, 0xc2, 0x7f, 0x5b, 0xc4, 0x15, + 0x4c, 0xbf, 0x0c, 0xbd, 0x93, 0x87, 0x6d, 0xa5, 0x1b, 0x33, 0xd9, 0xc3, 0x85, 0x13, 0x17, 0x70, + 0x4c, 0xd8, 0x07, 0x8c, 0xbc, 0xf4, 0x8e, 0x64, 0xdd, 0xd6, 0xba, 0x31, 0xf0, 0x81, 0x16, 0x8e, + 0xc4, 0x35, 0xcc, 0xb2, 0x2d, 0xec, 0x07, 0x09, 0xdb, 0x9e, 0x64, 0xd3, 0xd6, 0x7a, 0x6c, 0xa6, + 0x3b, 0xfa, 0x64, 0x7b, 0xba, 0x45, 0x18, 0x3d, 0x0c, 0x09, 0xc4, 0x33, 0x9c, 0x7e, 0xff, 0xbb, + 0xb8, 0xec, 0x3e, 0xfb, 0x74, 0xbf, 0xc4, 0x39, 0x57, 0x7f, 0x4d, 0xb6, 0xa7, 0xab, 0x83, 0xd5, + 0x68, 0xc8, 0x7d, 0xf7, 0x1e, 0x00, 0x00, 0xff, 0xff, 0x0d, 0xb6, 0xf8, 0x4e, 0x7e, 0x01, 0x00, + 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// MasterClient is the client API for Master service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type MasterClient interface { + CreateCollection(ctx context.Context, in *CreateCollectionRequest, opts ...grpc.CallOption) (*CreateCollectionResponse, error) +} + +type masterClient struct { + cc *grpc.ClientConn +} + +func NewMasterClient(cc *grpc.ClientConn) MasterClient { + return &masterClient{cc} +} + +func (c *masterClient) CreateCollection(ctx context.Context, in *CreateCollectionRequest, opts ...grpc.CallOption) (*CreateCollectionResponse, error) { + out := new(CreateCollectionResponse) + err := c.cc.Invoke(ctx, "/masterpb.Master/CreateCollection", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// MasterServer is the server API for Master service. +type MasterServer interface { + CreateCollection(context.Context, *CreateCollectionRequest) (*CreateCollectionResponse, error) +} + +// UnimplementedMasterServer can be embedded to have forward compatible implementations. +type UnimplementedMasterServer struct { +} + +func (*UnimplementedMasterServer) CreateCollection(ctx context.Context, req *CreateCollectionRequest) (*CreateCollectionResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreateCollection not implemented") +} + +func RegisterMasterServer(s *grpc.Server, srv MasterServer) { + s.RegisterService(&_Master_serviceDesc, srv) +} + +func _Master_CreateCollection_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateCollectionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MasterServer).CreateCollection(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/masterpb.Master/CreateCollection", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MasterServer).CreateCollection(ctx, req.(*CreateCollectionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Master_serviceDesc = grpc.ServiceDesc{ + ServiceName: "masterpb.Master", + HandlerType: (*MasterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "CreateCollection", + Handler: _Master_CreateCollection_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "master.proto", +} diff --git a/pkg/master/grpc/master.proto b/pkg/master/grpc/master.proto new file mode 100644 index 0000000000..961a93f90b --- /dev/null +++ b/pkg/master/grpc/master.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +//option go_package = "github.com/czs007/suvilm/pkg/master/grpc"; +package masterpb; + +service Master { + rpc CreateCollection (CreateCollectionRequest) returns (CreateCollectionResponse) {} +} + + +message CreateCollectionRequest { + string collection_name = 1; +} + +message CreateCollectionResponse { + string collection_name = 1; + uint64 collection_id = 2; + repeated uint64 segment_ids = 3; + repeated string partition_tags = 4; +} \ No newline at end of file diff --git a/pkg/master/kv/etcd_kv.go b/pkg/master/kv/etcd_kv.go index b45403484d..8b4d87305a 100644 --- a/pkg/master/kv/etcd_kv.go +++ b/pkg/master/kv/etcd_kv.go @@ -79,6 +79,12 @@ func (kv *etcdKVBase) Remove(key string) error { return nil } +func (kv *etcdKVBase) Watch(key string) clientv3.WatchChan { + key = path.Join(kv.rootPath, key) + rch := kv.client.Watch(context.Background(), key) + return rch +} + // SlowLogTxn wraps etcd transaction and log slow one. type SlowLogTxn struct { clientv3.Txn diff --git a/pkg/master/kv/kv.go b/pkg/master/kv/kv.go index 6a2ee82cf6..223fd2b9e2 100644 --- a/pkg/master/kv/kv.go +++ b/pkg/master/kv/kv.go @@ -1,7 +1,10 @@ package kv +import "go.etcd.io/etcd/clientv3" + type Base interface { Load(key string) (string, error) Save(key, value string) error Remove(key string) error + Watch(key string) clientv3.WatchChan } diff --git a/pkg/master/mock/collection.go b/pkg/master/mock/collection.go index 4bd9e07d5d..e375b6e1c6 100644 --- a/pkg/master/mock/collection.go +++ b/pkg/master/mock/collection.go @@ -36,11 +36,11 @@ func Collection2JSON(c Collection) (string, error) { return string(b), nil } -func JSON2Collection(s string) (Collection, error) { +func JSON2Collection(s string) (*Collection, error) { var c Collection err := json.Unmarshal([]byte(s), &c) if err != nil { - return Collection{}, err + return &Collection{}, err } - return c, nil + return &c, nil } diff --git a/pkg/master/mock/grpc_client.go b/pkg/master/mock/grpc_client.go new file mode 100644 index 0000000000..579b170315 --- /dev/null +++ b/pkg/master/mock/grpc_client.go @@ -0,0 +1,59 @@ +package mock + +import ( + "context" + "log" + "time" + + pb "github.com/czs007/suvlim/pkg/master/grpc" + "google.golang.org/grpc" +) + +// func main() { +// // Set up a connection to the server. +// conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) +// if err != nil { +// log.Fatalf("did not connect: %v", err) +// } +// defer conn.Close() +// c := pb.NewGreeterClient(conn) + +// // Contact the server and print out its response. +// name := defaultName +// if len(os.Args) > 1 { +// name = os.Args[1] +// } +// ctx, cancel := context.WithTimeout(context.Background(), time.Second) +// defer cancel() +// r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name}) +// if err != nil { +// log.Fatalf("could not greet: %v", err) +// } +// log.Printf("Greeting: %s", r.GetMessage()) +// } + +const ( + addr = "192.168.1.10:53100" +) + +func FakeCreateCollectionByGRPC() (string, uint64) { + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + + c := pb.NewMasterClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + + defer cancel() + + r, err := c.CreateCollection(ctx, &pb.CreateCollectionRequest{CollectionName: "grpc-client-test"}) + if err != nil { + log.Fatalf("could not greet: %v", err) + } + + log.Printf("CreateCollection: %s, id: %d", r.GetCollectionName(), r.GetCollectionId()) + return r.GetCollectionName(), r.GetCollectionId() +} diff --git a/pkg/master/mock/grpc_client_test.go b/pkg/master/mock/grpc_client_test.go new file mode 100644 index 0000000000..4ba4385f18 --- /dev/null +++ b/pkg/master/mock/grpc_client_test.go @@ -0,0 +1,15 @@ +package mock + +import ( + "fmt" + "testing" +) + +func TestFakeCreateCollectionByGRPC(t *testing.T) { + collectionName, segmentID := FakeCreateCollectionByGRPC() + if collectionName != "grpc-client-test" { + t.Error("Collection name wrong") + } + fmt.Println(collectionName) + fmt.Println(segmentID) +} diff --git a/pkg/master/mock/segment.go b/pkg/master/mock/segment.go index bbb7da45c5..179a8dd24e 100644 --- a/pkg/master/mock/segment.go +++ b/pkg/master/mock/segment.go @@ -51,13 +51,13 @@ func Segment2JSON(s Segment) (string, error) { return string(b), nil } -func JSON2Segment(s string) (Segment, error) { +func JSON2Segment(s string) (*Segment, error) { var c Segment err := json.Unmarshal([]byte(s), &c) if err != nil { - return Segment{}, err + return &Segment{}, err } - return c, nil + return &c, nil } func FakeCreateSegment(id uint64, cl Collection, opentime time.Time, closetime time.Time) Segment { diff --git a/pkg/master/server.go b/pkg/master/server.go index d91ff6175b..c659748a44 100644 --- a/pkg/master/server.go +++ b/pkg/master/server.go @@ -1,19 +1,39 @@ package master import ( + "context" "fmt" "log" + "net" + "strconv" "time" "github.com/czs007/suvlim/pkg/master/common" + pb "github.com/czs007/suvlim/pkg/master/grpc" "github.com/czs007/suvlim/pkg/master/informer" "github.com/czs007/suvlim/pkg/master/kv" "github.com/czs007/suvlim/pkg/master/mock" - "go.etcd.io/etcd/clientv3" + "google.golang.org/grpc" ) +func Run() { + go mock.FakePulsarProducer() + go GRPCServer() + go SegmentStatsController() + go CollectionController() + for { + } +} + func SegmentStatsController() { + cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{"127.0.0.1:12379"}, + DialTimeout: 5 * time.Second, + }) + defer cli.Close() + kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH) + ssChan := make(chan mock.SegmentStats, 10) defer close(ssChan) ssClient := informer.NewPulsarClient() @@ -21,7 +41,7 @@ func SegmentStatsController() { for { select { case ss := <-ssChan: - fmt.Println(ss) + ComputeCloseTime(ss, kvbase) case <-time.After(5 * time.Second): fmt.Println("timeout") return @@ -29,8 +49,51 @@ func SegmentStatsController() { } } -func GRPCServer() { +func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error { + if int(ss.MemorySize) > common.SEGMENT_THRESHOLE*0.8 { + memRate := int(ss.MemoryRate) + if memRate == 0 { + memRate = 1 + } + sec := common.SEGMENT_THRESHOLE * 0.2 / memRate + data, err := kvbase.Load(strconv.Itoa(int(ss.SegementID))) + if err != nil { + return err + } + seg, err := mock.JSON2Segment(data) + if err != nil { + return err + } + seg.CloseTimeStamp = time.Now().Add(time.Duration(sec) * time.Second) + updateData, err := mock.Segment2JSON(*seg) + if err != nil { + return err + } + kvbase.Save(strconv.Itoa(int(ss.SegementID)), updateData) + } + return nil +} +func GRPCServer() error { + lis, err := net.Listen("tcp", common.DEFAULT_GRPC_PORT) + if err != nil { + return err + } + s := grpc.NewServer() + pb.RegisterMasterServer(s, GRPCMasterServer{}) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + return err + } + return nil +} + +type GRPCMasterServer struct{} + +func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *pb.CreateCollectionRequest) (*pb.CreateCollectionResponse, error) { + return &pb.CreateCollectionResponse{ + CollectionName: in.CollectionName, + }, nil } func CollectionController() { @@ -56,7 +119,3 @@ func CollectionController() { log.Fatal(err) } } - -func Sync() { - -} diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index ffa3650de1..cd61c791e0 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -4,12 +4,6 @@ #include "utils/CommonUtil.h" #include "config/ServerConfig.h" -namespace { -int64_t gen_channe_id(int64_t uid) { - // TODO: murmur3 hash from pulsar source code - return 0; -} -} namespace milvus::message_client { @@ -142,7 +136,7 @@ milvus::grpc::QueryResult MsgClientV2::GetQueryResult(int64_t query_id) { return Aggregation(total_results[query_id]); } -Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request) { +Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uint64_t timestamp) { // may have retry policy? auto row_count = request.rows_data_size(); // TODO: Get the segment from master @@ -152,8 +146,10 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request) { mut_msg.set_op(milvus::grpc::OpType::INSERT); mut_msg.set_uid(GetUniqueQId()); mut_msg.set_client_id(client_id_); - auto channel_id = gen_channe_id(request.entity_id_array(i)); + // TODO: add channel id + auto channel_id = 0; mut_msg.set_channel_id(channel_id); + mut_msg.set_timestamp(timestamp); mut_msg.set_collection_name(request.collection_name()); mut_msg.set_partition_tag(request.partition_tag()); mut_msg.set_segment_id(segment); @@ -169,7 +165,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request) { return Status::OK(); } -Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request) { +Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, uint64_t timestamp) { milvus::grpc::InsertOrDeleteMsg mut_msg; for (auto id: request.id_array()) { mut_msg.set_op(milvus::grpc::OpType::DELETE); @@ -177,6 +173,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request) mut_msg.set_client_id(client_id_); mut_msg.set_uid(id); mut_msg.set_collection_name(request.collection_name()); + mut_msg.set_timestamp(timestamp); auto result = insert_delete_producer_->send(mut_msg); if (result != pulsar::ResultOk) { diff --git a/proxy/src/message_client/ClientV2.h b/proxy/src/message_client/ClientV2.h index fcc6d8e58f..5e97565102 100644 --- a/proxy/src/message_client/ClientV2.h +++ b/proxy/src/message_client/ClientV2.h @@ -19,9 +19,9 @@ class MsgClientV2 { const std::string &search_result); // unpackage batch insert or delete request, and delivery message to pulsar per row - Status SendMutMessage(const milvus::grpc::InsertParam &request); + Status SendMutMessage(const milvus::grpc::InsertParam &request, uint64_t timestamp); - Status SendMutMessage(const milvus::grpc::DeleteByIDParam &request); + Status SendMutMessage(const milvus::grpc::DeleteByIDParam &request, uint64_t timestamp); // Status SendQueryMessage(const milvus::grpc::SearchParam &request); diff --git a/proxy/src/server/delivery/ReqScheduler.cpp b/proxy/src/server/delivery/ReqScheduler.cpp index 3789fcac78..eda02d2a0f 100644 --- a/proxy/src/server/delivery/ReqScheduler.cpp +++ b/proxy/src/server/delivery/ReqScheduler.cpp @@ -121,6 +121,11 @@ ReqScheduler::TakeToExecute(ReqQueuePtr req_queue) { } try { + if (req->type() == ReqType::kInsert || req->type() == ReqType::kDeleteEntityByID){ + std::lock_guard lock(time_syc_mtx_); + sending_ = true; + req->SetTimestamp(TSOracle::GetInstance().GetTimeStamp()); + } auto status = req->Execute(); if (!status.ok()) { LOG_SERVER_ERROR_ << "Req failed with code: " << status.ToString(); @@ -135,9 +140,6 @@ Status ReqScheduler::PutToQueue(const BaseReqPtr& req_ptr) { std::lock_guard lock(queue_mtx_); - auto &tso = TSOracle::GetInstance(); - req_ptr->SetTimestamp(tso.GetTimeStamp()); - std::string group_name = req_ptr->req_group(); if (req_groups_.count(group_name) > 0) { req_groups_[group_name]->PutReq(req_ptr); @@ -156,12 +158,24 @@ ReqScheduler::PutToQueue(const BaseReqPtr& req_ptr) { return Status::OK(); } -int64_t ReqScheduler::GetLatestReqDeliveredTime() { - return latest_req_time_.load(); +int64_t ReqScheduler::GetLatestDeliveredReqTime() { + std::lock_guard lock(time_syc_mtx_); + if (sending_){ + return latest_req_time_; + } + return TSOracle::GetInstance().GetTimeStamp(); } void ReqScheduler::UpdateLatestDeliveredReqTime(int64_t time) { - latest_req_time_.store(time); + std::lock_guard lock(time_syc_mtx_); + // update pulsar synchronous time only if message has been sent to pulsar + assert(sending_); + sending_ = false; + latest_req_time_ = time; +} + +uint64_t GetMessageTimeSyncTime(){ + return ReqScheduler::GetInstance().GetLatestDeliveredReqTime(); } } // namespace server diff --git a/proxy/src/server/delivery/ReqScheduler.h b/proxy/src/server/delivery/ReqScheduler.h index e6f00bb470..1f1e3d34b9 100644 --- a/proxy/src/server/delivery/ReqScheduler.h +++ b/proxy/src/server/delivery/ReqScheduler.h @@ -47,8 +47,8 @@ class ReqScheduler { void UpdateLatestDeliveredReqTime(int64_t time); - int64_t GetLatestReqDeliveredTime(); - + int64_t GetLatestDeliveredReqTime(); + protected: ReqScheduler(); @@ -63,8 +63,11 @@ class ReqScheduler { private: mutable std::mutex queue_mtx_; - - std::atomic latest_req_time_; + + // for time synchronous + std::mutex time_syc_mtx_; + int64_t latest_req_time_; + bool sending_; std::map req_groups_; @@ -73,5 +76,7 @@ class ReqScheduler { bool stopped_; }; +extern uint64_t GetMessageTimeSyncTime(); + } // namespace server } // namespace milvus diff --git a/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp b/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp index 2e7df2e2a6..1c599102b1 100644 --- a/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp +++ b/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp @@ -42,7 +42,7 @@ DeleteEntityByIDReq::Create(const ContextPtr& context, const ::milvus::grpc::Del Status DeleteEntityByIDReq::OnExecute() { auto &msg_client = message_client::MsgClientV2::GetInstance(); - Status status = msg_client.SendMutMessage(*request_); + Status status = msg_client.SendMutMessage(*request_, timestamp_); return status; } diff --git a/proxy/src/server/delivery/request/DeleteEntityByIDReq.h b/proxy/src/server/delivery/request/DeleteEntityByIDReq.h index 050e5adb71..3fbd1f5398 100644 --- a/proxy/src/server/delivery/request/DeleteEntityByIDReq.h +++ b/proxy/src/server/delivery/request/DeleteEntityByIDReq.h @@ -37,9 +37,12 @@ class DeleteEntityByIDReq : public BaseReq { Status OnExecute() override; + Status + OnPostExecute() override; + + private: const ::milvus::grpc::DeleteByIDParam *request_; - Status OnPostExecute(); }; } // namespace server diff --git a/proxy/src/server/delivery/request/InsertReq.cpp b/proxy/src/server/delivery/request/InsertReq.cpp index 0fd5f07a85..89cecd5f29 100644 --- a/proxy/src/server/delivery/request/InsertReq.cpp +++ b/proxy/src/server/delivery/request/InsertReq.cpp @@ -43,7 +43,7 @@ Status InsertReq::OnExecute() { LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq."; auto &msg_client = message_client::MsgClientV2::GetInstance(); - Status status = msg_client.SendMutMessage(*insert_param_); + Status status = msg_client.SendMutMessage(*insert_param_, timestamp_); return status; } diff --git a/proxy/src/server/grpc_impl/GrpcServer.cpp b/proxy/src/server/grpc_impl/GrpcServer.cpp index c1ca7867ac..697ec45288 100644 --- a/proxy/src/server/grpc_impl/GrpcServer.cpp +++ b/proxy/src/server/grpc_impl/GrpcServer.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include "GrpcRequestHandler.h" #include "config/ServerConfig.h" @@ -39,6 +40,7 @@ #include "server/grpc_impl/interceptor/SpanInterceptor.h" #include "utils/Log.h" #include "message_client/ClientV2.h" +#include "server/timesync/TimeSync.h" namespace milvus { namespace server { @@ -122,7 +124,6 @@ GrpcServer::StartService() { server_ptr_ = builder.BuildAndStart(); server_ptr_->Wait(); - return Status::OK(); } diff --git a/proxy/src/server/tso/TSO.cpp b/proxy/src/server/tso/TSO.cpp index 7e418e54d9..36b5bc65b7 100644 --- a/proxy/src/server/tso/TSO.cpp +++ b/proxy/src/server/tso/TSO.cpp @@ -24,8 +24,8 @@ uint64_t TSOracle::GetTimeStamp() { } uint64_t TSOracle::GetPhysical(const std::chrono::high_resolution_clock::time_point &t) { - auto nano_time = std::chrono::duration_cast(t.time_since_epoch()); - return nano_time / std::chrono::microseconds(1); + auto nano_time = std::chrono::duration_cast(t.time_since_epoch()); + return nano_time.count(); } uint64_t TSOracle::ComposeTs(uint64_t physical, uint64_t logical) {