diff --git a/go.mod b/go.mod index 94fe702b10..cba5f68131 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,7 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/antonmedv/expr v1.8.9 github.com/apache/pulsar-client-go v0.4.0 - github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect - github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect github.com/coreos/etcd v3.3.13+incompatible - github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect @@ -26,6 +23,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pkg/errors v0.9.1 + github.com/quasilyte/go-ruleguard v0.2.1 // indirect github.com/sirupsen/logrus v1.6.0 // indirect github.com/spaolacci/murmur3 v1.1.0 github.com/spf13/cast v1.3.0 diff --git a/go.sum b/go.sum index 58c208fcaf..c9dc026fc6 100644 --- a/go.sum +++ b/go.sum @@ -316,6 +316,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/quasilyte/go-ruleguard v0.2.1 h1:56eRm0daAyny9UhJnmtJW/UyLZQusukBAB8oT8AHKHo= +github.com/quasilyte/go-ruleguard v0.2.1/go.mod h1:hN2rVc/uS4bQhQKTio2XaSJSafJwqBUWWwtssT3cQmc= github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -376,6 +378,7 @@ github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhD github.com/yahoo/athenz v1.9.16 h1:2s8KtIxwAbcJIYySsfrT/t/WO0Ss5O7BPGUN/q8x2bg= github.com/yahoo/athenz v1.9.16/go.mod h1:guj+0Ut6F33wj+OcSRlw69O0itsR7tVocv15F2wJnIo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= @@ -449,6 +452,7 @@ golang.org/x/net v0.0.0-20190921015927-1a5e07d1ff72/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= @@ -464,6 +468,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -527,6 +532,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200812195022-5ae4c3c160a0/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/allocator/allocator.go b/internal/allocator/allocator.go index 6b9609fd06..86209465a3 100644 --- a/internal/allocator/allocator.go +++ b/internal/allocator/allocator.go @@ -51,12 +51,6 @@ type IDRequest struct { count uint32 } -type TSORequest struct { - BaseRequest - timestamp Timestamp - count uint32 -} - type SyncRequest struct { BaseRequest } diff --git a/internal/allocator/timestamp.go b/internal/allocator/timestamp.go deleted file mode 100644 index 43bf2377a3..0000000000 --- a/internal/allocator/timestamp.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package allocator - -import ( - "context" - "fmt" - "time" - - "go.uber.org/zap" - - msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/masterpb" - "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/typeutil" -) - -type Timestamp = typeutil.Timestamp - -const ( - tsCountPerRPC = 2 << 15 -) - -type TimestampAllocator struct { - Allocator - - masterAddress string - masterClient types.MasterService - - countPerRPC uint32 - lastTsBegin Timestamp - lastTsEnd Timestamp - PeerID UniqueID -} - -func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAllocator, error) { - ctx1, cancel := context.WithCancel(ctx) - a := &TimestampAllocator{ - Allocator: Allocator{ - Ctx: ctx1, - CancelFunc: cancel, - Role: "TimestampAllocator", - }, - masterAddress: masterAddr, - countPerRPC: tsCountPerRPC, - } - a.TChan = &Ticker{ - UpdateInterval: time.Second, - } - a.Allocator.SyncFunc = a.syncTs - a.Allocator.ProcessFunc = a.processFunc - a.Allocator.CheckSyncFunc = a.checkSyncFunc - a.Allocator.PickCanDoFunc = a.pickCanDoFunc - a.Init() - return a, nil -} - -func (ta *TimestampAllocator) Start() error { - var err error - ta.masterClient, err = msc.NewClient(ta.masterAddress, 20*time.Second) - if err != nil { - panic(err) - } - - if err = ta.masterClient.Init(); err != nil { - panic(err) - } - - if err = ta.masterClient.Start(); err != nil { - panic(err) - } - return ta.Allocator.Start() -} - -func (ta *TimestampAllocator) checkSyncFunc(timeout bool) bool { - return timeout || len(ta.ToDoReqs) > 0 -} - -func (ta *TimestampAllocator) pickCanDoFunc() { - total := uint32(ta.lastTsEnd - ta.lastTsBegin) - need := uint32(0) - idx := 0 - for _, req := range ta.ToDoReqs { - tReq := req.(*TSORequest) - need += tReq.count - if need <= total { - ta.CanDoReqs = append(ta.CanDoReqs, req) - idx++ - } else { - break - } - } - ta.ToDoReqs = ta.ToDoReqs[idx:] - log.Debug("TimestampAllocator pickCanDoFunc", - zap.Any("need", need), - zap.Any("total", total), - zap.Any("remainReqCnt", len(ta.ToDoReqs))) -} - -func (ta *TimestampAllocator) gatherReqTsCount() uint32 { - need := uint32(0) - for _, req := range ta.ToDoReqs { - tReq := req.(*TSORequest) - need += tReq.count - } - return need -} - -func (ta *TimestampAllocator) syncTs() (bool, error) { - need := ta.gatherReqTsCount() - if need < ta.countPerRPC { - need = ta.countPerRPC - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - req := &masterpb.AllocTimestampRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_RequestTSO, - MsgID: 0, - Timestamp: 0, - SourceID: ta.PeerID, - }, - Count: need, - } - - resp, err := ta.masterClient.AllocTimestamp(ctx, req) - defer cancel() - - if err != nil { - return false, fmt.Errorf("syncTimestamp Failed:%w", err) - } - ta.lastTsBegin = resp.GetTimestamp() - ta.lastTsEnd = ta.lastTsBegin + uint64(resp.GetCount()) - return true, nil -} - -func (ta *TimestampAllocator) processFunc(req Request) error { - tsoRequest := req.(*TSORequest) - tsoRequest.timestamp = ta.lastTsBegin - ta.lastTsBegin++ - return nil -} - -func (ta *TimestampAllocator) AllocOne() (Timestamp, error) { - ret, err := ta.Alloc(1) - if err != nil { - return 0, err - } - return ret[0], nil -} - -func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) { - req := &TSORequest{ - BaseRequest: BaseRequest{Done: make(chan error), Valid: false}, - } - req.count = count - ta.Reqs <- req - if err := req.Wait(); err != nil { - return nil, fmt.Errorf("alloc time stamp request failed: %s", err) - } - - start, count := req.timestamp, req.count - var ret []Timestamp - for i := uint32(0); i < count; i++ { - ret = append(ret, start+uint64(i)) - } - return ret, nil -} - -func (ta *TimestampAllocator) ClearCache() { - -} diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 061982bccb..3da1f80171 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -56,7 +56,7 @@ type ProxyNode struct { tick *timeTick idAllocator *allocator.IDAllocator - tsoAllocator *allocator.TimestampAllocator + tsoAllocator *TimestampAllocator segAssigner *SegIDAssigner queryMsgStream msgstream.MsgStream @@ -178,12 +178,11 @@ func (node *ProxyNode) Init() error { node.idAllocator = idAllocator node.idAllocator.PeerID = Params.ProxyID - tsoAllocator, err := allocator.NewTimestampAllocator(node.ctx, masterAddr) + tsoAllocator, err := NewTimestampAllocator(node.masterService, Params.ProxyID) if err != nil { return err } node.tsoAllocator = tsoAllocator - node.tsoAllocator.PeerID = Params.ProxyID segAssigner, err := NewSegIDAssigner(node.ctx, node.dataService, node.lastTick) if err != nil { @@ -221,9 +220,6 @@ func (node *ProxyNode) Start() error { node.idAllocator.Start() log.Debug("start id allocator ...") - node.tsoAllocator.Start() - log.Debug("start tso allocator ...") - node.segAssigner.Start() log.Debug("start seg assigner ...") @@ -247,7 +243,6 @@ func (node *ProxyNode) Stop() error { node.cancel() globalInsertChannelsMap.CloseAllMsgStream() - node.tsoAllocator.Close() node.idAllocator.Close() node.segAssigner.Close() node.sched.Close() diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go index 3aa55e0692..a3d0971621 100644 --- a/internal/proxynode/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -259,7 +259,7 @@ type TaskScheduler struct { DqQueue TaskQueue idAllocator *allocator.IDAllocator - tsoAllocator *allocator.TimestampAllocator + tsoAllocator *TimestampAllocator wg sync.WaitGroup ctx context.Context @@ -270,7 +270,7 @@ type TaskScheduler struct { func NewTaskScheduler(ctx context.Context, idAllocator *allocator.IDAllocator, - tsoAllocator *allocator.TimestampAllocator, + tsoAllocator *TimestampAllocator, factory msgstream.Factory) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) s := &TaskScheduler{ diff --git a/internal/proxynode/timestamp.go b/internal/proxynode/timestamp.go new file mode 100644 index 0000000000..1d739bff10 --- /dev/null +++ b/internal/proxynode/timestamp.go @@ -0,0 +1,70 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package proxynode + +import ( + "context" + "fmt" + "time" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/masterpb" + "github.com/milvus-io/milvus/internal/types" +) + +type TimestampAllocator struct { + masterService types.MasterService + peerID UniqueID +} + +func NewTimestampAllocator(master types.MasterService, peerID UniqueID) (*TimestampAllocator, error) { + a := &TimestampAllocator{ + peerID: peerID, + masterService: master, + } + return a, nil +} + +func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + req := &masterpb.AllocTimestampRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_RequestTSO, + MsgID: 0, + Timestamp: 0, + SourceID: ta.peerID, + }, + Count: count, + } + + resp, err := ta.masterService.AllocTimestamp(ctx, req) + defer cancel() + + if err != nil { + return nil, fmt.Errorf("syncTimestamp Failed:%w", err) + } + start, cnt := resp.Timestamp, resp.Count + var ret []Timestamp + for i := uint32(0); i < cnt; i++ { + ret = append(ret, start+uint64(i)) + } + + return ret, nil +} + +func (ta *TimestampAllocator) AllocOne() (Timestamp, error) { + ret, err := ta.Alloc(1) + if err != nil { + return 0, err + } + return ret[0], nil +} diff --git a/internal/proxynode/timetick.go b/internal/proxynode/timetick.go index 98a2b54083..cd1f5e0d82 100644 --- a/internal/proxynode/timetick.go +++ b/internal/proxynode/timetick.go @@ -19,7 +19,6 @@ import ( "go.uber.org/zap" "github.com/apache/pulsar-client-go/pulsar" - "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -35,7 +34,7 @@ type timeTick struct { pulsarProducer pulsar.Producer - tsoAllocator *allocator.TimestampAllocator + tsoAllocator *TimestampAllocator tickMsgStream msgstream.MsgStream msFactory msgstream.Factory @@ -49,7 +48,7 @@ type timeTick struct { } func newTimeTick(ctx context.Context, - tsoAllocator *allocator.TimestampAllocator, + tsoAllocator *TimestampAllocator, interval time.Duration, checkFunc tickCheckFunc, factory msgstream.Factory) *timeTick {