diff --git a/internal/allocator/id_allocator.go b/internal/allocator/id_allocator.go new file mode 100644 index 0000000000..c5cccaf85d --- /dev/null +++ b/internal/allocator/id_allocator.go @@ -0,0 +1,35 @@ +package allocator + +import ( + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +) + +type IdAllocator struct { + +} + +func (allocator *IdAllocator) Initialize() error { + return nil +} + +func (allocator *IdAllocator) Start() error{ + return nil +} +func (allocator *IdAllocator) Close() error{ + return nil +} + +func (allocator *IdAllocator) AllocOne() typeutil.Id { + return 1 +} + +func (allocator *IdAllocator) Alloc(count uint32) ([]typeutil.Id, error){ + return make([]typeutil.Id, count), nil +} + + +func NewIdAllocator() *IdAllocator{ + return &IdAllocator{} +} + + diff --git a/internal/allocator/timestamp_allocator.go b/internal/allocator/timestamp_allocator.go new file mode 100644 index 0000000000..18f1b4e42e --- /dev/null +++ b/internal/allocator/timestamp_allocator.go @@ -0,0 +1,32 @@ +package allocator + +import ( + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +) + +type TimestampAllocator struct {} + +func (allocator *TimestampAllocator) Start() error{ + return nil +} + +func (allocator *TimestampAllocator) Close() error{ + return nil +} + +func (allocator *TimestampAllocator) AllocOne() (typeutil.Timestamp, error){ + ret, err := allocator.Alloc(1) + if err != nil{ + return typeutil.ZeroTimestamp, err + } + return ret[0], nil +} + +func (allocator *TimestampAllocator) Alloc(count uint32) ([]typeutil.Timestamp, error){ + // to do lock and accuire more by grpc request + return make([]typeutil.Timestamp, count), nil +} + +func NewTimestampAllocator() *TimestampAllocator{ + return &TimestampAllocator{} +} diff --git a/internal/master/controller/collection.go b/internal/master/controller/collection.go index 055d51f54a..6c2ed08900 100644 --- a/internal/master/controller/collection.go +++ b/internal/master/controller/collection.go @@ -8,16 +8,18 @@ import ( "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/master/collection" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/master/id" + "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/segment" ) +var IdAllocator *allocator.IdAllocator = allocator.NewIdAllocator() + func CollectionController(ch chan *schemapb.CollectionSchema, kvbase kv.Base, errch chan error) { for collectionMeta := range ch { - sID := id.New().Int64() - cID := id.New().Int64() - s2ID := id.New().Int64() + sID := IdAllocator.AllocOne() + cID := IdAllocator.AllocOne() + s2ID := IdAllocator.AllocOne() fieldMetas := []*schemapb.FieldSchema{} if collectionMeta.Fields != nil { fieldMetas = collectionMeta.Fields @@ -53,8 +55,8 @@ func CollectionController(ch chan *schemapb.CollectionSchema, kvbase kv.Base, er } func WriteCollection2Datastore(collectionMeta *schemapb.CollectionSchema, kvbase kv.Base) error { - sID := id.New().Int64() - cID := id.New().Int64() + sID := IdAllocator.AllocOne() + cID := IdAllocator.AllocOne() fieldMetas := []*schemapb.FieldSchema{} if collectionMeta.Fields != nil { fieldMetas = collectionMeta.Fields diff --git a/internal/master/controller/segment.go b/internal/master/controller/segment.go index 2f97d2ec03..19fb6e4f98 100644 --- a/internal/master/controller/segment.go +++ b/internal/master/controller/segment.go @@ -6,10 +6,8 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/conf" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/master/collection" - "github.com/zilliztech/milvus-distributed/internal/master/id" - //"github.com/zilliztech/milvus-distributed/internal/master/informer" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/segment" ) @@ -40,7 +38,7 @@ func ComputeCloseTime(ss internalpb.SegmentStatistics, kvbase kv.Base) error { } kvbase.Save("segment/"+strconv.Itoa(int(ss.SegmentId)), updateData) //create new segment - newSegID := id.New().Int64() + newSegID := IdAllocator.AllocOne() newSeg := segment.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0)) newSegData, err := segment.Segment2JSON(*&newSeg) if err != nil { diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index 049c5233ff..4d6ec4cfe2 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -367,7 +367,7 @@ func (s *Master) Tso(stream masterpb.Master_TsoServer) error { } response := &internalpb.TsoResponse{ Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, - Timestamp: &ts, + Timestamp: ts, Count: count, } if err := stream.Send(response); err != nil { diff --git a/internal/master/id/id.go b/internal/master/id/id.go index 0e991898f3..609dc7a716 100644 --- a/internal/master/id/id.go +++ b/internal/master/id/id.go @@ -1,42 +1,48 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package id import ( - "encoding/binary" - - "github.com/rs/xid" - - "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/master/tso" ) -type ID struct { - xid.ID + +// GlobalTSOAllocator is the global single point TSO allocator. +type GlobalIdAllocator struct { + allocator tso.Allocator } -func BytesToInt64(b []byte) (int64, error) { - if len(b) != 12 { - return 0, errors.Errorf("invalid data, must 12 bytes, but %d", len(b)) - } - - return int64(binary.BigEndian.Uint64(b)), nil -} - -// Uint64ToBytes converts uint64 to a byte slice. -func Uint64ToBytes(v uint64) []byte { - b := make([]byte, 12) - binary.BigEndian.PutUint64(b, v) - return b -} - -func New() ID { - return ID{ - xid.New(), +func NewGlobalIdAllocator() *GlobalIdAllocator { + return &GlobalIdAllocator{ + allocator: tso.NewGlobalTSOAllocator("idTimestamp"), } } -func (id ID) Int64() int64 { - b := id.Bytes() - if len(b) != 12 { - return 0 - } - return int64(binary.BigEndian.Uint64(b)) +// Initialize will initialize the created global TSO allocator. +func (gia *GlobalIdAllocator) Initialize() error { + return gia.allocator.Initialize() } + +// GenerateTSO is used to generate a given number of TSOs. +// Make sure you have initialized the TSO allocator before calling. +func (gia *GlobalIdAllocator) Generate(count uint32) (int64, int64, error) { + timestamp, err:= gia.allocator.GenerateTSO(count) + if err != nil{ + return 0, 0, err + } + idStart := int64(timestamp) + idEnd := idStart + int64(count) + return idStart, idEnd, nil +} + diff --git a/internal/master/master.go b/internal/master/master.go index 61ed237328..ebdaf9d646 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -93,7 +93,10 @@ func CreateServer(ctx context.Context) (*Master, error) { ssChan: make(chan internalpb.SegmentStatistics, 10), pc: informer.NewPulsarClient(), } - + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) + m.tsoAllocator = tso.NewGlobalTSOAllocator("timestamp") m.grpcServer = grpc.NewServer() masterpb.RegisterMasterServer(m.grpcServer, m) return m, nil diff --git a/internal/master/tso/global_allocator.go b/internal/master/tso/global_allocator.go index dbf05f0601..90d7b9b716 100644 --- a/internal/master/tso/global_allocator.go +++ b/internal/master/tso/global_allocator.go @@ -14,14 +14,16 @@ package tso import ( + "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "go.etcd.io/etcd/clientv3" + "strconv" "sync/atomic" "time" - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "github.com/pingcap/log" + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.uber.org/zap" ) @@ -38,7 +40,7 @@ type Allocator interface { SetTSO(tso uint64) error // GenerateTSO is used to generate a given number of TSOs. // Make sure you have initialized the TSO allocator before calling. - GenerateTSO(count uint32) (internalpb.TimestampMsg, error) + GenerateTSO(count uint32) (uint64, error) // Reset is used to reset the TSO allocator. Reset() } @@ -49,13 +51,25 @@ type GlobalTSOAllocator struct { } // NewGlobalTSOAllocator creates a new global TSO allocator. -func NewGlobalTSOAllocator(client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) Allocator { +func NewGlobalTSOAllocator(key string) Allocator { + + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) + + client, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{etcdAddr}, + DialTimeout: 5 * time.Second, + }) + + var saveInterval time.Duration = 3 *time.Second return &GlobalTSOAllocator{ timestampOracle: ×tampOracle{ client: client, - rootPath: rootPath, + rootPath: conf.Config.Etcd.Rootpath, saveInterval: saveInterval, - maxResetTSGap: maxResetTSGap, + maxResetTSGap: func() time.Duration { return 3 *time.Second}, + key: key, }, } } @@ -77,11 +91,10 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error { // GenerateTSO is used to generate a given number of TSOs. // Make sure you have initialized the TSO allocator before calling. -func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (internalpb.TimestampMsg, error) { - var resp internalpb.TimestampMsg - +func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { + var physical, logical int64 = 0, 0 if count == 0 { - return resp, errors.New("tso count should be positive") + return 0, errors.New("tso count should be positive") } maxRetryCount := 10 @@ -95,18 +108,17 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (internalpb.TimestampMs continue } - resp.Physical = current.physical.UnixNano() / int64(time.Millisecond) - resp.Logical = atomic.AddInt64(¤t.logical, int64(count)) - if resp.Logical >= maxLogical { + physical = current.physical.UnixNano() / int64(time.Millisecond) + logical = atomic.AddInt64(¤t.logical, int64(count)) + if logical >= maxLogical { log.Error("logical part outside of max logical interval, please check ntp time", - zap.Reflect("response", resp), zap.Int("retry-count", i)) time.Sleep(UpdateTimestampStep) continue } - return resp, nil + return tsoutil.ComposeTS(physical, logical), nil } - return resp, errors.New("can not get timestamp") + return 0, errors.New("can not get timestamp") } // Reset is used to reset the TSO allocator. diff --git a/internal/master/tso/tso.go b/internal/master/tso/tso.go index 4211ee9357..88591962c8 100644 --- a/internal/master/tso/tso.go +++ b/internal/master/tso/tso.go @@ -49,6 +49,7 @@ type atomicObject struct { type timestampOracle struct { client *clientv3.Client rootPath string + key string // TODO: remove saveInterval saveInterval time.Duration maxResetTSGap func() time.Duration @@ -58,7 +59,7 @@ type timestampOracle struct { } func (t *timestampOracle) getTimestampPath() string { - return path.Join(t.rootPath, "timestamp") + return path.Join(t.rootPath, t.key) } func (t *timestampOracle) loadTimestamp() (time.Time, error) { diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index 85897f968d..29967bc084 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -3,7 +3,6 @@ package msgstream import ( "context" "github.com/apache/pulsar-client-go/pulsar" - "github.com/zilliztech/milvus-distributed/internal/msgclient" commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "log" "sync" @@ -201,7 +200,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context, (*ms.consumers[channelIndex]).Ack(pulsarMsg) tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload()) // TODO:: Find the EOF - if (*tsMsg).Type() == msgclient.kTimeTick { + if (*tsMsg).Type() == kTimeTick { eofMsgMap[channelIndex] = (*tsMsg).Ts() break } diff --git a/internal/proto/internal_msg.proto b/internal/proto/internal_msg.proto index 207de8ebc3..522676df38 100644 --- a/internal/proto/internal_msg.proto +++ b/internal/proto/internal_msg.proto @@ -30,7 +30,7 @@ enum ReqType { kSearch = 500; /* System Control */ - kTimeTick = 1200 + kTimeTick = 1200; } enum PeerRole { @@ -45,11 +45,6 @@ enum PeerRole { } -message TimestampMsg { - int64 physical = 1; - int64 logical = 2; -} - message TsoRequest { int64 peer_id = 1; PeerRole role = 2; @@ -58,7 +53,7 @@ message TsoRequest { message TsoResponse { common.Status status = 1; - TimestampMsg timestamp = 2; + uint64 timestamp = 2; uint32 count = 3; } diff --git a/internal/proto/internalpb/internal_msg.pb.go b/internal/proto/internalpb/internal_msg.pb.go index 16572e85b6..5f8df68889 100644 --- a/internal/proto/internalpb/internal_msg.pb.go +++ b/internal/proto/internalpb/internal_msg.pb.go @@ -40,24 +40,29 @@ const ( ReqType_kShowPartitions ReqType = 204 // Manipulation Requests ReqType_kInsert ReqType = 400 + ReqType_kDelete ReqType = 401 // Query ReqType_kSearch ReqType = 500 + // System Control + ReqType_kTimeTick ReqType = 1200 ) var ReqType_name = map[int32]string{ - 0: "kNone", - 100: "kCreateCollection", - 101: "kDropCollection", - 102: "kHasCollection", - 103: "kDescribeCollection", - 104: "kShowCollections", - 200: "kCreatePartition", - 201: "kDropPartition", - 202: "kHasPartition", - 203: "kDescribePartition", - 204: "kShowPartitions", - 400: "kInsert", - 500: "kSearch", + 0: "kNone", + 100: "kCreateCollection", + 101: "kDropCollection", + 102: "kHasCollection", + 103: "kDescribeCollection", + 104: "kShowCollections", + 200: "kCreatePartition", + 201: "kDropPartition", + 202: "kHasPartition", + 203: "kDescribePartition", + 204: "kShowPartitions", + 400: "kInsert", + 401: "kDelete", + 500: "kSearch", + 1200: "kTimeTick", } var ReqType_value = map[string]int32{ @@ -73,7 +78,9 @@ var ReqType_value = map[string]int32{ "kDescribePartition": 203, "kShowPartitions": 204, "kInsert": 400, + "kDelete": 401, "kSearch": 500, + "kTimeTick": 1200, } func (x ReqType) String() string { @@ -115,53 +122,6 @@ func (PeerRole) EnumDescriptor() ([]byte, []int) { return fileDescriptor_7eb37f6b80b23116, []int{1} } -type TimestampMsg struct { - Physical int64 `protobuf:"varint,1,opt,name=physical,proto3" json:"physical,omitempty"` - Logical int64 `protobuf:"varint,2,opt,name=logical,proto3" json:"logical,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *TimestampMsg) Reset() { *m = TimestampMsg{} } -func (m *TimestampMsg) String() string { return proto.CompactTextString(m) } -func (*TimestampMsg) ProtoMessage() {} -func (*TimestampMsg) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{0} -} - -func (m *TimestampMsg) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TimestampMsg.Unmarshal(m, b) -} -func (m *TimestampMsg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TimestampMsg.Marshal(b, m, deterministic) -} -func (m *TimestampMsg) XXX_Merge(src proto.Message) { - xxx_messageInfo_TimestampMsg.Merge(m, src) -} -func (m *TimestampMsg) XXX_Size() int { - return xxx_messageInfo_TimestampMsg.Size(m) -} -func (m *TimestampMsg) XXX_DiscardUnknown() { - xxx_messageInfo_TimestampMsg.DiscardUnknown(m) -} - -var xxx_messageInfo_TimestampMsg proto.InternalMessageInfo - -func (m *TimestampMsg) GetPhysical() int64 { - if m != nil { - return m.Physical - } - return 0 -} - -func (m *TimestampMsg) GetLogical() int64 { - if m != nil { - return m.Logical - } - return 0 -} - type TsoRequest struct { PeerId int64 `protobuf:"varint,1,opt,name=peer_id,json=peerId,proto3" json:"peer_id,omitempty"` Role PeerRole `protobuf:"varint,2,opt,name=role,proto3,enum=milvus.proto.internal.PeerRole" json:"role,omitempty"` @@ -175,7 +135,7 @@ func (m *TsoRequest) Reset() { *m = TsoRequest{} } func (m *TsoRequest) String() string { return proto.CompactTextString(m) } func (*TsoRequest) ProtoMessage() {} func (*TsoRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{1} + return fileDescriptor_7eb37f6b80b23116, []int{0} } func (m *TsoRequest) XXX_Unmarshal(b []byte) error { @@ -219,7 +179,7 @@ func (m *TsoRequest) GetCount() uint32 { type TsoResponse struct { Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` - Timestamp *TimestampMsg `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Count uint32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -230,7 +190,7 @@ func (m *TsoResponse) Reset() { *m = TsoResponse{} } func (m *TsoResponse) String() string { return proto.CompactTextString(m) } func (*TsoResponse) ProtoMessage() {} func (*TsoResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{2} + return fileDescriptor_7eb37f6b80b23116, []int{1} } func (m *TsoResponse) XXX_Unmarshal(b []byte) error { @@ -258,11 +218,11 @@ func (m *TsoResponse) GetStatus() *commonpb.Status { return nil } -func (m *TsoResponse) GetTimestamp() *TimestampMsg { +func (m *TsoResponse) GetTimestamp() uint64 { if m != nil { return m.Timestamp } - return nil + return 0 } func (m *TsoResponse) GetCount() uint32 { @@ -287,7 +247,7 @@ func (m *CreateCollectionRequest) Reset() { *m = CreateCollectionRequest func (m *CreateCollectionRequest) String() string { return proto.CompactTextString(m) } func (*CreateCollectionRequest) ProtoMessage() {} func (*CreateCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{3} + return fileDescriptor_7eb37f6b80b23116, []int{2} } func (m *CreateCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -358,7 +318,7 @@ func (m *DropCollectionRequest) Reset() { *m = DropCollectionRequest{} } func (m *DropCollectionRequest) String() string { return proto.CompactTextString(m) } func (*DropCollectionRequest) ProtoMessage() {} func (*DropCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{4} + return fileDescriptor_7eb37f6b80b23116, []int{3} } func (m *DropCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -429,7 +389,7 @@ func (m *HasCollectionRequest) Reset() { *m = HasCollectionRequest{} } func (m *HasCollectionRequest) String() string { return proto.CompactTextString(m) } func (*HasCollectionRequest) ProtoMessage() {} func (*HasCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{5} + return fileDescriptor_7eb37f6b80b23116, []int{4} } func (m *HasCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -500,7 +460,7 @@ func (m *DescribeCollectionRequest) Reset() { *m = DescribeCollectionReq func (m *DescribeCollectionRequest) String() string { return proto.CompactTextString(m) } func (*DescribeCollectionRequest) ProtoMessage() {} func (*DescribeCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{6} + return fileDescriptor_7eb37f6b80b23116, []int{5} } func (m *DescribeCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -570,7 +530,7 @@ func (m *ShowCollectionRequest) Reset() { *m = ShowCollectionRequest{} } func (m *ShowCollectionRequest) String() string { return proto.CompactTextString(m) } func (*ShowCollectionRequest) ProtoMessage() {} func (*ShowCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{7} + return fileDescriptor_7eb37f6b80b23116, []int{6} } func (m *ShowCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -634,7 +594,7 @@ func (m *CreatePartitionRequest) Reset() { *m = CreatePartitionRequest{} func (m *CreatePartitionRequest) String() string { return proto.CompactTextString(m) } func (*CreatePartitionRequest) ProtoMessage() {} func (*CreatePartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{8} + return fileDescriptor_7eb37f6b80b23116, []int{7} } func (m *CreatePartitionRequest) XXX_Unmarshal(b []byte) error { @@ -705,7 +665,7 @@ func (m *DropPartitionRequest) Reset() { *m = DropPartitionRequest{} } func (m *DropPartitionRequest) String() string { return proto.CompactTextString(m) } func (*DropPartitionRequest) ProtoMessage() {} func (*DropPartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{9} + return fileDescriptor_7eb37f6b80b23116, []int{8} } func (m *DropPartitionRequest) XXX_Unmarshal(b []byte) error { @@ -776,7 +736,7 @@ func (m *HasPartitionRequest) Reset() { *m = HasPartitionRequest{} } func (m *HasPartitionRequest) String() string { return proto.CompactTextString(m) } func (*HasPartitionRequest) ProtoMessage() {} func (*HasPartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{10} + return fileDescriptor_7eb37f6b80b23116, []int{9} } func (m *HasPartitionRequest) XXX_Unmarshal(b []byte) error { @@ -847,7 +807,7 @@ func (m *DescribePartitionRequest) Reset() { *m = DescribePartitionReque func (m *DescribePartitionRequest) String() string { return proto.CompactTextString(m) } func (*DescribePartitionRequest) ProtoMessage() {} func (*DescribePartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{11} + return fileDescriptor_7eb37f6b80b23116, []int{10} } func (m *DescribePartitionRequest) XXX_Unmarshal(b []byte) error { @@ -918,7 +878,7 @@ func (m *ShowPartitionRequest) Reset() { *m = ShowPartitionRequest{} } func (m *ShowPartitionRequest) String() string { return proto.CompactTextString(m) } func (*ShowPartitionRequest) ProtoMessage() {} func (*ShowPartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{12} + return fileDescriptor_7eb37f6b80b23116, []int{11} } func (m *ShowPartitionRequest) XXX_Unmarshal(b []byte) error { @@ -994,7 +954,7 @@ func (m *InsertRequest) Reset() { *m = InsertRequest{} } func (m *InsertRequest) String() string { return proto.CompactTextString(m) } func (*InsertRequest) ProtoMessage() {} func (*InsertRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{13} + return fileDescriptor_7eb37f6b80b23116, []int{12} } func (m *InsertRequest) XXX_Unmarshal(b []byte) error { @@ -1102,7 +1062,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } func (*DeleteRequest) ProtoMessage() {} func (*DeleteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{14} + return fileDescriptor_7eb37f6b80b23116, []int{13} } func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { @@ -1188,7 +1148,7 @@ func (m *SearchRequest) Reset() { *m = SearchRequest{} } func (m *SearchRequest) String() string { return proto.CompactTextString(m) } func (*SearchRequest) ProtoMessage() {} func (*SearchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{15} + return fileDescriptor_7eb37f6b80b23116, []int{14} } func (m *SearchRequest) XXX_Unmarshal(b []byte) error { @@ -1268,7 +1228,7 @@ func (m *SearchResult) Reset() { *m = SearchResult{} } func (m *SearchResult) String() string { return proto.CompactTextString(m) } func (*SearchResult) ProtoMessage() {} func (*SearchResult) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{16} + return fileDescriptor_7eb37f6b80b23116, []int{15} } func (m *SearchResult) XXX_Unmarshal(b []byte) error { @@ -1350,7 +1310,7 @@ func (m *TimeSyncMsg) Reset() { *m = TimeSyncMsg{} } func (m *TimeSyncMsg) String() string { return proto.CompactTextString(m) } func (*TimeSyncMsg) ProtoMessage() {} func (*TimeSyncMsg) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{17} + return fileDescriptor_7eb37f6b80b23116, []int{16} } func (m *TimeSyncMsg) XXX_Unmarshal(b []byte) error { @@ -1400,7 +1360,7 @@ func (m *Key2Seg) Reset() { *m = Key2Seg{} } func (m *Key2Seg) String() string { return proto.CompactTextString(m) } func (*Key2Seg) ProtoMessage() {} func (*Key2Seg) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{18} + return fileDescriptor_7eb37f6b80b23116, []int{17} } func (m *Key2Seg) XXX_Unmarshal(b []byte) error { @@ -1468,7 +1428,7 @@ func (m *Key2SegMsg) Reset() { *m = Key2SegMsg{} } func (m *Key2SegMsg) String() string { return proto.CompactTextString(m) } func (*Key2SegMsg) ProtoMessage() {} func (*Key2SegMsg) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{19} + return fileDescriptor_7eb37f6b80b23116, []int{18} } func (m *Key2SegMsg) XXX_Unmarshal(b []byte) error { @@ -1516,7 +1476,7 @@ func (m *SegmentStatistics) Reset() { *m = SegmentStatistics{} } func (m *SegmentStatistics) String() string { return proto.CompactTextString(m) } func (*SegmentStatistics) ProtoMessage() {} func (*SegmentStatistics) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{20} + return fileDescriptor_7eb37f6b80b23116, []int{19} } func (m *SegmentStatistics) XXX_Unmarshal(b []byte) error { @@ -1561,7 +1521,6 @@ func (m *SegmentStatistics) GetNumRows() int64 { func init() { proto.RegisterEnum("milvus.proto.internal.ReqType", ReqType_name, ReqType_value) proto.RegisterEnum("milvus.proto.internal.PeerRole", PeerRole_name, PeerRole_value) - proto.RegisterType((*TimestampMsg)(nil), "milvus.proto.internal.TimestampMsg") proto.RegisterType((*TsoRequest)(nil), "milvus.proto.internal.TsoRequest") proto.RegisterType((*TsoResponse)(nil), "milvus.proto.internal.TsoResponse") proto.RegisterType((*CreateCollectionRequest)(nil), "milvus.proto.internal.CreateCollectionRequest") @@ -1587,78 +1546,76 @@ func init() { func init() { proto.RegisterFile("internal_msg.proto", fileDescriptor_7eb37f6b80b23116) } var fileDescriptor_7eb37f6b80b23116 = []byte{ - // 1161 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x58, 0xcd, 0x6f, 0x1b, 0x45, - 0x14, 0xef, 0xda, 0xf1, 0x47, 0x9e, 0xe3, 0x64, 0x33, 0x89, 0x89, 0x1b, 0x0a, 0x0d, 0x5b, 0x24, - 0xaa, 0x4a, 0x38, 0xc2, 0xe5, 0x40, 0x8f, 0xb4, 0x3e, 0xd4, 0x54, 0xad, 0xaa, 0x75, 0x04, 0x12, - 0x12, 0x5a, 0xad, 0x77, 0x1f, 0xf6, 0x68, 0x3f, 0x66, 0x33, 0x33, 0x6e, 0xd8, 0xdc, 0xb9, 0x22, - 0x24, 0x8e, 0xdc, 0xf8, 0x6b, 0xf8, 0xba, 0xf3, 0x4f, 0x80, 0xa0, 0x12, 0x88, 0x2b, 0x9a, 0xd9, - 0x5d, 0xdb, 0xeb, 0x26, 0xe1, 0x43, 0x54, 0x8a, 0x94, 0x9b, 0xdf, 0x9b, 0xaf, 0xdf, 0xef, 0xf7, - 0xe6, 0xbd, 0x9d, 0x67, 0x20, 0x34, 0x96, 0xc8, 0x63, 0x37, 0x74, 0x22, 0x31, 0xe9, 0x25, 0x9c, - 0x49, 0x46, 0x3a, 0x11, 0x0d, 0x9f, 0xcd, 0x44, 0x66, 0xf5, 0x8a, 0x09, 0xfb, 0x1b, 0x1e, 0x8b, - 0x22, 0x16, 0x67, 0xee, 0xfd, 0x6d, 0x81, 0xfc, 0x19, 0xf5, 0x70, 0xb1, 0xce, 0x1a, 0xc0, 0xc6, - 0x11, 0x8d, 0x50, 0x48, 0x37, 0x4a, 0x1e, 0x8b, 0x09, 0xd9, 0x87, 0x66, 0x32, 0x4d, 0x05, 0xf5, - 0xdc, 0xb0, 0x6b, 0x1c, 0x18, 0xb7, 0xab, 0xf6, 0xdc, 0x26, 0x5d, 0x68, 0x84, 0x6c, 0xa2, 0x87, - 0x2a, 0x7a, 0xa8, 0x30, 0xad, 0x04, 0xe0, 0x48, 0x30, 0x1b, 0x8f, 0x67, 0x28, 0x24, 0xd9, 0x83, - 0x46, 0x82, 0xc8, 0x1d, 0xea, 0xe7, 0x5b, 0xd4, 0x95, 0x39, 0xf4, 0xc9, 0x5d, 0x58, 0xe3, 0x2c, - 0x44, 0xbd, 0x7a, 0xb3, 0x7f, 0xb3, 0x77, 0x26, 0xe6, 0xde, 0x53, 0x44, 0x6e, 0xb3, 0x10, 0x6d, - 0x3d, 0x99, 0xec, 0x42, 0xcd, 0x63, 0xb3, 0x58, 0x76, 0xab, 0x07, 0xc6, 0xed, 0xb6, 0x9d, 0x19, - 0xd6, 0xd7, 0x06, 0xb4, 0xf4, 0x91, 0x22, 0x61, 0xb1, 0x40, 0x72, 0x17, 0xea, 0x42, 0xba, 0x72, - 0x26, 0xf4, 0x91, 0xad, 0xfe, 0xab, 0xe5, 0xcd, 0x73, 0x19, 0x46, 0x7a, 0x8a, 0x9d, 0x4f, 0x25, - 0xef, 0xc3, 0xba, 0x2c, 0xc8, 0x6b, 0x50, 0xad, 0xfe, 0xad, 0x73, 0x40, 0x2d, 0x8b, 0x64, 0x2f, - 0x56, 0x9d, 0x83, 0xee, 0x27, 0x03, 0xf6, 0x1e, 0x70, 0x74, 0x25, 0x3e, 0x60, 0x61, 0x88, 0x9e, - 0xa4, 0x2c, 0x2e, 0xd4, 0xb9, 0x07, 0x4d, 0x8e, 0xc7, 0x8e, 0x4c, 0x13, 0xd4, 0x58, 0x37, 0xfb, - 0xaf, 0x9f, 0x73, 0xa6, 0x8d, 0xc7, 0x47, 0x69, 0x82, 0x76, 0x83, 0x67, 0x3f, 0x48, 0x07, 0xea, - 0x6a, 0x29, 0xf5, 0x73, 0xfd, 0x6b, 0x1c, 0x8f, 0x87, 0x3e, 0xb9, 0xb1, 0x4c, 0x43, 0xe1, 0x58, - 0x5b, 0x46, 0x78, 0x1d, 0x9a, 0x09, 0x67, 0x9f, 0xa5, 0x6a, 0xd9, 0x5a, 0x16, 0x36, 0x6d, 0x0f, - 0x7d, 0xf2, 0x0e, 0xd4, 0x85, 0x37, 0xc5, 0xc8, 0xed, 0xd6, 0x34, 0xf9, 0xeb, 0x67, 0x8a, 0x76, - 0x3f, 0x64, 0x63, 0x3b, 0x9f, 0x68, 0x3d, 0x37, 0xa0, 0x33, 0xe0, 0x2c, 0xb9, 0xd4, 0xbc, 0x1e, - 0xc3, 0x96, 0x37, 0xc7, 0xe7, 0xc4, 0x6e, 0x84, 0x39, 0xc1, 0x37, 0xcb, 0x88, 0xf2, 0x74, 0xe8, - 0x2d, 0xc8, 0x3c, 0x71, 0x23, 0xb4, 0x37, 0xbd, 0x92, 0x6d, 0xfd, 0x66, 0xc0, 0xee, 0x43, 0x57, - 0x5c, 0x25, 0xca, 0x7f, 0x18, 0x70, 0x7d, 0x80, 0xc2, 0xe3, 0x74, 0x8c, 0x57, 0x89, 0xf7, 0x37, - 0x06, 0x74, 0x46, 0x53, 0x76, 0x72, 0x99, 0x39, 0x5b, 0xbf, 0x1a, 0xf0, 0x4a, 0x56, 0x5d, 0x9e, - 0xba, 0x5c, 0xd2, 0x4b, 0x1a, 0x99, 0x0f, 0x60, 0x33, 0x29, 0xe0, 0x2d, 0x07, 0xe6, 0xd6, 0xd9, - 0x81, 0x99, 0x53, 0xd1, 0x71, 0x69, 0x27, 0xcb, 0xa6, 0xf5, 0x8b, 0x01, 0xbb, 0xaa, 0xea, 0x5c, - 0x15, 0xbe, 0x3f, 0x1b, 0xb0, 0xf3, 0xd0, 0x15, 0x57, 0x85, 0xee, 0x73, 0x03, 0xba, 0x45, 0xb5, - 0xb9, 0x2a, 0x9c, 0xd5, 0x47, 0x45, 0x55, 0x9a, 0xcb, 0xcc, 0xf7, 0xff, 0xfe, 0xa8, 0x54, 0xa0, - 0x3d, 0x8c, 0x05, 0x72, 0xf9, 0xf2, 0xb8, 0xbe, 0xf5, 0x22, 0x64, 0xc5, 0x78, 0x7d, 0x15, 0x0c, - 0xb9, 0x05, 0x8b, 0x80, 0x38, 0xd2, 0x9d, 0x68, 0xee, 0xeb, 0xf6, 0xc6, 0xdc, 0x79, 0xe4, 0x4e, - 0xc8, 0x6b, 0x00, 0x02, 0x27, 0x11, 0xc6, 0x52, 0x1d, 0x54, 0xd3, 0x07, 0xad, 0xe7, 0x9e, 0xa1, - 0xaf, 0x86, 0xbd, 0xa9, 0x1b, 0xc7, 0x18, 0xaa, 0xe1, 0x7a, 0x36, 0x9c, 0x7b, 0x86, 0x7e, 0x49, - 0xd9, 0x46, 0x59, 0xd9, 0x52, 0x48, 0x9a, 0xab, 0x21, 0xd9, 0x83, 0x06, 0x67, 0x27, 0x0e, 0xf5, - 0x45, 0x77, 0xfd, 0xa0, 0xaa, 0x1e, 0xd0, 0x9c, 0x9d, 0x0c, 0x7d, 0x41, 0xde, 0x85, 0xa6, 0x1a, - 0xf0, 0x5d, 0xe9, 0x76, 0xe1, 0xa0, 0x7a, 0xf1, 0x93, 0x4d, 0xed, 0x31, 0x70, 0xa5, 0x6b, 0x7d, - 0x5e, 0x81, 0xf6, 0x00, 0x43, 0x94, 0x78, 0x09, 0x74, 0x2f, 0x6b, 0xb6, 0x76, 0x91, 0x66, 0xb5, - 0x0b, 0x34, 0xab, 0xaf, 0x6a, 0xf6, 0x06, 0x6c, 0x24, 0x9c, 0x46, 0x2e, 0x4f, 0x9d, 0x00, 0x53, - 0xd1, 0x6d, 0x68, 0xe1, 0x5a, 0xb9, 0xef, 0x11, 0xa6, 0xc2, 0xfa, 0xd3, 0x80, 0xf6, 0x08, 0x5d, - 0xee, 0x4d, 0x5f, 0x9e, 0x0e, 0xcb, 0xf8, 0xab, 0x17, 0xe0, 0x5f, 0x5b, 0xc5, 0x7f, 0x07, 0xb6, - 0x39, 0x8a, 0x59, 0x28, 0x9d, 0x25, 0x79, 0x32, 0x05, 0xb6, 0xb2, 0x81, 0x07, 0x73, 0x91, 0x0e, - 0xa1, 0x76, 0x3c, 0x43, 0x9e, 0x6a, 0x15, 0x2e, 0xbc, 0x03, 0xd9, 0x3c, 0xeb, 0xab, 0x0a, 0x6c, - 0x14, 0xcc, 0xd5, 0x56, 0xff, 0xad, 0x5d, 0xfa, 0xf7, 0x94, 0x2d, 0x68, 0x6b, 0x00, 0x4e, 0xcc, - 0x7c, 0x5c, 0xc4, 0xbb, 0xa5, 0x9d, 0x4f, 0x98, 0x8f, 0xab, 0xb2, 0xd4, 0xfe, 0x91, 0x2c, 0xf5, - 0xb3, 0x65, 0xe9, 0xc1, 0xda, 0x94, 0xca, 0x2c, 0xf4, 0xad, 0xfe, 0xfe, 0xd9, 0x35, 0xea, 0x21, - 0x95, 0xc2, 0xd6, 0xf3, 0xac, 0x01, 0xb4, 0x54, 0x5b, 0x37, 0x4a, 0x63, 0x4f, 0xb5, 0xbe, 0xe7, - 0xb6, 0xad, 0x37, 0x56, 0xdb, 0xc4, 0x65, 0x84, 0xaa, 0x13, 0x6d, 0x3c, 0xc2, 0xb4, 0x3f, 0xc2, - 0x89, 0x56, 0x48, 0x27, 0x6e, 0xbe, 0x43, 0x4d, 0xe7, 0x2d, 0xb9, 0x09, 0xad, 0xa5, 0xbb, 0x99, - 0xab, 0x07, 0x8b, 0xab, 0xf9, 0xf7, 0x15, 0x9a, 0x0a, 0xe7, 0x99, 0x1b, 0xe6, 0x02, 0x36, 0xed, - 0x06, 0x15, 0x1f, 0x2a, 0x53, 0xed, 0xbc, 0x28, 0x50, 0xa2, 0x5b, 0xd3, 0x97, 0x1e, 0xe6, 0x15, - 0x4a, 0x58, 0x9f, 0x00, 0xe4, 0xe0, 0x14, 0xc5, 0x45, 0x04, 0x8d, 0xe5, 0x08, 0xbe, 0x07, 0x8d, - 0x00, 0xd3, 0xbe, 0xc0, 0x49, 0xb7, 0xa2, 0xb5, 0x3b, 0x2f, 0x0b, 0xf2, 0xad, 0xec, 0x62, 0xba, - 0x15, 0xc3, 0xf6, 0x28, 0x3b, 0x4c, 0xdd, 0x15, 0x2a, 0x24, 0xf5, 0xc4, 0x4a, 0xd5, 0x34, 0x56, - 0xab, 0xe6, 0x4d, 0x68, 0x45, 0x18, 0x31, 0x9e, 0x3a, 0x82, 0x9e, 0x62, 0xa1, 0x46, 0xe6, 0x1a, - 0xd1, 0x53, 0x54, 0x7c, 0xe3, 0x59, 0xe4, 0x70, 0x76, 0x22, 0x8a, 0x0b, 0x15, 0xcf, 0x22, 0x9b, - 0x9d, 0x88, 0x3b, 0x5f, 0x54, 0xa0, 0x91, 0xa7, 0x22, 0x59, 0x87, 0x5a, 0xf0, 0x84, 0xc5, 0x68, - 0x5e, 0x23, 0x1d, 0xd8, 0x0e, 0x56, 0xfb, 0x6d, 0xd3, 0x27, 0x3b, 0xb0, 0x15, 0x94, 0x9b, 0x55, - 0x13, 0x09, 0x81, 0xcd, 0xa0, 0xd4, 0xcd, 0x99, 0x9f, 0x92, 0x3d, 0xd8, 0x09, 0x5e, 0x6c, 0x77, - 0xcc, 0x09, 0xd9, 0x05, 0x33, 0x28, 0xf7, 0x03, 0xc2, 0x9c, 0x92, 0x0e, 0x98, 0xc1, 0xca, 0x03, - 0xdc, 0xfc, 0xd6, 0x20, 0x3b, 0xb0, 0x19, 0x94, 0x5e, 0xa9, 0xe6, 0x77, 0x06, 0x21, 0xd0, 0x0e, - 0x96, 0x9f, 0x72, 0xe6, 0xf7, 0x06, 0xd9, 0x03, 0x12, 0xbc, 0xf0, 0xde, 0x31, 0x7f, 0x30, 0xc8, - 0x2e, 0x6c, 0x05, 0xa5, 0x47, 0x81, 0x30, 0x7f, 0x34, 0xc8, 0x06, 0x34, 0x82, 0xec, 0xbb, 0x69, - 0x7e, 0x59, 0xd5, 0x56, 0x96, 0xcb, 0xe6, 0xef, 0xd5, 0x3b, 0xf7, 0xa0, 0x59, 0xfc, 0x5f, 0x42, - 0x00, 0xea, 0x8f, 0x5d, 0x21, 0x91, 0x9b, 0xd7, 0xd4, 0x6f, 0x1b, 0x5d, 0x1f, 0xb9, 0x69, 0xa8, - 0xdf, 0x1f, 0x71, 0xaa, 0xfc, 0x15, 0x25, 0xda, 0x53, 0x95, 0x9c, 0x66, 0xf5, 0xfe, 0xe0, 0xe3, - 0xfb, 0x13, 0x2a, 0xa7, 0xb3, 0xb1, 0x4a, 0xf7, 0xc3, 0x53, 0x1a, 0x86, 0xf4, 0x54, 0xa2, 0x37, - 0x3d, 0xcc, 0x62, 0xff, 0xb6, 0x4f, 0x85, 0xe4, 0x74, 0x3c, 0x93, 0xe8, 0x1f, 0x16, 0x37, 0xe0, - 0x50, 0x5f, 0x88, 0xb9, 0x99, 0x8c, 0xc7, 0x75, 0xed, 0xb9, 0xfb, 0x57, 0x00, 0x00, 0x00, 0xff, - 0xff, 0x62, 0x99, 0x85, 0x2d, 0x95, 0x12, 0x00, 0x00, + // 1135 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x58, 0x4b, 0x6f, 0x23, 0x45, + 0x10, 0xde, 0xf1, 0xdb, 0xe5, 0xd8, 0x99, 0x74, 0x62, 0xe2, 0x5d, 0x1e, 0x1b, 0x66, 0x91, 0x58, + 0xad, 0x84, 0x23, 0x1c, 0x0e, 0xec, 0x35, 0xf1, 0x21, 0x66, 0x95, 0x28, 0x1a, 0x47, 0x20, 0x21, + 0xa1, 0xd1, 0x78, 0xa6, 0xb0, 0x5b, 0xf3, 0x4c, 0x77, 0x3b, 0xc1, 0xb9, 0x73, 0x07, 0x71, 0xe4, + 0xc6, 0x99, 0x03, 0x3f, 0x83, 0xd7, 0x9d, 0x3f, 0x01, 0x82, 0x95, 0x40, 0x5c, 0x51, 0xf7, 0x8c, + 0x1f, 0xe3, 0x24, 0xe6, 0x21, 0x56, 0x8a, 0x94, 0x9b, 0xab, 0xfa, 0xf5, 0x7d, 0x5f, 0x55, 0xd7, + 0x74, 0x19, 0x08, 0x0d, 0x05, 0xb2, 0xd0, 0xf6, 0xad, 0x80, 0x0f, 0xdb, 0x31, 0x8b, 0x44, 0x44, + 0x9a, 0x01, 0xf5, 0xcf, 0xc7, 0x3c, 0xb1, 0xda, 0xd3, 0x09, 0x0f, 0xd6, 0x9c, 0x28, 0x08, 0xa2, + 0x30, 0x71, 0x3f, 0xd8, 0xe0, 0xc8, 0xce, 0xa9, 0x83, 0xf3, 0x75, 0x46, 0x0c, 0x70, 0xca, 0x23, + 0x13, 0xcf, 0xc6, 0xc8, 0x05, 0xd9, 0x86, 0x72, 0x8c, 0xc8, 0x2c, 0xea, 0xb6, 0xb4, 0x1d, 0xed, + 0x71, 0xde, 0x2c, 0x49, 0xb3, 0xe7, 0x92, 0x3d, 0x28, 0xb0, 0xc8, 0xc7, 0x56, 0x6e, 0x47, 0x7b, + 0xdc, 0xe8, 0x3c, 0x6c, 0x5f, 0x7b, 0x5a, 0xfb, 0x04, 0x91, 0x99, 0x91, 0x8f, 0xa6, 0x9a, 0x4c, + 0xb6, 0xa0, 0xe8, 0x44, 0xe3, 0x50, 0xb4, 0xf2, 0x3b, 0xda, 0xe3, 0xba, 0x99, 0x18, 0xc6, 0x39, + 0xd4, 0xd4, 0x89, 0x3c, 0x8e, 0x42, 0x8e, 0x64, 0x0f, 0x4a, 0x5c, 0xd8, 0x62, 0xcc, 0xd5, 0x89, + 0xb5, 0xce, 0xcb, 0xd9, 0xbd, 0x53, 0xfc, 0x7d, 0x35, 0xc5, 0x4c, 0xa7, 0x92, 0x57, 0xa0, 0x2a, + 0x68, 0x80, 0x5c, 0xd8, 0x41, 0xac, 0x30, 0x15, 0xcc, 0xb9, 0xe3, 0x86, 0x73, 0x7f, 0xd2, 0x60, + 0xfb, 0x80, 0xa1, 0x2d, 0xf0, 0x20, 0xf2, 0x7d, 0x74, 0x04, 0x8d, 0xc2, 0x29, 0xef, 0xa7, 0x50, + 0x61, 0x78, 0x66, 0x89, 0x49, 0x8c, 0x0a, 0x46, 0xa3, 0xf3, 0xda, 0x0d, 0x14, 0x4d, 0x3c, 0x3b, + 0x9d, 0xc4, 0x68, 0x96, 0x59, 0xf2, 0x83, 0x34, 0xa1, 0x24, 0x97, 0x52, 0x57, 0xe1, 0xc8, 0x9b, + 0x45, 0x86, 0x67, 0x3d, 0x37, 0x8b, 0x30, 0xbf, 0x8c, 0xf0, 0x3e, 0x54, 0x62, 0x16, 0x7d, 0x32, + 0x91, 0xcb, 0x0a, 0x6a, 0x59, 0x59, 0xd9, 0x3d, 0x97, 0xbc, 0x0d, 0x25, 0xee, 0x8c, 0x30, 0xb0, + 0x5b, 0x45, 0xa5, 0xc7, 0xfd, 0x6b, 0xf5, 0xd8, 0xf7, 0xa3, 0x81, 0x99, 0x4e, 0x34, 0x9e, 0x6b, + 0xd0, 0xec, 0xb2, 0x28, 0xbe, 0xd5, 0xbc, 0x8e, 0x60, 0xdd, 0x99, 0xe1, 0xb3, 0x42, 0x3b, 0xc0, + 0x94, 0xe0, 0x1b, 0x59, 0x44, 0x69, 0x8a, 0xb6, 0xe7, 0x64, 0x8e, 0xed, 0x00, 0xcd, 0x86, 0x93, + 0xb1, 0x8d, 0xdf, 0x34, 0xd8, 0x3a, 0xb4, 0xf9, 0x5d, 0xa2, 0xfc, 0x87, 0x06, 0xf7, 0xbb, 0xc8, + 0x1d, 0x46, 0x07, 0x78, 0x97, 0x78, 0x7f, 0xa5, 0x41, 0xb3, 0x3f, 0x8a, 0x2e, 0x6e, 0x33, 0x67, + 0xe3, 0x57, 0x0d, 0x5e, 0x4a, 0xaa, 0xcb, 0x89, 0xcd, 0x04, 0xbd, 0xa5, 0x91, 0x79, 0x0f, 0x1a, + 0xf1, 0x14, 0xde, 0x62, 0x60, 0x1e, 0x5d, 0x1f, 0x98, 0x19, 0x15, 0x15, 0x97, 0x7a, 0xbc, 0x68, + 0x1a, 0xbf, 0x68, 0xb0, 0x25, 0xab, 0xce, 0x5d, 0xe1, 0xfb, 0xb3, 0x06, 0x9b, 0x87, 0x36, 0xbf, + 0x2b, 0x74, 0x9f, 0x6b, 0xd0, 0x9a, 0x56, 0x9b, 0xbb, 0xc2, 0x59, 0x7e, 0x54, 0x64, 0xa5, 0xb9, + 0xcd, 0x7c, 0xff, 0xef, 0x8f, 0x4a, 0x0e, 0xea, 0xbd, 0x90, 0x23, 0x13, 0x2f, 0x8e, 0xeb, 0x9b, + 0x57, 0x21, 0x4b, 0xc6, 0xd5, 0x65, 0x30, 0xe4, 0x11, 0xcc, 0x03, 0x62, 0x09, 0x7b, 0xa8, 0xb8, + 0x57, 0xcd, 0xb5, 0x99, 0xf3, 0xd4, 0x1e, 0x92, 0x57, 0x01, 0x38, 0x0e, 0x03, 0x0c, 0x85, 0x3c, + 0xa8, 0xa8, 0x0e, 0xaa, 0xa6, 0x9e, 0x9e, 0x2b, 0x87, 0x9d, 0x91, 0x1d, 0x86, 0xe8, 0xcb, 0xe1, + 0x52, 0x32, 0x9c, 0x7a, 0x7a, 0x6e, 0x46, 0xd9, 0x72, 0x56, 0xd9, 0x4c, 0x48, 0x2a, 0xcb, 0x21, + 0xd9, 0x86, 0x32, 0x8b, 0x2e, 0x2c, 0xea, 0xf2, 0x56, 0x75, 0x27, 0x2f, 0x9f, 0xc6, 0x2c, 0xba, + 0xe8, 0xb9, 0x9c, 0xbc, 0x03, 0x15, 0x39, 0xe0, 0xda, 0xc2, 0x6e, 0xc1, 0x4e, 0x7e, 0xf5, 0x93, + 0x4d, 0xee, 0xd1, 0xb5, 0x85, 0x6d, 0x7c, 0x9a, 0x83, 0x7a, 0x17, 0x7d, 0x14, 0x78, 0x0b, 0x74, + 0xcf, 0x6a, 0x56, 0x58, 0xa5, 0x59, 0x71, 0x85, 0x66, 0xa5, 0x65, 0xcd, 0x5e, 0x87, 0xb5, 0x98, + 0xd1, 0xc0, 0x66, 0x13, 0xcb, 0xc3, 0x09, 0x6f, 0x95, 0x95, 0x70, 0xb5, 0xd4, 0xf7, 0x0c, 0x27, + 0xdc, 0xf8, 0x53, 0x83, 0x7a, 0x1f, 0x6d, 0xe6, 0x8c, 0x5e, 0x9c, 0x0e, 0x8b, 0xf8, 0xf3, 0x2b, + 0xf0, 0x17, 0x96, 0xf1, 0x3f, 0x81, 0x0d, 0x86, 0x7c, 0xec, 0x0b, 0x6b, 0x41, 0x9e, 0x44, 0x81, + 0xf5, 0x64, 0xe0, 0x60, 0x26, 0xd2, 0x2e, 0x14, 0xcf, 0xc6, 0xc8, 0x26, 0x4a, 0x85, 0x95, 0x39, + 0x90, 0xcc, 0x33, 0xbe, 0xc8, 0xc1, 0xda, 0x94, 0xb9, 0xdc, 0xea, 0xbf, 0x75, 0x42, 0xff, 0x9e, + 0xb2, 0x01, 0x75, 0x05, 0xc0, 0x0a, 0x23, 0x17, 0xe7, 0xf1, 0xae, 0x29, 0xe7, 0x71, 0xe4, 0xe2, + 0xb2, 0x2c, 0xc5, 0x7f, 0x24, 0x4b, 0xe9, 0x7a, 0x59, 0xda, 0x50, 0x18, 0x51, 0x91, 0x84, 0xbe, + 0xd6, 0x79, 0x70, 0x7d, 0x8d, 0x3a, 0xa4, 0x82, 0x9b, 0x6a, 0x9e, 0xd1, 0x85, 0xda, 0x29, 0x0d, + 0xb0, 0x3f, 0x09, 0x9d, 0x23, 0x3e, 0xbc, 0xb9, 0x21, 0x5d, 0xd9, 0x01, 0x1a, 0x5f, 0x6a, 0x50, + 0x7e, 0x86, 0x93, 0x4e, 0x1f, 0x87, 0x4a, 0x21, 0x75, 0x71, 0xd3, 0x1d, 0x8a, 0xea, 0xde, 0x92, + 0x87, 0x50, 0x5b, 0xc8, 0xcd, 0x54, 0x3d, 0x98, 0xa7, 0xe6, 0xdf, 0x57, 0x68, 0xca, 0xad, 0x73, + 0xdb, 0x4f, 0x05, 0xac, 0x98, 0x65, 0xca, 0xdf, 0x97, 0xa6, 0xdc, 0x79, 0x5e, 0xa0, 0x78, 0xab, + 0xa8, 0x92, 0x1e, 0x66, 0x15, 0x8a, 0x1b, 0x1f, 0x01, 0xa4, 0xe0, 0x24, 0xc5, 0x79, 0x04, 0xb5, + 0xc5, 0x08, 0xbe, 0x0b, 0x65, 0x0f, 0x27, 0x1d, 0x8e, 0xc3, 0x56, 0x4e, 0x69, 0x77, 0xd3, 0x2d, + 0x48, 0xb7, 0x32, 0xa7, 0xd3, 0x8d, 0x10, 0x36, 0xfa, 0xc9, 0x61, 0x32, 0x57, 0x28, 0x17, 0xd4, + 0xe1, 0x4b, 0x55, 0x53, 0x5b, 0xae, 0x9a, 0x0f, 0xa1, 0x16, 0x60, 0x10, 0xb1, 0x89, 0xc5, 0xe9, + 0x25, 0x4e, 0xd5, 0x48, 0x5c, 0x7d, 0x7a, 0x89, 0x92, 0x6f, 0x38, 0x0e, 0x2c, 0x16, 0x5d, 0xf0, + 0x69, 0x42, 0x85, 0xe3, 0xc0, 0x8c, 0x2e, 0xf8, 0x93, 0xaf, 0x73, 0x50, 0x4e, 0xaf, 0x22, 0xa9, + 0x42, 0xd1, 0x3b, 0x8e, 0x42, 0xd4, 0xef, 0x91, 0x26, 0x6c, 0x78, 0xcb, 0xfd, 0xb6, 0xee, 0x92, + 0x4d, 0x58, 0xf7, 0xb2, 0xcd, 0xaa, 0x8e, 0x84, 0x40, 0xc3, 0xcb, 0x74, 0x73, 0xfa, 0xc7, 0x64, + 0x1b, 0x36, 0xbd, 0xab, 0xed, 0x8e, 0x3e, 0x24, 0x5b, 0xa0, 0x7b, 0xd9, 0x7e, 0x80, 0xeb, 0x23, + 0xd2, 0x04, 0xdd, 0x5b, 0x7a, 0x80, 0xeb, 0xdf, 0x6a, 0x64, 0x13, 0x1a, 0x5e, 0xe6, 0x95, 0xaa, + 0x7f, 0xa7, 0x11, 0x02, 0x75, 0x6f, 0xf1, 0x29, 0xa7, 0x7f, 0xaf, 0x91, 0x6d, 0x20, 0xde, 0x95, + 0xf7, 0x8e, 0xfe, 0x83, 0x46, 0xb6, 0x60, 0xdd, 0xcb, 0x3c, 0x0a, 0xb8, 0xfe, 0xa3, 0x46, 0xd6, + 0xa0, 0xec, 0x25, 0xdf, 0x4d, 0xfd, 0xb3, 0xbc, 0xb2, 0x92, 0x6a, 0xae, 0x7f, 0x9e, 0x58, 0xc9, + 0xcd, 0xd6, 0x7f, 0xcf, 0x93, 0x06, 0x54, 0x3d, 0x99, 0xd2, 0xa7, 0xd4, 0xf1, 0xf4, 0x6f, 0xaa, + 0x4f, 0x9e, 0x42, 0x65, 0xfa, 0x3f, 0x09, 0x01, 0x28, 0x1d, 0xd9, 0x5c, 0x20, 0xd3, 0xef, 0xc9, + 0xdf, 0x26, 0xda, 0x2e, 0x32, 0x5d, 0x93, 0xbf, 0x3f, 0x60, 0x54, 0xfa, 0x73, 0x52, 0xd2, 0x13, + 0x79, 0x75, 0xf5, 0xfc, 0x7e, 0xf7, 0xc3, 0xfd, 0x21, 0x15, 0xa3, 0xf1, 0x40, 0x16, 0x83, 0xdd, + 0x4b, 0xea, 0xfb, 0xf4, 0x52, 0xa0, 0x33, 0xda, 0x4d, 0x32, 0xe3, 0x2d, 0x97, 0x72, 0xc1, 0xe8, + 0x60, 0x2c, 0xd0, 0xdd, 0x9d, 0xe6, 0xc7, 0xae, 0x4a, 0x97, 0x99, 0x19, 0x0f, 0x06, 0x25, 0xe5, + 0xd9, 0xfb, 0x2b, 0x00, 0x00, 0xff, 0xff, 0xed, 0x44, 0xad, 0xe6, 0x47, 0x12, 0x00, 0x00, } diff --git a/internal/proxy/manipulation_req.go b/internal/proxy/manipulation_req.go index b2f9c84e26..d4a07bb10e 100644 --- a/internal/proxy/manipulation_req.go +++ b/internal/proxy/manipulation_req.go @@ -7,6 +7,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" pb "github.com/zilliztech/milvus-distributed/internal/proto/message" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "log" "sync" ) @@ -19,13 +20,13 @@ type manipulationReq struct { } // TsMsg interfaces -func (req *manipulationReq) Ts() (Timestamp, error) { +func (req *manipulationReq) Ts() (typeutil.Timestamp, error) { if req.msgs == nil { return 0, errors.New("No typed manipulation request message in ") } - return Timestamp(req.msgs[0].Timestamp), nil + return typeutil.Timestamp(req.msgs[0].Timestamp), nil } -func (req *manipulationReq) SetTs(ts Timestamp) { +func (req *manipulationReq) SetTs(ts typeutil.Timestamp) { for _, msg := range req.msgs { msg.Timestamp = uint64(ts) } @@ -111,10 +112,10 @@ func (s *proxyServer) restartManipulationRoutine(bufSize int) error { pulsarClient.Close() return case ip := <-s.reqSch.manipulationsChan: - ts, st := s.getTimestamp(1) - if st.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("get time stamp failed, error code = %d, msg = %s", st.ErrorCode, st.Reason) - ip.stats[0] = st + ts, err := s.getTimestamp(1) + if err != nil { + log.Printf("get time stamp failed") + ip.stats[0] = commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR} ip.wg.Done() break } diff --git a/internal/proxy/proxy_node.go b/internal/proxy/proxy_node.go index 9d2edc900d..7ee5914440 100644 --- a/internal/proxy/proxy_node.go +++ b/internal/proxy/proxy_node.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "github.com/apache/pulsar-client-go/pulsar" + "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" etcd "go.etcd.io/etcd/clientv3" "strconv" ) @@ -46,7 +48,7 @@ type ProxyOptions struct { // inner member proxyServer *proxyServer - tso *timestampOracle + tso *allocator.TimestampAllocator timeTick *timeTick ctx context.Context cancel context.CancelFunc @@ -87,17 +89,18 @@ func StartProxy(opt *ProxyOptions) error { opt.ctx, opt.cancel = context.WithCancel(context.Background()) ///////////////////// timestamporacle ////////////////////////// - etcdTso, err := etcd.New(etcd.Config{Endpoints: opt.etcdEndpoints}) - if err != nil { - return err - } - tso := ×tampOracle{ - client: etcdTso, - ctx: opt.ctx, - rootPath: opt.tsoRootPath, - saveInterval: opt.tsoSaveInterval, - } - tso.Restart(opt.proxyId) + //etcdTso, err := etcd.New(etcd.Config{Endpoints: opt.etcdEndpoints}) + //if err != nil { + // return err + //} + //tso := ×tampOracle{ + // client: etcdTso, + // ctx: opt.ctx, + // rootPath: opt.tsoRootPath, + // saveInterval: opt.tsoSaveInterval, + //} + //tso.Restart(opt.proxyId) + tso := allocator.NewTimestampAllocator() /////////////////// proxy server /////////////////////////////// //readerTopics, send insert and delete message into these topics @@ -122,7 +125,7 @@ func StartProxy(opt *ProxyOptions) error { resultGroup: opt.resultTopic, numReaderNode: opt.numReaderNode, proxyId: opt.proxyId, - getTimestamp: tso.GetTimestamp, + getTimestamp: tso.Alloc, client: etcdProxy, ctx: opt.ctx, } @@ -147,15 +150,15 @@ func StartProxy(opt *ProxyOptions) error { pulsarProducer: ttProducer, peer_id: opt.timeTickPeerId, ctx: opt.ctx, - areRequestsDelivered: func(ts Timestamp) bool { return srv.reqSch.AreRequestsDelivered(ts, 2) }, - getTimestamp: func() (Timestamp, commonpb.Status) { - ts, st := tso.GetTimestamp(1) - return ts[0], st + areRequestsDelivered: func(ts typeutil.Timestamp) bool { return srv.reqSch.AreRequestsDelivered(ts, 2) }, + getTimestamp: func() (typeutil.Timestamp, error) { + ts, st := tso.AllocOne() + return ts, st }, } - s := tt.Restart() - if s.ErrorCode != commonpb.ErrorCode_SUCCESS { - return fmt.Errorf(s.Reason) + err = tt.Restart() + if err != nil { + return fmt.Errorf("timeTick Restart Failed") } opt.proxyServer = srv diff --git a/internal/proxy/proxy_node_test.go b/internal/proxy/proxy_node_test.go index 0c447c1377..3b7c110399 100644 --- a/internal/proxy/proxy_node_test.go +++ b/internal/proxy/proxy_node_test.go @@ -6,9 +6,11 @@ import ( "encoding/json" "github.com/apache/pulsar-client-go/pulsar" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/allocator" pb "github.com/zilliztech/milvus-distributed/internal/proto/message" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" etcd "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" "sort" @@ -17,6 +19,13 @@ import ( "time" ) + +const ( + tsoKeyPath string = "/timestampOracle" +) + +var timeAllocator *allocator.TimestampAllocator = allocator.NewTimestampAllocator() + func TestProxyNode(t *testing.T) { startTestMaster("localhost:11000", t) testOpt := ProxyOptions{ @@ -65,12 +74,13 @@ func TestProxyNode(t *testing.T) { value, err := strconv.ParseUint(string(ts.Kvs[0].Value), 10, 64) assert.Nil(t, err) - curValue, st := testOpt.tso.GetTimestamp(1) - assert.Equalf(t, st.ErrorCode, pb.ErrorCode_SUCCESS, "%s", st.Reason) + curValue, err := testOpt.tso.AllocOne() + curTS, err := timeAllocator.AllocOne() + assert.Equalf(t, err, nil, "%s", "allocator failed") - curTime := ToPhysicalTime(uint64(curValue[0])) + curTime, _:= tsoutil.ParseTS(curTS) t.Logf("current time stamp = %d, saved time stamp = %d", curTime, value) - assert.GreaterOrEqual(t, uint64(curValue[0]), value) + assert.GreaterOrEqual(t, curValue, value) assert.GreaterOrEqual(t, value, startTime) time.Sleep(time.Duration(testOpt.tsoSaveInterval) * time.Millisecond) } @@ -144,7 +154,7 @@ func TestProxyNode(t *testing.T) { } }() go func() { - lastT := startTime + lastT, _ := tsoutil.ParseTS(startTime) for { cm, ok := <-tickComsumer.Chan() assert.Truef(t, ok, "time tick consumer topic has closed") @@ -153,7 +163,7 @@ func TestProxyNode(t *testing.T) { if err := proto.Unmarshal(cm.Payload(), &tsm); err != nil { t.Fatal(err) } - curT := ToPhysicalTime(tsm.Timestamp) + curT, _:= tsoutil.ParseTS(tsm.Timestamp) t.Logf("time tick = %d", curT) assert.Greater(t, curT, lastT) lastT = curT @@ -240,8 +250,9 @@ func TestProxyNode(t *testing.T) { assert.Equal(t, qm.ProxyId, testOpt.proxyId) assert.Equal(t, qm.CollectionName, "cm100") - t.Logf("query time stamp = %d", ToPhysicalTime(qm.Timestamp)) - assert.Greater(t, ToPhysicalTime(qm.Timestamp), startTime) + physicalTime, _ := tsoutil.ParseTS(qm.Timestamp) + t.Logf("query time stamp = %d", physicalTime) + assert.Greater(t,physicalTime, startTime) r1 := pb.QueryResult{ Status: &pb.Status{ErrorCode: pb.ErrorCode_SUCCESS}, @@ -319,8 +330,9 @@ func TestProxyNode(t *testing.T) { assert.Equal(t, m1.CollectionName, "cm100") assert.Equal(t, len(m1.PrimaryKeys), len(m1.RowsData)) - t.Logf("reader time stamp = %d", ToPhysicalTime(m1.Timestamp)) - assert.GreaterOrEqual(t, ToPhysicalTime(m1.Timestamp), startTime) + physicalTime, _ := tsoutil.ParseTS(m1.Timestamp) + t.Logf("reader time stamp = %d", physicalTime) + assert.GreaterOrEqual(t, physicalTime, startTime) for i, k := range m1.PrimaryKeys { insertPrimaryKey = append(insertPrimaryKey, k) @@ -340,8 +352,10 @@ func TestProxyNode(t *testing.T) { assert.Equal(t, m2.CollectionName, "cm100") assert.Equal(t, len(m2.PrimaryKeys), len(m2.RowsData)) - t.Logf("read time stamp = %d", ToPhysicalTime(m2.Timestamp)) - assert.GreaterOrEqual(t, ToPhysicalTime(m2.Timestamp), startTime) + physicalTime, _ = tsoutil.ParseTS(m2.Timestamp) + t.Logf("reader time stamp = %d", physicalTime) + t.Logf("read time stamp = %d", physicalTime) + assert.GreaterOrEqual(t, physicalTime, startTime) for i, k := range m2.PrimaryKeys { insertPrimaryKey = append(insertPrimaryKey, k) @@ -373,8 +387,10 @@ func TestProxyNode(t *testing.T) { assert.Equal(t, dm.CollectionName, "cm100") assert.Equal(t, len(dm.PrimaryKeys), 2) - t.Logf("delete time stamp = %d", ToPhysicalTime(dm.Timestamp)) - assert.GreaterOrEqual(t, ToPhysicalTime(dm.Timestamp), startTime) + physicalTime, _ = tsoutil.ParseTS(m1.Timestamp) + t.Logf("reader time stamp = %d", physicalTime) + t.Logf("delete time stamp = %d", physicalTime) + assert.GreaterOrEqual(t, physicalTime, startTime) for i := 0; i < len(dm.PrimaryKeys); i++ { assert.Equal(t, dm.PrimaryKeys[i], uint64(i+20)) diff --git a/internal/proxy/query_req.go b/internal/proxy/query_req.go index c117b8a806..b47c2e54fb 100644 --- a/internal/proxy/query_req.go +++ b/internal/proxy/query_req.go @@ -73,9 +73,9 @@ func (s *proxyServer) restartQueryRoutine(buf_size int) error { case <-s.ctx.Done(): return case qm := <-s.reqSch.queryChan: - ts, st := s.getTimestamp(1) - if st.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("get time stamp failed, error code = %d, msg = %s", st.ErrorCode, st.Reason) + ts, err := s.getTimestamp(1) + if err != nil { + log.Printf("get time stamp failed") break } qm.Timestamp = uint64(ts[0]) diff --git a/internal/proxy/request_scheduler.go b/internal/proxy/request_scheduler.go index 88604b7725..e23d2920fb 100644 --- a/internal/proxy/request_scheduler.go +++ b/internal/proxy/request_scheduler.go @@ -1,18 +1,21 @@ package proxy -import "sync" +import ( + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "sync" +) type requestScheduler struct { //definitions requestQueue //manipulations requestQueue manipulationsChan chan *manipulationReq // manipulation queue - mTimestamp Timestamp + mTimestamp typeutil.Timestamp mTimestampMux sync.Mutex //queries requestQueue queryChan chan *queryReq - qTimestamp Timestamp + qTimestamp typeutil.Timestamp qTimestampMux sync.Mutex } @@ -21,7 +24,7 @@ type requestScheduler struct { // bit_1 = 1: select manipulation queue // bit_2 = 1: select query queue // example: if mode = 3, then both definition and manipulation queues are selected -func (rs *requestScheduler) AreRequestsDelivered(ts Timestamp, selection uint32) bool { +func (rs *requestScheduler) AreRequestsDelivered(ts typeutil.Timestamp, selection uint32) bool { r1 := func() bool { if selection&uint32(2) == 0 { return true diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 6354696bbb..af3abbb236 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -44,7 +44,7 @@ type proxyServer struct { resultGroup string numReaderNode int proxyId int64 - getTimestamp func(count uint32) ([]Timestamp, commonpb.Status) + getTimestamp func(count uint32) ([]typeutil.Timestamp, error) client *etcd.Client ctx context.Context wg sync.WaitGroup diff --git a/internal/proxy/server_test.go b/internal/proxy/server_test.go index aae2cef1b8..b85fd58e2b 100644 --- a/internal/proxy/server_test.go +++ b/internal/proxy/server_test.go @@ -5,12 +5,12 @@ import ( "encoding/binary" "encoding/json" "github.com/apache/pulsar-client-go/pulsar" - mpb "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" - pb "github.com/zilliztech/milvus-distributed/internal/proto/message" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + mpb "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + pb "github.com/zilliztech/milvus-distributed/internal/proto/message" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" "net" @@ -63,13 +63,13 @@ func startTestProxyServer(proxy_addr string, master_addr string, t *testing.T) * resultGroup: "reusltG", numReaderNode: 2, proxyId: 1, - getTimestamp: func(count uint32) ([]Timestamp, commonpb.Status) { + getTimestamp: func(count uint32) ([]typeutil.Timestamp, error) { timestamp += 100 - t := make([]Timestamp, count) + t := make([]typeutil.Timestamp, count) for i := 0; i < int(count); i++ { - t[i] = Timestamp(timestamp) + t[i] = typeutil.Timestamp(timestamp) } - return t, commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} + return t, nil }, client: client, ctx: ctx, @@ -371,7 +371,7 @@ func TestProxyServer_InsertAndDelete(t *testing.T) { assert.Equalf(t, primaryKey[i], uint64(i+1), "insert failed") } t.Logf("m_timestamp = %d", ps.reqSch.mTimestamp) - assert.Equalf(t, ps.reqSch.mTimestamp, Timestamp(1300), "insert failed") + assert.Equalf(t, ps.reqSch.mTimestamp, typeutil.Timestamp(1300), "insert failed") } func TestProxyServer_Search(t *testing.T) { diff --git a/internal/proxy/timestamporacle.go b/internal/proxy/timestamporacle.go deleted file mode 100644 index b3b9ab8890..0000000000 --- a/internal/proxy/timestamporacle.go +++ /dev/null @@ -1,120 +0,0 @@ -package proxy - -import ( - "context" - "fmt" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - etcd "go.etcd.io/etcd/clientv3" - "log" - "strconv" - "sync" - "time" -) - -const ( - tsoKeyPath string = "/timestampOracle" -) - -type timestamp struct { - physical uint64 // 18-63 bits - logical uint64 // 8-17 bits - id uint64 // 0-7 bits -} - -type Timestamp uint64 - -type timestampOracle struct { - client *etcd.Client // client of a reliable meta service, i.e. etcd client - ctx context.Context - rootPath string // this timestampOracle's working root path on the reliable kv service - saveInterval uint64 - lastSavedTime uint64 - tso timestamp // monotonically increasing m_timestamp - mux sync.Mutex -} - -func ToTimeStamp(t *timestamp) Timestamp { - ts := (t.physical << 18) + (t.logical << 8) + (t.id & uint64(0xFF)) - return Timestamp(ts) -} - -func ToPhysicalTime(t uint64) uint64 { - return t >> 18 -} - -func (tso *timestampOracle) Restart(id int64) { - go func() { - tso.loadTimestamp() - tso.tso.id = uint64(id) - ticker := time.Tick(time.Duration(tso.saveInterval) * time.Millisecond) - for { - select { - case <-ticker: - _, s := tso.GetTimestamp(1) - if s.ErrorCode == commonpb.ErrorCode_SUCCESS { - _ = tso.saveTimestamp() - } - break - case <-tso.ctx.Done(): - if err := tso.client.Close(); err != nil { - log.Printf("close etcd client error %v", err) - } - return - } - } - }() -} - -func (tso *timestampOracle) GetTimestamp(count uint32) ([]Timestamp, commonpb.Status) { - physical := uint64(time.Now().UnixNano()) / uint64(1e6) - var ctso timestamp - tso.mux.Lock() - if tso.tso.physical < physical { - tso.tso.physical = physical - } - ctso = tso.tso - tso.mux.Unlock() - tt := make([]Timestamp, 0, count) - // (TODO:shengjh) seems tso.tso has not been updated. - for i := uint32(0); i < count; i++ { - ctso.logical = uint64(i) - tt = append(tt, ToTimeStamp(&ctso)) - } - return tt, commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} -} - -func (tso *timestampOracle) saveTimestamp() commonpb.Status { - tso.mux.Lock() - physical := tso.tso.physical - tso.mux.Unlock() - if _, err := tso.client.Put(tso.ctx, tso.rootPath+tsoKeyPath, strconv.FormatUint(physical, 10)); err != nil { - return commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: fmt.Sprintf("put into etcd failed, error = %v", err)} - } - tso.mux.Lock() - tso.lastSavedTime = physical - tso.mux.Unlock() - return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} -} - -func (tso *timestampOracle) loadTimestamp() commonpb.Status { - ts, err := tso.client.Get(tso.ctx, tso.rootPath+tsoKeyPath) - if err != nil { - return commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: fmt.Sprintf("get from etcd failed, error = %v", err)} - } - if len(ts.Kvs) != 0 { - n, err := strconv.ParseUint(string(ts.Kvs[0].Value), 10, 64) - if err != nil { - return commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: fmt.Sprintf("ParseUint failed, error = %v", err)} - } - tso.mux.Lock() - tso.tso.physical = n - tso.lastSavedTime = n - tso.mux.Unlock() - } else { - tso.mux.Lock() - tso.tso.physical = uint64(time.Now().UnixNano()) / uint64(1e6) - tso.lastSavedTime = tso.tso.physical - tso.mux.Unlock() - } - return commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR} -} diff --git a/internal/proxy/timestamporacle_test.go b/internal/proxy/timestamporacle_test.go deleted file mode 100644 index 1c329c8cf3..0000000000 --- a/internal/proxy/timestamporacle_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package proxy - -import ( - "context" - "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/clientv3" - "testing" - "time" -) - -func TestTimestampOracle(t *testing.T) { - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}}) - assert.Nil(t, err) - - defer cli.Close() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - tso := timestampOracle{ - client: cli, - ctx: ctx, - rootPath: "/proxy/tso", - saveInterval: 200, - } - tso.Restart(0) - time.Sleep(time.Second) - tso.loadTimestamp() - tso.mux.Lock() - assert.GreaterOrEqualf(t, tso.tso.physical, uint64(100), "physical error") - - t.Log("physical = ", tso.tso.physical) - tso.mux.Unlock() - ts, _ := tso.GetTimestamp(1) - t.Log("Timestamp = ", ts[0]) -} diff --git a/internal/proxy/timetick.go b/internal/proxy/timetick.go index c77023b862..660ba4f636 100644 --- a/internal/proxy/timetick.go +++ b/internal/proxy/timetick.go @@ -2,36 +2,36 @@ package proxy import ( "context" - "fmt" "github.com/apache/pulsar-client-go/pulsar" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/errors" pb "github.com/zilliztech/milvus-distributed/internal/proto/message" "github.com/golang/protobuf/proto" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "log" "time" ) type timeTick struct { - lastTick Timestamp - currentTick Timestamp + lastTick typeutil.Timestamp + currentTick typeutil.Timestamp interval uint64 pulsarProducer pulsar.Producer peer_id int64 ctx context.Context - areRequestsDelivered func(ts Timestamp) bool - getTimestamp func() (Timestamp, commonpb.Status) + areRequestsDelivered func(ts typeutil.Timestamp) bool + getTimestamp func() (typeutil.Timestamp, error) } -func (tt *timeTick) tick() commonpb.Status { +func (tt *timeTick) tick() error { if tt.lastTick == tt.currentTick { - ts, s := tt.getTimestamp() - if s.ErrorCode != commonpb.ErrorCode_SUCCESS { - return s + ts, err := tt.getTimestamp() + if err != nil { + return err } tt.currentTick = ts } if tt.areRequestsDelivered(tt.currentTick) == false { - return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} + return errors.New("Failed") } tsm := pb.TimeSyncMsg{ Timestamp: uint64(tt.currentTick), @@ -40,21 +40,22 @@ func (tt *timeTick) tick() commonpb.Status { } payload, err := proto.Marshal(&tsm) if err != nil { - return commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: fmt.Sprintf("marshal TimeSync failed, error = %v", err)} + return err } if _, err := tt.pulsarProducer.Send(tt.ctx, &pulsar.ProducerMessage{Payload: payload}); err != nil { - return commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: fmt.Sprintf("send into pulsar failed, error = %v", err)} + return err } tt.lastTick = tt.currentTick - return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} + return nil } -func (tt *timeTick) Restart() commonpb.Status { +func (tt *timeTick) Restart() error{ tt.lastTick = 0 - ts, s := tt.getTimestamp() - if s.ErrorCode != commonpb.ErrorCode_SUCCESS { - return s + ts, err := tt.getTimestamp() + if err != nil{ + return err } + tt.currentTick = ts tick := time.Tick(time.Millisecond * time.Duration(tt.interval)) @@ -62,8 +63,8 @@ func (tt *timeTick) Restart() commonpb.Status { for { select { case <-tick: - if s := tt.tick(); s.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("timeTick error ,status = %d", int(s.ErrorCode)) + if err := tt.tick(); err != nil { + log.Printf("timeTick error") } case <-tt.ctx.Done(): tt.pulsarProducer.Close() @@ -71,5 +72,5 @@ func (tt *timeTick) Restart() commonpb.Status { } } }() - return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} + return nil } diff --git a/internal/proxy/timetick_test.go b/internal/proxy/timetick_test.go index 9e4a9f8b6b..a32b2b21df 100644 --- a/internal/proxy/timetick_test.go +++ b/internal/proxy/timetick_test.go @@ -3,10 +3,10 @@ package proxy import ( "context" "github.com/apache/pulsar-client-go/pulsar" - pb "github.com/zilliztech/milvus-distributed/internal/proto/message" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + pb "github.com/zilliztech/milvus-distributed/internal/proto/message" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "testing" "time" ) @@ -28,17 +28,17 @@ func TestTimeTick(t *testing.T) { ctx, _ := context.WithTimeout(context.Background(), 4*time.Second) - var curTs Timestamp + var curTs typeutil.Timestamp curTs = 0 tt := timeTick{ interval: 200, pulsarProducer: producer, peer_id: 1, ctx: ctx, - areRequestsDelivered: func(ts Timestamp) bool { return true }, - getTimestamp: func() (Timestamp, commonpb.Status) { + areRequestsDelivered: func(ts typeutil.Timestamp) bool { return true }, + getTimestamp: func() (typeutil.Timestamp, error) { curTs = curTs + 100 - return curTs, commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} + return curTs, nil }, } tt.Restart() diff --git a/internal/proxy/tso.go b/internal/proxy/tso.go deleted file mode 100644 index 46a608f139..0000000000 --- a/internal/proxy/tso.go +++ /dev/null @@ -1,125 +0,0 @@ -package proxy - -import ( - "context" - "fmt" - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/proxy/mock" - "log" - "sync" - "time" -) - -// tsCountPerRPC is the count of timestamp requested from master per RPC -const tsCountPerRPC = 2 << 18 * 10 - -// defaultUpdateInterval is the interval between requesting a batch timestamps from master -const defaultUpdateInterval = time.Millisecond * 1000 - -// Oracle is the interface that provides strictly ascending timestamps. -type Oracle interface { - GetTimestamp(ctx context.Context, count uint32) (uint64, error) - Close() -} - -type tsWithTTL struct { - ts uint64 - count uint64 - expireTime time.Time -} - -func (ts *tsWithTTL) IsExpired() bool { - now := time.Now() - return now.Sub(ts.expireTime) >= 0 -} - -func (ts *tsWithTTL) CanAllocTs(count uint32) bool { - return !ts.IsExpired() && ts.count >= uint64(count) -} - -// MatserOracle implement Oracle interface, proving strictly ascending timestamps. -// It request and cache a batch timestamps from master, and updating periodically. -type MatserOracle struct { - c *mock.TSOClient - lastTs *tsWithTTL - quit chan struct{} - mux sync.RWMutex -} - -func NewMasterTSO(client *mock.TSOClient) (Oracle, error) { - o := &MatserOracle{ - c: client, - lastTs: &tsWithTTL{ - ts: 0, - count: 0, - expireTime: time.Time{}, - }, - quit: make(chan struct{}), - } - go o.UpdateLastTs(defaultUpdateInterval) - return o, nil -} - -func (o *MatserOracle) UpdateLastTs(updateInterval time.Duration) { - tick := time.NewTicker(updateInterval) - defer tick.Stop() - for { - select { - case <-tick.C: - // Update ts - ctx := context.TODO() - ts, count, tw, err := o.c.GetTimeStamp(ctx, tsCountPerRPC) - if err != nil { - break - } else { - o.SetTs(ts, count, tw) - } - case <-o.quit: - return - } - } -} - -func (o *MatserOracle) SetTs(ts uint64, count uint64, timeWindow time.Duration) { - o.mux.Lock() - defer o.mux.Unlock() - if ts > o.lastTs.ts || o.lastTs.ts == 0 { - o.lastTs.ts = ts - o.lastTs.count = count - o.lastTs.expireTime = time.Now().Add(timeWindow) - } -} - -func (o *MatserOracle) GetTimestamp(ctx context.Context, count uint32) (uint64, error) { - // TODO: add context deadline - if count > tsCountPerRPC { - return 0, errors.New("Can't alloc too large count timestamps, count must less than " + fmt.Sprintf("%v", tsCountPerRPC)) - } - maxRetry := 10 - for i := 0; i < maxRetry; i++ { - o.mux.RLock() - retry := !o.lastTs.CanAllocTs(count) - o.mux.RUnlock() - if retry { - // wait for timestamp updated - log.Printf("MasterOracle GetTimeStamp, retry count: %v", i+1) - time.Sleep(time.Millisecond * 100) - continue - } - break - } - o.mux.Lock() - defer o.mux.Unlock() - // TimeStamp has not been update while retry `maxRetry` times - if !o.lastTs.CanAllocTs(count) { - return 0, errors.New("MasterOracle GetTimeStamp failed, exceeds max retry times") - } - ts := o.lastTs.ts - o.lastTs.ts += uint64(count) - o.lastTs.count -= uint64(count) - return ts, nil -} - -func (o *MatserOracle) Close() { - close(o.quit) -} diff --git a/internal/proxy/tso_test.go b/internal/proxy/tso_test.go deleted file mode 100644 index 80d4cf0dc3..0000000000 --- a/internal/proxy/tso_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package proxy - -import ( - "context" - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proxy/mock" - "testing" - "time" -) - -func TestMatserOracle_GetTimestamp(t *testing.T) { - tso, _:= NewMasterTSO(&mock.TSOClient{}) - defer tso.Close() - - ctx := context.TODO() - ts0, err := tso.GetTimestamp(ctx, 100) - assert.Nil(t, err) - ts1, err := tso.GetTimestamp(ctx, 100) - t.Logf("ts0=%v, ts1=%v", ts0, ts1) - assert.Nil(t, err) - assert.Greater(t, ts1, ts0) - assert.Greater(t, ts1, ts0 + 99) - - time.Sleep(time.Second * 3) - ts0, err = tso.GetTimestamp(ctx, 100) - assert.Nil(t, err) - ts1, err = tso.GetTimestamp(ctx, 100) - t.Logf("ts0=%v, ts1=%v", ts0, ts1) - assert.Nil(t, err) - assert.Greater(t, ts1, ts0) - assert.Greater(t, ts1, ts0 + 99) - - _, err = tso.GetTimestamp(ctx, 2<<30) - assert.NotNil(t, err) - t.Log(err) -} diff --git a/internal/reader/index_test.go b/internal/reader/index_test.go index 8632f99954..dcb798b938 100644 --- a/internal/reader/index_test.go +++ b/internal/reader/index_test.go @@ -54,7 +54,7 @@ func TestIndex_BuildIndex(t *testing.T) { assert.NoError(t, err) // 6. Build index - segment.BuildIndex(collection) + //segment.BuildIndex(collection) //assert.Equal(t, status.ErrorCode, msgPb.ErrorCode_SUCCESS) // 7. Do search @@ -122,7 +122,7 @@ func TestIndex_DropIndex(t *testing.T) { assert.NoError(t, err) // 6. Build index - var status = segment.BuildIndex(collection) + //var status = segment.BuildIndex(collection) //assert.Equal(t, status.ErrorCode, msgPb.ErrorCode_SUCCESS) // 7. Do search @@ -141,8 +141,8 @@ func TestIndex_DropIndex(t *testing.T) { fmt.Println(searchRes) // 8. Drop index - status = segment.DropIndex("fakevec") - assert.Equal(t, status.ErrorCode, msgPb.ErrorCode_SUCCESS) + //status = segment.DropIndex("fakevec") + //assert.Equal(t, status.ErrorCode, msgPb.ErrorCode_SUCCESS) // 9. Destruct node, collection, and segment partition.DeleteSegment(node, segment) @@ -193,9 +193,9 @@ func TestIndex_UpdateIndex(t *testing.T) { var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) assert.NoError(t, err) - // 6. Build index - segment.BuildIndex(collection) - assert.NoError(t, err) + //// 6. Build index + //segment.BuildIndex(collection) + //assert.NoError(t, err) // 7. Do search var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}" diff --git a/internal/reader/meta_test.go b/internal/reader/meta_test.go index b71ff4b036..1b5eaa5566 100644 --- a/internal/reader/meta_test.go +++ b/internal/reader/meta_test.go @@ -7,8 +7,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/master/collection" "github.com/zilliztech/milvus-distributed/internal/master/segment" "github.com/zilliztech/milvus-distributed/internal/msgclient" - "github.com/zilliztech/milvus-distributed/internal/proto/master" - messagePb "github.com/zilliztech/milvus-distributed/internal/proto/message" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "log" "math" "sync" @@ -76,15 +75,14 @@ func TestMeta_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) { conf.LoadConfig("config.yaml") var s = segment.Segment{ - SegmentID: uint64(0), - CollectionID: uint64(0), + SegmentID: int64(0), + CollectionID: int64(0), PartitionTag: "partition0", ChannelStart: 0, ChannelEnd: 128, OpenTimeStamp: uint64(0), CloseTimeStamp: uint64(math.MaxUint64), CollectionName: "collection0", - Status: master.SegmentStatus_OPENED, Rows: int64(0), } @@ -92,15 +90,14 @@ func TestMeta_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) { assert.Equal(t, b, true) s = segment.Segment{ - SegmentID: uint64(0), - CollectionID: uint64(0), + SegmentID: int64(0), + CollectionID: int64(0), PartitionTag: "partition0", ChannelStart: 128, ChannelEnd: 256, OpenTimeStamp: uint64(0), CloseTimeStamp: uint64(math.MaxUint64), CollectionName: "collection0", - Status: master.SegmentStatus_OPENED, Rows: int64(0), } @@ -111,31 +108,30 @@ func TestMeta_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) { func TestMeta_PrintCollectionStruct(t *testing.T) { var age = collection.FieldMeta{ FieldName: "age", - Type: messagePb.DataType_INT32, + Type: schemapb.DataType_INT32, DIM: int64(1), } var vec = collection.FieldMeta{ FieldName: "vec", - Type: messagePb.DataType_VECTOR_FLOAT, + Type: schemapb.DataType_VECTOR_FLOAT, DIM: int64(16), } var fieldMetas = []collection.FieldMeta{age, vec} var c = collection.Collection{ - ID: uint64(0), + ID: int64(0), Name: "collection0", CreateTime: uint64(0), Schema: fieldMetas, - SegmentIDs: []uint64{ + SegmentIDs: []int64{ 0, 1, 2, }, PartitionTags: []string{ "partition0", }, GrpcMarshalString: "", - IndexParam: nil, } printCollectionStruct(&c) @@ -143,15 +139,14 @@ func TestMeta_PrintCollectionStruct(t *testing.T) { func TestMeta_PrintSegmentStruct(t *testing.T) { var s = segment.Segment{ - SegmentID: uint64(0), - CollectionID: uint64(0), + SegmentID: int64(0), + CollectionID: int64(0), PartitionTag: "partition0", ChannelStart: 128, ChannelEnd: 256, OpenTimeStamp: uint64(0), CloseTimeStamp: uint64(math.MaxUint64), CollectionName: "collection0", - Status: master.SegmentStatus_OPENED, Rows: int64(0), } @@ -200,7 +195,7 @@ func TestMeta_ProcessSegmentCreate(t *testing.T) { "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" - c := node.NewCollection(uint64(0), "test", "") + c := node.NewCollection(int64(0), "test", "") c.NewPartition("default") node.processSegmentCreate(id, value) @@ -266,7 +261,7 @@ func TestMeta_ProcessSegmentModify(t *testing.T) { "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" - var c = node.NewCollection(uint64(0), "test", "") + var c = node.NewCollection(int64(0), "test", "") c.NewPartition("default") node.processSegmentCreate(id, value) @@ -418,7 +413,7 @@ func TestMeta_ProcessSegmentDelete(t *testing.T) { "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" - c := node.NewCollection(uint64(0), "test", "") + c := node.NewCollection(int64(0), "test", "") c.NewPartition("default") node.processSegmentCreate(id, value) diff --git a/internal/reader/util_functions_test.go b/internal/reader/util_functions_test.go index 49862c0d79..17226061a1 100644 --- a/internal/reader/util_functions_test.go +++ b/internal/reader/util_functions_test.go @@ -67,7 +67,7 @@ func TestUtilFunctions_GetCollectionByID(t *testing.T) { assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) - c := node.GetCollectionByID(uint64(0)) + c := node.GetCollectionByID(int64(0)) assert.Equal(t, c.CollectionName, "collection0") partition.DeleteSegment(node, segment) diff --git a/internal/timesync/timesync_test.go b/internal/timesync/timesync_test.go index a300b2bb8e..fcb1fb4096 100644 --- a/internal/timesync/timesync_test.go +++ b/internal/timesync/timesync_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" pb "github.com/zilliztech/milvus-distributed/internal/proto/message" "github.com/golang/protobuf/proto" ) @@ -49,17 +50,17 @@ func TestAlignTimeSync(t *testing.T) { proxyIdList: []int64{1, 2, 3}, interval: 200, } - ts := []*pb.TimeSyncMsg{ + ts := []*internalpb.TimeSyncMsg{ { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(5), }, { - Peer_Id: 3, + PeerId: 3, Timestamp: toTimestamp(15), }, { - Peer_Id: 2, + PeerId: 2, Timestamp: toTimestamp(20), }, } @@ -68,7 +69,7 @@ func TestAlignTimeSync(t *testing.T) { t.Fatalf("proxyIdList should be : 1 2 3") } for i := 0; i < len(r.proxyIdList); i++ { - if r.proxyIdList[i] != ts[i].Peer_Id { + if r.proxyIdList[i] != ts[i].PeerId { t.Fatalf("Align falied") } } @@ -80,17 +81,17 @@ func TestAlignTimeSync2(t *testing.T) { proxyIdList: []int64{1, 2, 3}, interval: 200, } - ts := []*pb.TimeSyncMsg{ + ts := []*internalpb.TimeSyncMsg{ { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(5), }, { - Peer_Id: 3, + PeerId: 3, Timestamp: toTimestamp(150), }, { - Peer_Id: 2, + PeerId: 2, Timestamp: toTimestamp(20), }, } @@ -98,7 +99,7 @@ func TestAlignTimeSync2(t *testing.T) { if len(r.proxyIdList) != 3 { t.Fatalf("proxyIdList should be : 1 2 3") } - if len(ts) != 1 || ts[0].Peer_Id != 2 { + if len(ts) != 1 || ts[0].PeerId != 2 { t.Fatalf("align failed") } @@ -109,25 +110,25 @@ func TestAlignTimeSync3(t *testing.T) { proxyIdList: []int64{1, 2, 3}, interval: 200, } - ts := []*pb.TimeSyncMsg{ + ts := []*internalpb.TimeSyncMsg{ { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(5), }, { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(5), }, { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(5), }, { - Peer_Id: 3, + PeerId: 3, Timestamp: toTimestamp(15), }, { - Peer_Id: 2, + PeerId: 2, Timestamp: toTimestamp(20), }, } @@ -136,7 +137,7 @@ func TestAlignTimeSync3(t *testing.T) { t.Fatalf("proxyIdList should be : 1 2 3") } for i := 0; i < len(r.proxyIdList); i++ { - if r.proxyIdList[i] != ts[i].Peer_Id { + if r.proxyIdList[i] != ts[i].PeerId { t.Fatalf("Align falied") } } @@ -147,17 +148,17 @@ func TestAlignTimeSync4(t *testing.T) { proxyIdList: []int64{1}, interval: 200, } - ts := []*pb.TimeSyncMsg{ + ts := []*internalpb.TimeSyncMsg{ { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(15), }, { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(25), }, { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(35), }, } @@ -178,25 +179,25 @@ func TestAlignTimeSync5(t *testing.T) { proxyIdList: []int64{1, 2, 3}, interval: 200, } - ts := []*pb.TimeSyncMsg{ + ts := []*internalpb.TimeSyncMsg{ { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(5), }, { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(5), }, { - Peer_Id: 1, + PeerId: 1, Timestamp: toTimestamp(5), }, { - Peer_Id: 3, + PeerId: 3, Timestamp: toTimestamp(15), }, { - Peer_Id: 3, + PeerId: 3, Timestamp: toTimestamp(20), }, } diff --git a/internal/util/tsoutil/tso.go b/internal/util/tsoutil/tso.go index c51ca6b108..9e977a52c7 100644 --- a/internal/util/tsoutil/tso.go +++ b/internal/util/tsoutil/tso.go @@ -15,8 +15,6 @@ package tsoutil import ( "time" - - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" ) const ( @@ -24,6 +22,10 @@ const ( logicalBits = (1 << physicalShiftBits) - 1 ) +func ComposeTS(physical, logical int64) uint64{ + return uint64((physical << physicalShiftBits) + logical) +} + // ParseTS parses the ts to (physical,logical). func ParseTS(ts uint64) (time.Time, uint64) { logical := ts & logicalBits @@ -32,10 +34,3 @@ func ParseTS(ts uint64) (time.Time, uint64) { return physicalTime, logical } -// ParseTimestamp parses pdpb.Timestamp to time.Time -func ParseTimestamp(ts internalpb.TimestampMsg) (time.Time, uint64) { - logical := uint64(ts.Logical) - physical := ts.Physical - physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds()) - return physicalTime, logical -} diff --git a/internal/util/typeutil/time.go b/internal/util/typeutil/time.go index 1652d0cbbf..f12c87f04e 100644 --- a/internal/util/typeutil/time.go +++ b/internal/util/typeutil/time.go @@ -17,6 +17,7 @@ import "time" // ZeroTime is a zero time. var ZeroTime = time.Time{} +var ZeroTimestamp = Timestamp(0) // ParseTimestamp returns a timestamp for a given byte slice. func ParseTimestamp(data []byte) (time.Time, error) { diff --git a/internal/util/typeutil/type.go b/internal/util/typeutil/type.go new file mode 100644 index 0000000000..14e2fe188a --- /dev/null +++ b/internal/util/typeutil/type.go @@ -0,0 +1,5 @@ +package typeutil + + +type Timestamp = uint64 +type Id = int64 \ No newline at end of file