diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 04f36b8281..2909653a7d 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -415,7 +415,6 @@ func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo, // recycleDroppedSegments scans all segments and remove those dropped segments from meta and oss. func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { - start := time.Now() log := log.With(zap.String("gcName", "recycleDroppedSegments"), zap.Time("startAt", start)) log.Info("start clear dropped segments...") diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 3f1260adc0..76df25281c 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -22,6 +22,7 @@ package datanode import ( "context" "fmt" + "github.com/samber/lo" "go.uber.org/zap" "google.golang.org/protobuf/proto" diff --git a/pkg/mq/msgdispatcher/dispatcher_test.go b/pkg/mq/msgdispatcher/dispatcher_test.go index 000b500397..42a333c16e 100644 --- a/pkg/mq/msgdispatcher/dispatcher_test.go +++ b/pkg/mq/msgdispatcher/dispatcher_test.go @@ -81,7 +81,7 @@ func TestDispatcher(t *testing.T) { target := newTarget(&StreamConfig{ VChannel: vchannel, Pos: pos, - }) + }, false) target.ch = ch return target } @@ -104,7 +104,7 @@ func TestDispatcher(t *testing.T) { t.Run("test concurrent send and close", func(t *testing.T) { for i := 0; i < 100; i++ { output := make(chan *msgstream.MsgPack, 1024) - target := newTarget(&StreamConfig{VChannel: "mock_vchannel_0"}) + target := newTarget(&StreamConfig{VChannel: "mock_vchannel_0"}, false) target.ch = output assert.Equal(t, cap(output), cap(target.ch)) wg := &sync.WaitGroup{} @@ -145,11 +145,11 @@ func TestGroupMessage(t *testing.T) { d, err := NewDispatcher(context.Background(), newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", nil, common.SubscriptionPositionEarliest, false, 0, false) assert.NoError(t, err) - d.AddTarget(newTarget(&StreamConfig{VChannel: "mock_pchannel_0_1v0"})) + d.AddTarget(newTarget(&StreamConfig{VChannel: "mock_pchannel_0_1v0"}, false)) d.AddTarget(newTarget(&StreamConfig{ VChannel: "mock_pchannel_0_2v0", ReplicateConfig: msgstream.GetReplicateConfig("local-test", "foo", "coo"), - })) + }, false)) { // no replicate msg packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{ diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index 58da1f415d..ea51ffb777 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -84,7 +84,7 @@ func NewDispatcherManager(pchannel string, role string, nodeID int64, factory ms } func (c *dispatcherManager) Add(ctx context.Context, streamConfig *StreamConfig) (<-chan *MsgPack, error) { - t := newTarget(streamConfig) + t := newTarget(streamConfig, c.includeSkipWhenSplit) if _, ok := c.registeredTargets.GetOrInsert(t.vchannel, t); ok { return nil, fmt.Errorf("vchannel %s already exists in the dispatcher", t.vchannel) } diff --git a/pkg/mq/msgdispatcher/target.go b/pkg/mq/msgdispatcher/target.go index 50aa7ed05b..5474537abe 100644 --- a/pkg/mq/msgdispatcher/target.go +++ b/pkg/mq/msgdispatcher/target.go @@ -30,11 +30,13 @@ import ( ) type target struct { - vchannel string - ch chan *MsgPack - subPos SubPos - pos *Pos - isLagged bool + vchannel string + ch chan *MsgPack + subPos SubPos + pos *Pos + filterSameTimeTick bool + latestTimeTick uint64 + isLagged bool closeMu sync.Mutex closeOnce sync.Once @@ -46,18 +48,20 @@ type target struct { cancelCh lifetime.SafeChan } -func newTarget(streamConfig *StreamConfig) *target { +func newTarget(streamConfig *StreamConfig, filterSameTimeTick bool) *target { replicateConfig := streamConfig.ReplicateConfig maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second) t := &target{ - vchannel: streamConfig.VChannel, - ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()), - subPos: streamConfig.SubPos, - pos: streamConfig.Pos, - cancelCh: lifetime.NewSafeChan(), - maxLag: maxTolerantLag, - timer: time.NewTimer(maxTolerantLag), - replicateConfig: replicateConfig, + vchannel: streamConfig.VChannel, + ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()), + subPos: streamConfig.SubPos, + pos: streamConfig.Pos, + filterSameTimeTick: filterSameTimeTick, + latestTimeTick: 0, + cancelCh: lifetime.NewSafeChan(), + maxLag: maxTolerantLag, + timer: time.NewTimer(maxTolerantLag), + replicateConfig: replicateConfig, } t.closed = false if replicateConfig != nil { @@ -87,6 +91,23 @@ func (t *target) send(pack *MsgPack) error { return nil } + if t.filterSameTimeTick { + if pack.EndPositions[0].GetTimestamp() <= t.latestTimeTick { + if len(pack.Msgs) > 0 { + // only filter out the msg that is only timetick message, + // So it's a unexpected behavior if the msgs is not empty + log.Warn("some data lost when time tick filtering", + zap.String("vchannel", t.vchannel), + zap.Uint64("latestTimeTick", t.latestTimeTick), + zap.Uint64("packEndTs", pack.EndPositions[0].GetTimestamp()), + zap.Int("msgCount", len(pack.Msgs)), + ) + } + // filter out the msg that is already sent with the same timetick. + return nil + } + } + if !t.timer.Stop() { select { case <-t.timer.C: @@ -100,8 +121,11 @@ func (t *target) send(pack *MsgPack) error { return nil case <-t.timer.C: t.isLagged = true - return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s, beginTs=%d, endTs=%d", t.vchannel, t.maxLag, pack.BeginTs, pack.EndTs) + return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s, beginTs=%d, endTs=%d, latestTimeTick=%d", t.vchannel, t.maxLag, pack.BeginTs, pack.EndTs, t.latestTimeTick) case t.ch <- pack: + if len(pack.EndPositions) > 0 { + t.latestTimeTick = pack.EndPositions[0].GetTimestamp() + } return nil } } diff --git a/pkg/mq/msgdispatcher/target_test.go b/pkg/mq/msgdispatcher/target_test.go index 4a40116897..851a6eccae 100644 --- a/pkg/mq/msgdispatcher/target_test.go +++ b/pkg/mq/msgdispatcher/target_test.go @@ -17,7 +17,7 @@ func TestSendTimeout(t *testing.T) { target := newTarget(&StreamConfig{ VChannel: "test1", Pos: &msgpb.MsgPosition{}, - }) + }, false) time.Sleep(paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)) @@ -31,3 +31,25 @@ func TestSendTimeout(t *testing.T) { } assert.Equal(t, counter, 0) } + +func TestSendTimeTickFiltering(t *testing.T) { + target := newTarget(&StreamConfig{ + VChannel: "test1", + Pos: &msgpb.MsgPosition{}, + }, true) + target.send(&msgstream.MsgPack{ + EndPositions: []*msgpb.MsgPosition{ + { + Timestamp: 1, + }, + }, + }) + + target.send(&msgstream.MsgPack{ + EndPositions: []*msgpb.MsgPosition{ + { + Timestamp: 1, + }, + }, + }) +}