From c9653b168317583f9c0dc8d46c7705aaef7fad49 Mon Sep 17 00:00:00 2001 From: SimFG Date: Thu, 28 Sep 2023 09:35:27 +0800 Subject: [PATCH] Add some log and improve TestSessionProcessActiveStandBy test case (#27403) Signed-off-by: SimFG --- internal/datacoord/segment_manager.go | 6 +++++- internal/util/sessionutil/session_util_test.go | 15 +++++++++++++-- pkg/mq/msgstream/mq_msgstream.go | 2 ++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 08b2206755..648f814f44 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -504,7 +504,11 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin ret := make([]UniqueID, 0, len(s.segments)) for _, id := range s.segments { info := s.meta.GetHealthySegment(id) - if info == nil || info.InsertChannel != channel { + if info == nil { + continue + } + if info.InsertChannel != channel { + log.Warn("the channel of flushable segments isn't equal", zap.String("insert_channel", info.InsertChannel), zap.String("channel", channel), zap.Int64("segment", id)) continue } if s.flushPolicy(info, t) { diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 293adac943..c5da64a5e2 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -659,8 +659,19 @@ func TestSessionProcessActiveStandBy(t *testing.T) { log.Debug("Stop session 1, session 2 will take over primary service") assert.False(t, flag) - s1.Stop() - <-signal + s1.safeCloseLiveCh() + { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, _ = s1.etcdCli.Revoke(ctx, *s1.leaseID) + } + select { + case <-signal: + log.Debug("receive s1 signal") + case <-time.After(10 * time.Second): + log.Debug("wait to fail Liveness Check timeout") + t.FailNow() + } assert.True(t, flag) wg.Wait() diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 13e03b905e..086d0ed16d 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -869,6 +869,8 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi MsgID: msg.ID().Serialize(), }) ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg) + } else { + log.Info("skip msg", zap.Any("msg", tsMsg)) } } }