From 65ce1f97e7456a63c02b6582c433942f9eb9e20f Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Mon, 7 Dec 2020 19:41:39 +0800 Subject: [PATCH] Fix bug: pulsar dirty msg Signed-off-by: zhenshan.cao --- internal/master/global_allocator_test.go | 11 --------- internal/master/master_test.go | 31 ++++++++++++++++++++++++ internal/proxy/proxy_test.go | 19 +++++++++++++++ 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/internal/master/global_allocator_test.go b/internal/master/global_allocator_test.go index f33b934dc2..35abae57ec 100644 --- a/internal/master/global_allocator_test.go +++ b/internal/master/global_allocator_test.go @@ -1,7 +1,6 @@ package master import ( - "os" "testing" "time" @@ -12,16 +11,6 @@ import ( var gTestTsoAllocator Allocator var gTestIDAllocator *GlobalIDAllocator -func TestMain(m *testing.M) { - Params.Init() - - etcdAddr := Params.EtcdAddress - gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso")) - gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid")) - exitCode := m.Run() - os.Exit(exitCode) -} - func TestGlobalTSOAllocator_Initialize(t *testing.T) { err := gTestTsoAllocator.Initialize() assert.Nil(t, err) diff --git a/internal/master/master_test.go b/internal/master/master_test.go index 47f85f51eb..5af957c527 100644 --- a/internal/master/master_test.go +++ b/internal/master/master_test.go @@ -4,9 +4,12 @@ import ( "context" "log" "math/rand" + "os" "strconv" "testing" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -39,6 +42,23 @@ func makeMasterAddress(port int64) string { return masterAddr } +func makeNewChannalNames(names []string, suffix string) []string { + var ret []string + for _, name := range names { + ret = append(ret, name+suffix) + } + return ret +} + +func refreshChannelNames() { + suffix := "_test" + strconv.FormatInt(rand.Int63n(100), 10) + Params.DDChannelNames = makeNewChannalNames(Params.DDChannelNames, suffix) + Params.WriteNodeTimeTickChannelNames = makeNewChannalNames(Params.WriteNodeTimeTickChannelNames, suffix) + Params.InsertChannelNames = makeNewChannalNames(Params.InsertChannelNames, suffix) + Params.K2SChannelNames = makeNewChannalNames(Params.K2SChannelNames, suffix) + Params.ProxyTimeTickChannelNames = makeNewChannalNames(Params.ProxyTimeTickChannelNames, suffix) +} + func receiveTimeTickMsg(stream *ms.MsgStream) bool { for { result := (*stream).Consume() @@ -56,6 +76,17 @@ func getTimeTickMsgPack(ttmsgs [][2]uint64) *ms.MsgPack { return &msgPack } +func TestMain(m *testing.M) { + Init() + refreshMasterAddress() + refreshChannelNames() + etcdAddr := Params.EtcdAddress + gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso")) + gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid")) + exitCode := m.Run() + os.Exit(exitCode) +} + func TestMaster(t *testing.T) { Init() refreshMasterAddress() diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 4eb56f3086..677b4e7413 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "math/rand" "os" "strconv" "strings" @@ -34,8 +35,26 @@ var masterServer *master.Master var testNum = 10 +func makeNewChannalNames(names []string, suffix string) []string { + var ret []string + for _, name := range names { + ret = append(ret, name+suffix) + } + return ret +} + +func refreshChannelNames() { + suffix := "_test" + strconv.FormatInt(rand.Int63n(100), 10) + master.Params.DDChannelNames = makeNewChannalNames(master.Params.DDChannelNames, suffix) + master.Params.WriteNodeTimeTickChannelNames = makeNewChannalNames(master.Params.WriteNodeTimeTickChannelNames, suffix) + master.Params.InsertChannelNames = makeNewChannalNames(master.Params.InsertChannelNames, suffix) + master.Params.K2SChannelNames = makeNewChannalNames(master.Params.K2SChannelNames, suffix) + master.Params.ProxyTimeTickChannelNames = makeNewChannalNames(master.Params.ProxyTimeTickChannelNames, suffix) +} + func startMaster(ctx context.Context) { master.Init() + refreshChannelNames() etcdAddr := master.Params.EtcdAddress metaRootPath := master.Params.MetaRootPath