From a308d2c886e5fb78b439f6f123dfd7bcead0376d Mon Sep 17 00:00:00 2001 From: SimFG Date: Thu, 10 Apr 2025 15:14:28 +0800 Subject: [PATCH] fix: get replicate channel position (#41188) - issue: #41187 Signed-off-by: SimFG --- internal/proxy/impl.go | 24 ++++++++++++------------ pkg/mq/msgdispatcher/manager_test.go | 5 ++++- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index f8eae2b6a5..6217ef3880 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6512,18 +6512,6 @@ func (node *Proxy) ReplicateMessage(ctx context.Context, req *milvuspb.Replicate return &milvuspb.ReplicateMessageResponse{Status: merr.Status(err)}, nil } - collectionReplicateEnable := paramtable.Get().CommonCfg.CollectionReplicateEnable.GetAsBool() - ttMsgEnabled := paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() - - // replicate message can be use in two ways, otherwise return error - // 1. collectionReplicateEnable is false and ttMsgEnabled is false, active/standby mode - // 2. collectionReplicateEnable is true and ttMsgEnabled is true, data migration mode - if (!collectionReplicateEnable && ttMsgEnabled) || (collectionReplicateEnable && !ttMsgEnabled) { - return &milvuspb.ReplicateMessageResponse{ - Status: merr.Status(merr.ErrDenyReplicateMessage), - }, nil - } - var err error if req.GetChannelName() == "" { log.Ctx(ctx).Warn("channel name is empty") @@ -6555,6 +6543,18 @@ func (node *Proxy) ReplicateMessage(ctx context.Context, req *milvuspb.Replicate }, nil } + collectionReplicateEnable := paramtable.Get().CommonCfg.CollectionReplicateEnable.GetAsBool() + ttMsgEnabled := paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() + + // replicate message can be use in two ways, otherwise return error + // 1. collectionReplicateEnable is false and ttMsgEnabled is false, active/standby mode + // 2. collectionReplicateEnable is true and ttMsgEnabled is true, data migration mode + if (!collectionReplicateEnable && ttMsgEnabled) || (collectionReplicateEnable && !ttMsgEnabled) { + return &milvuspb.ReplicateMessageResponse{ + Status: merr.Status(merr.ErrDenyReplicateMessage), + }, nil + } + msgPack := &msgstream.MsgPack{ BeginTs: req.BeginTs, EndTs: req.EndTs, diff --git a/pkg/mq/msgdispatcher/manager_test.go b/pkg/mq/msgdispatcher/manager_test.go index 7acacd433d..1bded78c79 100644 --- a/pkg/mq/msgdispatcher/manager_test.go +++ b/pkg/mq/msgdispatcher/manager_test.go @@ -172,7 +172,10 @@ func TestManager(t *testing.T) { assert.Equal(t, 3, c.NumTarget()) assert.Eventually(t, func() bool { return c.NumConsumer() >= 1 - }, 3*time.Second, 10*time.Millisecond) + }, 10*time.Second, 10*time.Millisecond) + if c.(*dispatcherManager).mainDispatcher == nil { + t.FailNow() + } c.(*dispatcherManager).mainDispatcher.curTs.Store(1000) for _, d := range c.(*dispatcherManager).deputyDispatchers { d.curTs.Store(1000)