diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 43504711e9..ff95e135f0 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -487,7 +487,7 @@ func (node *QueryNode) Init() error { node.loader = segments.NewLoader(node.ctx, node.manager, node.chunkManager) node.manager.SetLoader(node.loader) if streamingutil.IsStreamingServiceEnabled() { - node.dispClient = msgdispatcher.NewClient(streaming.NewDelegatorMsgstreamFactory(), typeutil.QueryNodeRole, node.GetNodeID()) + node.dispClient = msgdispatcher.NewClientWithIncludeSkipWhenSplit(streaming.NewDelegatorMsgstreamFactory(), typeutil.QueryNodeRole, node.GetNodeID()) } else { node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID()) } diff --git a/pkg/mq/msgdispatcher/client.go b/pkg/mq/msgdispatcher/client.go index 5d2d6df58e..e04caccee8 100644 --- a/pkg/mq/msgdispatcher/client.go +++ b/pkg/mq/msgdispatcher/client.go @@ -61,20 +61,28 @@ type Client interface { var _ Client = (*client)(nil) type client struct { - role string - nodeID int64 - managers *typeutil.ConcurrentMap[string, DispatcherManager] - managerMut *lock.KeyLock[string] - factory msgstream.Factory + role string + nodeID int64 + managers *typeutil.ConcurrentMap[string, DispatcherManager] + managerMut *lock.KeyLock[string] + factory msgstream.Factory + includeSkipWhenSplit bool +} + +func NewClientWithIncludeSkipWhenSplit(factory msgstream.Factory, role string, nodeID int64) Client { + c := NewClient(factory, role, nodeID) + c.(*client).includeSkipWhenSplit = true + return c } func NewClient(factory msgstream.Factory, role string, nodeID int64) Client { return &client{ - role: role, - nodeID: nodeID, - factory: factory, - managers: typeutil.NewConcurrentMap[string, DispatcherManager](), - managerMut: lock.NewKeyLock[string](), + role: role, + nodeID: nodeID, + factory: factory, + managers: typeutil.NewConcurrentMap[string, DispatcherManager](), + managerMut: lock.NewKeyLock[string](), + includeSkipWhenSplit: false, } } @@ -91,7 +99,7 @@ func (c *client) Register(ctx context.Context, streamConfig *StreamConfig) (<-ch var manager DispatcherManager manager, ok := c.managers.Get(pchannel) if !ok { - manager = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory) + manager = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory, c.includeSkipWhenSplit) c.managers.Insert(pchannel, manager) go manager.Run() } diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index dd831e92db..6d4918722c 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -78,6 +78,8 @@ type Dispatcher struct { targets *typeutil.ConcurrentMap[string, *target] stream msgstream.MsgStream + + includeSkipWhenSplit bool } func NewDispatcher( @@ -89,6 +91,7 @@ func NewDispatcher( subPos SubPos, includeCurrentMsg bool, pullbackEndTs typeutil.Timestamp, + includeSkipWhenSplit bool, ) (*Dispatcher, error) { subName := fmt.Sprintf("%s-%d-%d", pchannel, id, time.Now().UnixNano()) @@ -252,7 +255,11 @@ func (d *Dispatcher) work() { isReplicateChannel := strings.Contains(vchannel, paramtable.Get().CommonCfg.ReplicateMsgChannel.GetValue()) // The dispatcher seeks from the oldest target, // so for each target, msg before the target position must be filtered out. - if p.EndTs <= t.pos.GetTimestamp() && !isReplicateChannel { + // + // From 2.6.0, every message has a unique timetick, so we can filter out the msg by < but not <=. + if ((d.includeSkipWhenSplit && p.EndTs < t.pos.GetTimestamp()) || + (!d.includeSkipWhenSplit && p.EndTs <= t.pos.GetTimestamp())) && + !isReplicateChannel { log.Info("skip msg", zap.String("vchannel", vchannel), zap.Int("msgCount", len(p.Msgs)), diff --git a/pkg/mq/msgdispatcher/dispatcher_test.go b/pkg/mq/msgdispatcher/dispatcher_test.go index 32eb4887b8..000b500397 100644 --- a/pkg/mq/msgdispatcher/dispatcher_test.go +++ b/pkg/mq/msgdispatcher/dispatcher_test.go @@ -36,7 +36,7 @@ func TestDispatcher(t *testing.T) { ctx := context.Background() t.Run("test base", func(t *testing.T) { d, err := NewDispatcher(ctx, newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0) + nil, common.SubscriptionPositionEarliest, false, 0, false) assert.NoError(t, err) assert.NotPanics(t, func() { d.Handle(start) @@ -65,7 +65,7 @@ func TestDispatcher(t *testing.T) { }, } d, err := NewDispatcher(ctx, factory, time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0) + nil, common.SubscriptionPositionEarliest, false, 0, false) assert.Error(t, err) assert.Nil(t, d) @@ -73,7 +73,7 @@ func TestDispatcher(t *testing.T) { t.Run("test target", func(t *testing.T) { d, err := NewDispatcher(ctx, newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0) + nil, common.SubscriptionPositionEarliest, false, 0, false) assert.NoError(t, err) output := make(chan *msgstream.MsgPack, 1024) @@ -128,7 +128,7 @@ func TestDispatcher(t *testing.T) { func BenchmarkDispatcher_handle(b *testing.B) { d, err := NewDispatcher(context.Background(), newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0) + nil, common.SubscriptionPositionEarliest, false, 0, false) assert.NoError(b, err) for i := 0; i < b.N; i++ { @@ -143,7 +143,7 @@ func BenchmarkDispatcher_handle(b *testing.B) { func TestGroupMessage(t *testing.T) { d, err := NewDispatcher(context.Background(), newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0) + nil, common.SubscriptionPositionEarliest, false, 0, false) assert.NoError(t, err) d.AddTarget(newTarget(&StreamConfig{VChannel: "mock_pchannel_0_1v0"})) d.AddTarget(newTarget(&StreamConfig{ diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index 05a88766a5..58da1f415d 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -63,19 +63,22 @@ type dispatcherManager struct { factory msgstream.Factory closeChan chan struct{} closeOnce sync.Once + + includeSkipWhenSplit bool } -func NewDispatcherManager(pchannel string, role string, nodeID int64, factory msgstream.Factory) DispatcherManager { +func NewDispatcherManager(pchannel string, role string, nodeID int64, factory msgstream.Factory, includeSkipWhenSplit bool) DispatcherManager { log.Info("create new dispatcherManager", zap.String("role", role), zap.Int64("nodeID", nodeID), zap.String("pchannel", pchannel)) c := &dispatcherManager{ - role: role, - nodeID: nodeID, - pchannel: pchannel, - registeredTargets: typeutil.NewConcurrentMap[string, *target](), - deputyDispatchers: make(map[int64]*Dispatcher), - factory: factory, - closeChan: make(chan struct{}), + role: role, + nodeID: nodeID, + pchannel: pchannel, + registeredTargets: typeutil.NewConcurrentMap[string, *target](), + deputyDispatchers: make(map[int64]*Dispatcher), + factory: factory, + closeChan: make(chan struct{}), + includeSkipWhenSplit: includeSkipWhenSplit, } return c } @@ -269,7 +272,7 @@ OUTER: // TODO: add newDispatcher timeout param and init context id := c.idAllocator.Inc() - d, err := NewDispatcher(context.Background(), c.factory, id, c.pchannel, earliestTarget.pos, earliestTarget.subPos, includeCurrentMsg, latestTarget.pos.GetTimestamp()) + d, err := NewDispatcher(context.Background(), c.factory, id, c.pchannel, earliestTarget.pos, earliestTarget.subPos, includeCurrentMsg, latestTarget.pos.GetTimestamp(), c.includeSkipWhenSplit) if err != nil { panic(err) } diff --git a/pkg/mq/msgdispatcher/manager_test.go b/pkg/mq/msgdispatcher/manager_test.go index 19777a1f6a..88717e520c 100644 --- a/pkg/mq/msgdispatcher/manager_test.go +++ b/pkg/mq/msgdispatcher/manager_test.go @@ -46,7 +46,7 @@ func TestManager(t *testing.T) { assert.NoError(t, err) go produceTimeTick(t, ctx, producer) - c := NewDispatcherManager(pchannel, typeutil.ProxyRole, 1, factory) + c := NewDispatcherManager(pchannel, typeutil.ProxyRole, 1, factory, false) assert.NotNil(t, c) go c.Run() defer c.Close() @@ -93,7 +93,7 @@ func TestManager(t *testing.T) { assert.NoError(t, err) go produceTimeTick(t, ctx, producer) - c := NewDispatcherManager(pchannel, typeutil.ProxyRole, 1, factory) + c := NewDispatcherManager(pchannel, typeutil.ProxyRole, 1, factory, false) assert.NotNil(t, c) go c.Run() @@ -157,7 +157,7 @@ func TestManager(t *testing.T) { assert.NoError(t, err) go produceTimeTick(t, ctx, producer) - c := NewDispatcherManager(pchannel, typeutil.ProxyRole, 1, factory) + c := NewDispatcherManager(pchannel, typeutil.ProxyRole, 1, factory, false) assert.NotNil(t, c) go c.Run() @@ -202,7 +202,7 @@ func TestManager(t *testing.T) { assert.NoError(t, err) go produceTimeTick(t, ctx, producer) - c := NewDispatcherManager(pchannel, typeutil.ProxyRole, 1, factory) + c := NewDispatcherManager(pchannel, typeutil.ProxyRole, 1, factory, false) go c.Run() defer c.Close()