From 16c002b51b38759a4b1b2e448ba0bc177d11f101 Mon Sep 17 00:00:00 2001 From: SimFG Date: Sat, 20 Jul 2024 23:11:39 +0800 Subject: [PATCH] enhance: unified channel name generation and conversion (#34805) /kind improvement - issue: #34804 Signed-off-by: SimFG --- .../datanode/compaction/mix_compactor_test.go | 2 +- internal/rootcoord/create_collection_task.go | 5 +- pkg/util/funcutil/func.go | 10 +++- pkg/util/funcutil/func_test.go | 51 ++++++++++++++++++- pkg/util/metautil/channel.go | 3 +- 5 files changed, 64 insertions(+), 7 deletions(-) diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index 4bf0a8efdc..1368618d8a 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -18,7 +18,6 @@ package compaction import ( "context" - "github.com/milvus-io/milvus/internal/allocator" "math" "testing" "time" @@ -30,6 +29,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index d694d90191..5859e8235f 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -358,8 +358,9 @@ func (t *createCollectionTask) assignChannels() error { return fmt.Errorf("no enough channels, want: %d, got: %d", t.Req.GetShardsNum(), len(chanNames)) } - for i := int32(0); i < t.Req.GetShardsNum(); i++ { - vchanNames[i] = fmt.Sprintf("%s_%dv%d", chanNames[i], t.collID, i) + shardNum := int(t.Req.GetShardsNum()) + for i := 0; i < shardNum; i++ { + vchanNames[i] = funcutil.GetVirtualChannel(chanNames[i], t.collID, i) } t.channels = collectionChannels{ virtualChannels: vchanNames, diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 218d4082cc..b928290e4e 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -212,7 +212,11 @@ func GetAvailablePort() int { // IsPhysicalChannel checks if the channel is a physical channel func IsPhysicalChannel(channel string) bool { - return strings.Count(channel, "_") == 1 + i := strings.LastIndex(channel, "_") + if i == -1 { + return true + } + return !strings.Contains(channel[i+1:], "v") } // ToPhysicalChannel get physical channel name from virtual channel name @@ -227,6 +231,10 @@ func ToPhysicalChannel(vchannel string) string { return vchannel[:index] } +func GetVirtualChannel(pchannel string, collectionID int64, idx int) string { + return fmt.Sprintf("%s_%dv%d", pchannel, collectionID, idx) +} + // ConvertChannelName assembles channel name according to parameters. func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (string, error) { if tokenFrom == "" { diff --git a/pkg/util/funcutil/func_test.go b/pkg/util/funcutil/func_test.go index c5c05a11ec..ded4184f0f 100644 --- a/pkg/util/funcutil/func_test.go +++ b/pkg/util/funcutil/func_test.go @@ -191,8 +191,8 @@ func Test_ToPhysicalChannel(t *testing.T) { assert.Equal(t, "abc_", ToPhysicalChannel("abc_")) assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123")) assert.Equal(t, "abc_defgsg", ToPhysicalChannel("abc_defgsg")) - assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456")) - assert.Equal(t, "abc__", ToPhysicalChannel("abc___defgsg")) + assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456v0")) + assert.Equal(t, "abc___defgsg", ToPhysicalChannel("abc___defgsg")) assert.Equal(t, "abcdef", ToPhysicalChannel("abcdef")) channel := "by-dev-rootcoord-dml_3_449883080965365748v0" for i := 0; i < 10; i++ { @@ -816,3 +816,50 @@ func (s *NumRowsWithSchemaSuite) TestErrorCases() { func TestNumRowsWithSchema(t *testing.T) { suite.Run(t, new(NumRowsWithSchemaSuite)) } + +func TestChannelConvert(t *testing.T) { + t.Run("is physical channel", func(t *testing.T) { + { + channel := "by-dev-replicate-msg" + ok := IsPhysicalChannel(channel) + assert.True(t, ok) + } + + { + channel := "by-dev-rootcoord-dml_2" + ok := IsPhysicalChannel(channel) + assert.True(t, ok) + } + + { + channel := "by-dev-rootcoord-dml_2_1001v0" + ok := IsPhysicalChannel(channel) + assert.False(t, ok) + } + }) + + t.Run("to physical channel", func(t *testing.T) { + { + channel := "by-dev-rootcoord-dml_2_1001v0" + physicalChannel := ToPhysicalChannel(channel) + assert.Equal(t, "by-dev-rootcoord-dml_2", physicalChannel) + } + + { + channel := "by-dev-rootcoord-dml_2" + physicalChannel := ToPhysicalChannel(channel) + assert.Equal(t, "by-dev-rootcoord-dml_2", physicalChannel) + } + + { + channel := "by-dev-replicate-msg" + physicalChannel := ToPhysicalChannel(channel) + assert.Equal(t, "by-dev-replicate-msg", physicalChannel) + } + }) + + t.Run("get virtual channel", func(t *testing.T) { + channel := GetVirtualChannel("by-dev-rootcoord-dml_2", 1001, 0) + assert.Equal(t, "by-dev-rootcoord-dml_2_1001v0", channel) + }) +} diff --git a/pkg/util/metautil/channel.go b/pkg/util/metautil/channel.go index 8edd7be07a..303f8c599a 100644 --- a/pkg/util/metautil/channel.go +++ b/pkg/util/metautil/channel.go @@ -22,6 +22,7 @@ import ( "strconv" "sync" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -100,7 +101,7 @@ func (c Channel) PhysicalName() string { } func (c Channel) VirtualName() string { - return fmt.Sprintf("%s_%dv%d", c.PhysicalName(), c.collectionID, c.shardIdx) + return funcutil.GetVirtualChannel(c.PhysicalName(), c.collectionID, int(c.shardIdx)) } func (c Channel) Equal(ac Channel) bool {