From 9e6da45497c8fdfa99f9c2f1ed21823888dab4e8 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Tue, 26 Dec 2023 16:06:49 +0800 Subject: [PATCH] fix: Use uber atomic instead sync/atomic which only supported after go v1.20 (#29377) relate: https://github.com/milvus-io/milvus/issues/29376 --------- Signed-off-by: aoiasd --- pkg/mq/msgstream/mq_msgstream.go | 7 ++++--- pkg/mq/msgstream/mq_msgstream_test.go | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 09157906b5..386506eb37 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/samber/lo" + uatomic "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -43,8 +44,8 @@ import ( ) var ( - _ MsgStream = (*mqMsgStream)(nil) - streamCount atomic.Int64 + _ MsgStream = (*mqMsgStream)(nil) + streamCounter uatomic.Int64 ) type mqMsgStream struct { @@ -102,7 +103,7 @@ func NewMqMsgStream(ctx context.Context, } ctxLog := log.Ctx(ctx) stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()) - stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCount.Add(1)), func(event *config.Event) { + stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) { value, err := strconv.ParseBool(event.Value) if err != nil { ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err)) diff --git a/pkg/mq/msgstream/mq_msgstream_test.go b/pkg/mq/msgstream/mq_msgstream_test.go index 2d55265085..46b9797e8e 100644 --- a/pkg/mq/msgstream/mq_msgstream_test.go +++ b/pkg/mq/msgstream/mq_msgstream_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" pulsarwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/pulsar" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -96,6 +97,20 @@ func consumer(ctx context.Context, mq MsgStream) *MsgPack { } } +func TestStream_ConfigEvent(t *testing.T) { + pulsarAddress := getPulsarAddress() + factory := ProtoUDFactory{} + pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) + assert.NoError(t, err) + stream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) + assert.NoError(t, err) + stream.configEvent.OnEvent(&config.Event{Value: "false"}) + stream.configEvent.OnEvent(&config.Event{Value: "????"}) + assert.False(t, stream.isEnabledProduce()) + stream.configEvent.OnEvent(&config.Event{Value: "true"}) + assert.True(t, stream.isEnabledProduce()) +} + func TestStream_PulsarMsgStream_Insert(t *testing.T) { pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)