Use the request's context to allocate ts (#22633)

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2023-03-09 18:51:52 +08:00 committed by GitHub
parent e4120d1671
commit e581fef115
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 17 additions and 20 deletions

View File

@ -90,7 +90,7 @@ func (ticker *channelsTimeTickerImpl) initCurrents(current Timestamp) {
} }
func (ticker *channelsTimeTickerImpl) tick() error { func (ticker *channelsTimeTickerImpl) tick() error {
now, err := ticker.tso.AllocOne() now, err := ticker.tso.AllocOne(ticker.ctx)
if err != nil { if err != nil {
log.Warn("Proxy channelsTimeTickerImpl failed to get ts from tso", zap.Error(err)) log.Warn("Proxy channelsTimeTickerImpl failed to get ts from tso", zap.Error(err))
return err return err
@ -168,7 +168,7 @@ func (ticker *channelsTimeTickerImpl) tickLoop() {
func (ticker *channelsTimeTickerImpl) start() error { func (ticker *channelsTimeTickerImpl) start() error {
ticker.initStatistics() ticker.initStatistics()
current, err := ticker.tso.AllocOne() current, err := ticker.tso.AllocOne(ticker.ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -25,7 +25,7 @@ import (
// use interface tsoAllocator to keep other components testable // use interface tsoAllocator to keep other components testable
// include: channelsTimeTickerImpl, baseTaskQueue, taskScheduler // include: channelsTimeTickerImpl, baseTaskQueue, taskScheduler
type tsoAllocator interface { type tsoAllocator interface {
AllocOne() (Timestamp, error) AllocOne(ctx context.Context) (Timestamp, error)
} }
// use timestampAllocatorInterface to keep other components testable // use timestampAllocatorInterface to keep other components testable

View File

@ -67,7 +67,7 @@ func newMockTimestampAllocatorInterface() timestampAllocatorInterface {
type mockTsoAllocator struct { type mockTsoAllocator struct {
} }
func (tso *mockTsoAllocator) AllocOne() (Timestamp, error) { func (tso *mockTsoAllocator) AllocOne(ctx context.Context) (Timestamp, error) {
return Timestamp(time.Now().UnixNano()), nil return Timestamp(time.Now().UnixNano()), nil
} }

View File

@ -207,7 +207,7 @@ func (node *Proxy) Init() error {
log.Debug("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) log.Debug("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
log.Debug("create timestamp allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) log.Debug("create timestamp allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
tsoAllocator, err := newTimestampAllocator(node.ctx, node.rootCoord, paramtable.GetNodeID()) tsoAllocator, err := newTimestampAllocator(node.rootCoord, paramtable.GetNodeID())
if err != nil { if err != nil {
log.Warn("failed to create timestamp allocator", log.Warn("failed to create timestamp allocator",
zap.Error(err), zap.Error(err),

View File

@ -169,7 +169,7 @@ func (queue *baseTaskQueue) Enqueue(t task) error {
return err return err
} }
ts, err := queue.tsoAllocatorIns.AllocOne() ts, err := queue.tsoAllocatorIns.AllocOne(t.TraceCtx())
if err != nil { if err != nil {
return err return err
} }

View File

@ -32,24 +32,23 @@ import (
// timestampAllocator implements tsoAllocator. // timestampAllocator implements tsoAllocator.
type timestampAllocator struct { type timestampAllocator struct {
ctx context.Context
tso timestampAllocatorInterface tso timestampAllocatorInterface
peerID UniqueID peerID UniqueID
} }
// newTimestampAllocator creates a new timestampAllocator // newTimestampAllocator creates a new timestampAllocator
func newTimestampAllocator(ctx context.Context, tso timestampAllocatorInterface, peerID UniqueID) (*timestampAllocator, error) { func newTimestampAllocator(tso timestampAllocatorInterface, peerID UniqueID) (*timestampAllocator, error) {
a := &timestampAllocator{ a := &timestampAllocator{
ctx: ctx,
peerID: peerID, peerID: peerID,
tso: tso, tso: tso,
} }
return a, nil return a, nil
} }
func (ta *timestampAllocator) alloc(count uint32) ([]Timestamp, error) { func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timestamp, error) {
tr := timerecord.NewTimeRecorder("applyTimestamp") tr := timerecord.NewTimeRecorder("applyTimestamp")
ctx, cancel := context.WithTimeout(ta.ctx, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
req := &rootcoordpb.AllocTimestampRequest{ req := &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase( Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
@ -61,7 +60,6 @@ func (ta *timestampAllocator) alloc(count uint32) ([]Timestamp, error) {
resp, err := ta.tso.AllocTimestamp(ctx, req) resp, err := ta.tso.AllocTimestamp(ctx, req)
defer func() { defer func() {
cancel()
metrics.ProxyApplyTimestampLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyApplyTimestampLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
}() }()
@ -81,8 +79,8 @@ func (ta *timestampAllocator) alloc(count uint32) ([]Timestamp, error) {
} }
// AllocOne allocates a timestamp. // AllocOne allocates a timestamp.
func (ta *timestampAllocator) AllocOne() (Timestamp, error) { func (ta *timestampAllocator) AllocOne(ctx context.Context) (Timestamp, error) {
ret, err := ta.alloc(1) ret, err := ta.alloc(ctx, 1)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -27,11 +27,10 @@ import (
) )
func TestNewTimestampAllocator(t *testing.T) { func TestNewTimestampAllocator(t *testing.T) {
ctx := context.Background()
tso := newMockTimestampAllocatorInterface() tso := newMockTimestampAllocatorInterface()
peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
tsAllocator, err := newTimestampAllocator(ctx, tso, peerID) tsAllocator, err := newTimestampAllocator(tso, peerID)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, tsAllocator) assert.NotNil(t, tsAllocator)
} }
@ -41,12 +40,12 @@ func TestTimestampAllocator_alloc(t *testing.T) {
tso := newMockTimestampAllocatorInterface() tso := newMockTimestampAllocatorInterface()
peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
tsAllocator, err := newTimestampAllocator(ctx, tso, peerID) tsAllocator, err := newTimestampAllocator(tso, peerID)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, tsAllocator) assert.NotNil(t, tsAllocator)
count := rand.Uint32()%100 + 1 count := rand.Uint32()%100 + 1
ret, err := tsAllocator.alloc(count) ret, err := tsAllocator.alloc(ctx, count)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, int(count), len(ret)) assert.Equal(t, int(count), len(ret))
} }
@ -56,10 +55,10 @@ func TestTimestampAllocator_AllocOne(t *testing.T) {
tso := newMockTimestampAllocatorInterface() tso := newMockTimestampAllocatorInterface()
peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
tsAllocator, err := newTimestampAllocator(ctx, tso, peerID) tsAllocator, err := newTimestampAllocator(tso, peerID)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, tsAllocator) assert.NotNil(t, tsAllocator)
_, err = tsAllocator.AllocOne() _, err = tsAllocator.AllocOne(ctx)
assert.Nil(t, err) assert.Nil(t, err)
} }