enhance: unified channel name generation and conversion (#34805)

/kind improvement
- issue: #34804

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2024-07-20 23:11:39 +08:00 committed by GitHub
parent b22e549844
commit 16c002b51b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 64 additions and 7 deletions

View File

@ -18,7 +18,6 @@ package compaction
import ( import (
"context" "context"
"github.com/milvus-io/milvus/internal/allocator"
"math" "math"
"testing" "testing"
"time" "time"
@ -30,6 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"

View File

@ -358,8 +358,9 @@ func (t *createCollectionTask) assignChannels() error {
return fmt.Errorf("no enough channels, want: %d, got: %d", t.Req.GetShardsNum(), len(chanNames)) return fmt.Errorf("no enough channels, want: %d, got: %d", t.Req.GetShardsNum(), len(chanNames))
} }
for i := int32(0); i < t.Req.GetShardsNum(); i++ { shardNum := int(t.Req.GetShardsNum())
vchanNames[i] = fmt.Sprintf("%s_%dv%d", chanNames[i], t.collID, i) for i := 0; i < shardNum; i++ {
vchanNames[i] = funcutil.GetVirtualChannel(chanNames[i], t.collID, i)
} }
t.channels = collectionChannels{ t.channels = collectionChannels{
virtualChannels: vchanNames, virtualChannels: vchanNames,

View File

@ -212,7 +212,11 @@ func GetAvailablePort() int {
// IsPhysicalChannel checks if the channel is a physical channel // IsPhysicalChannel checks if the channel is a physical channel
func IsPhysicalChannel(channel string) bool { func IsPhysicalChannel(channel string) bool {
return strings.Count(channel, "_") == 1 i := strings.LastIndex(channel, "_")
if i == -1 {
return true
}
return !strings.Contains(channel[i+1:], "v")
} }
// ToPhysicalChannel get physical channel name from virtual channel name // ToPhysicalChannel get physical channel name from virtual channel name
@ -227,6 +231,10 @@ func ToPhysicalChannel(vchannel string) string {
return vchannel[:index] return vchannel[:index]
} }
func GetVirtualChannel(pchannel string, collectionID int64, idx int) string {
return fmt.Sprintf("%s_%dv%d", pchannel, collectionID, idx)
}
// ConvertChannelName assembles channel name according to parameters. // ConvertChannelName assembles channel name according to parameters.
func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (string, error) { func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (string, error) {
if tokenFrom == "" { if tokenFrom == "" {

View File

@ -191,8 +191,8 @@ func Test_ToPhysicalChannel(t *testing.T) {
assert.Equal(t, "abc_", ToPhysicalChannel("abc_")) assert.Equal(t, "abc_", ToPhysicalChannel("abc_"))
assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123")) assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123"))
assert.Equal(t, "abc_defgsg", ToPhysicalChannel("abc_defgsg")) assert.Equal(t, "abc_defgsg", ToPhysicalChannel("abc_defgsg"))
assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456")) assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456v0"))
assert.Equal(t, "abc__", ToPhysicalChannel("abc___defgsg")) assert.Equal(t, "abc___defgsg", ToPhysicalChannel("abc___defgsg"))
assert.Equal(t, "abcdef", ToPhysicalChannel("abcdef")) assert.Equal(t, "abcdef", ToPhysicalChannel("abcdef"))
channel := "by-dev-rootcoord-dml_3_449883080965365748v0" channel := "by-dev-rootcoord-dml_3_449883080965365748v0"
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
@ -816,3 +816,50 @@ func (s *NumRowsWithSchemaSuite) TestErrorCases() {
func TestNumRowsWithSchema(t *testing.T) { func TestNumRowsWithSchema(t *testing.T) {
suite.Run(t, new(NumRowsWithSchemaSuite)) suite.Run(t, new(NumRowsWithSchemaSuite))
} }
func TestChannelConvert(t *testing.T) {
t.Run("is physical channel", func(t *testing.T) {
{
channel := "by-dev-replicate-msg"
ok := IsPhysicalChannel(channel)
assert.True(t, ok)
}
{
channel := "by-dev-rootcoord-dml_2"
ok := IsPhysicalChannel(channel)
assert.True(t, ok)
}
{
channel := "by-dev-rootcoord-dml_2_1001v0"
ok := IsPhysicalChannel(channel)
assert.False(t, ok)
}
})
t.Run("to physical channel", func(t *testing.T) {
{
channel := "by-dev-rootcoord-dml_2_1001v0"
physicalChannel := ToPhysicalChannel(channel)
assert.Equal(t, "by-dev-rootcoord-dml_2", physicalChannel)
}
{
channel := "by-dev-rootcoord-dml_2"
physicalChannel := ToPhysicalChannel(channel)
assert.Equal(t, "by-dev-rootcoord-dml_2", physicalChannel)
}
{
channel := "by-dev-replicate-msg"
physicalChannel := ToPhysicalChannel(channel)
assert.Equal(t, "by-dev-replicate-msg", physicalChannel)
}
})
t.Run("get virtual channel", func(t *testing.T) {
channel := GetVirtualChannel("by-dev-rootcoord-dml_2", 1001, 0)
assert.Equal(t, "by-dev-rootcoord-dml_2_1001v0", channel)
})
}

View File

@ -22,6 +22,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
) )
@ -100,7 +101,7 @@ func (c Channel) PhysicalName() string {
} }
func (c Channel) VirtualName() string { func (c Channel) VirtualName() string {
return fmt.Sprintf("%s_%dv%d", c.PhysicalName(), c.collectionID, c.shardIdx) return funcutil.GetVirtualChannel(c.PhysicalName(), c.collectionID, int(c.shardIdx))
} }
func (c Channel) Equal(ac Channel) bool { func (c Channel) Equal(ac Channel) bool {