From 1b1d4e502b0fa46d0bc6ba425a3d55f205d125b3 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Tue, 18 Jan 2022 00:07:37 +0800 Subject: [PATCH] Remove session from timetickSync (#15255) Signed-off-by: yudong.cai --- internal/rootcoord/root_coord.go | 4 ++-- internal/rootcoord/timeticksync.go | 18 +++++++++--------- internal/rootcoord/timeticksync_test.go | 10 +++------- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index fd9e19a01f..407663f878 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1032,7 +1032,7 @@ func (c *Core) Init() error { } chanMap := c.MetaTable.ListCollectionPhysicalChannels() - c.chanTimeTick = newTimeTickSync(c.ctx, c.session, c.msFactory, chanMap) + c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.msFactory, chanMap) c.chanTimeTick.addProxy(c.session) c.proxyClientManager = newProxyClientManager(c) @@ -1040,7 +1040,7 @@ func (c *Core) Init() error { c.proxyManager = newProxyManager( c.ctx, c.etcdCli, - c.chanTimeTick.getProxy, + c.chanTimeTick.clearProxy, c.proxyClientManager.GetProxyClients, ) c.proxyManager.AddSession(c.chanTimeTick.addProxy, c.proxyClientManager.AddProxyClient) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 283ca42998..af5b8dbc87 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -44,8 +44,8 @@ var ( ) type timetickSync struct { - ctx context.Context - session *sessionutil.Session + ctx context.Context + sourceID int64 dmlChannels *dmlChannels // used for insert deltaChannels *dmlChannels // used for delete @@ -85,7 +85,7 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp { return c.defaultTs } -func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { +func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { // initialize dml channels used for insert dmlChannels := newDmlChannels(ctx, factory, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DmlChannelNum) // initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels @@ -110,8 +110,8 @@ func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory } return &timetickSync{ - ctx: ctx, - session: session, + ctx: ctx, + sourceID: sourceID, dmlChannels: dmlChannels, deltaChannels: deltaChannels, @@ -236,7 +236,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason return nil } - if in.Base.SourceID == t.session.ServerID { + if in.Base.SourceID == t.sourceID { if prev != nil && in.DefaultTimestamp <= prev.defaultTs { log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("curr ts", in.DefaultTimestamp), @@ -273,7 +273,7 @@ func (t *timetickSync) delProxy(sess *sessionutil.Session) { } } -func (t *timetickSync) getProxy(sess []*sessionutil.Session) { +func (t *timetickSync) clearProxy(sess []*sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() for _, s := range sess { @@ -304,7 +304,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { } // reduce each channel to get min timestamp - local := proxyTimetick[t.session.ServerID] + local := proxyTimetick[t.sourceID] if len(local.chanTs) == 0 { continue } @@ -356,7 +356,7 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim MsgType: commonpb.MsgType_TimeTick, MsgID: 0, Timestamp: ts, - SourceID: t.session.ServerID, + SourceID: t.sourceID, }, } timeTickMsg := &msgstream.TimeTickMsg{ diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index f6bac1d768..a588601fba 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -26,15 +26,11 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/util/sessionutil" ) func TestTimetickSync(t *testing.T) { ctx := context.Background() - - session := &sessionutil.Session{ - ServerID: 100, - } + sourceID := int64(100) factory := msgstream.NewPmsFactory() m := map[string]interface{}{ @@ -51,7 +47,7 @@ func TestTimetickSync(t *testing.T) { Params.RootCoordCfg.DmlChannelNum = 2 Params.RootCoordCfg.DmlChannelName = "rootcoord-dml" Params.RootCoordCfg.DeltaChannelName = "rootcoord-delta" - ttSync := newTimeTickSync(ctx, session, factory, nil) + ttSync := newTimeTickSync(ctx, sourceID, factory, nil) var wg sync.WaitGroup wg.Add(1) @@ -108,7 +104,7 @@ func TestTimetickSync(t *testing.T) { assert.Nil(t, err) ttSync.ddlMinTs = uint64(300) - ttSync.session.ServerID = int64(1) + ttSync.sourceID = int64(1) err = ttSync.updateTimeTick(msg, "1") assert.Nil(t, err) })