feat: [2.4] support to deny dll according to database property (#40785)

- issue: #40762
- pr: #40764

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2025-03-23 11:20:16 +08:00 committed by GitHub
parent f2a1847b60
commit 8518f7fe4a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 242 additions and 17 deletions

10
go.mod
View File

@ -26,10 +26,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.23
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.24-0.20250319131804-78707a7e3212
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0
github.com/quasilyte/go-ruleguard/dsl v0.3.22
@ -69,7 +66,10 @@ require (
github.com/greatroar/blobloom v0.8.0
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/minio/minio-go/v7 v7.0.61
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/shirou/gopsutil/v4 v4.24.10
github.com/tidwall/gjson v1.17.3
github.com/valyala/fastjson v1.6.4
@ -122,7 +122,6 @@ require (
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/getsentry/sentry-go v0.12.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ini/ini v1.67.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
@ -161,6 +160,7 @@ require (
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect

12
go.sum
View File

@ -292,8 +292,6 @@ github.com/go-fonts/stix v0.1.0/go.mod h1:w/c1f0ldAUlJmLBvlbkvVXLAD+tAMqobIIQpmn
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.1.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2CSIqUrmQPqA0gdRIlnLEY0gK5JGjh37zN5U=
@ -612,8 +610,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.23 h1:nTVp61TRKdLoT5BVZF5n8IfhY0h23/kzoGsxm1+odZ8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.23/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.24-0.20250319131804-78707a7e3212 h1:Zi/FbshDaY/hSRIJvTzbnFfvjaf5lYmXMeeJ9+1Z1Wc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.24-0.20250319131804-78707a7e3212/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
@ -626,8 +624,10 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo=
github.com/minio/minio-go/v7 v7.0.73/go.mod h1:qydcVzV8Hqtj1VtEocfxbmVFa2siu6HGa+LDEPogjD8=
github.com/minio/minio-go/v7 v7.0.61 h1:87c+x8J3jxQ5VUGimV9oHdpjsAvy3fhneEBKuoKEVUI=
github.com/minio/minio-go/v7 v7.0.61/go.mod h1:BTu8FcrEw+HidY0zd/0eny43QnVNkXRPXrLXFuQBHXg=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=

View File

@ -69,6 +69,9 @@ func (m *SimpleLimiter) Check(dbID int64, collectionIDToPartIDs map[int64][]int6
if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
return nil
}
if n <= 0 {
return nil
}
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()

View File

@ -398,7 +398,7 @@ func TestRateLimiter(t *testing.T) {
err := simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 1)
assert.NoError(t, err)
err = simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 1)
err = simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 0)
assert.NoError(t, err)
})
}

View File

@ -112,6 +112,7 @@ var dqlRateTypes = typeutil.NewSet(
type LimiterRange struct {
RateScope internalpb.RateScope
OpType opType
IncludeRateTypes typeutil.Set[internalpb.RateType]
ExcludeRateTypes typeutil.Set[internalpb.RateType]
}
@ -235,8 +236,14 @@ func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limi
return
}
limiters := node.GetLimiters()
getRateTypes(limiterRange.RateScope, limiterRange.OpType).
Complement(limiterRange.ExcludeRateTypes).Range(func(rt internalpb.RateType) bool {
rateTypes := getRateTypes(limiterRange.RateScope, limiterRange.OpType)
if limiterRange.IncludeRateTypes.Len() > 0 {
rateTypes = rateTypes.Intersection(limiterRange.IncludeRateTypes)
}
if limiterRange.ExcludeRateTypes.Len() > 0 {
rateTypes = rateTypes.Complement(limiterRange.ExcludeRateTypes)
}
rateTypes.Range(func(rt internalpb.RateType) bool {
originLimiter, ok := limiters.Get(rt)
if !ok {
log.Warn("update limiter failed, limiter not found",
@ -557,6 +564,54 @@ func (q *QuotaCenter) collectMetrics() error {
return nil
}
func getDbPropertyWithAction(db *model.Database, property string, actionFunc func(bool)) {
if db == nil || property == "" || actionFunc == nil {
return
}
if v := db.GetProperty(property); v != "" {
if dbForceDenyDDLEnabled, err := strconv.ParseBool(v); err == nil {
actionFunc(dbForceDenyDDLEnabled)
} else {
log.Warn("invalid configuration for database force deny DDL",
zap.String("config item", property),
zap.String("config value", v))
}
}
}
func (q *QuotaCenter) calculateDBDDLRates() {
dbs, err := q.meta.ListDatabases(q.ctx, typeutil.MaxTimestamp)
if err != nil {
log.Warn("get databases failed", zap.Error(err))
return
}
for _, db := range dbs {
dbDDLKeysWithRatesType := map[string]typeutil.Set[internalpb.RateType]{
common.DatabaseForceDenyDDLKey: ddlRateTypes,
common.DatabaseForceDenyCollectionDDLKey: typeutil.NewSet(internalpb.RateType_DDLCollection),
common.DatabaseForceDenyPartitionDDLKey: typeutil.NewSet(internalpb.RateType_DDLPartition),
common.DatabaseForceDenyIndexDDLKey: typeutil.NewSet(internalpb.RateType_DDLIndex),
common.DatabaseForceDenyFlushDDLKey: typeutil.NewSet(internalpb.RateType_DDLFlush),
common.DatabaseForceDenyCompactionDDLKey: typeutil.NewSet(internalpb.RateType_DDLCompaction),
}
for ddlKey, rateTypes := range dbDDLKeysWithRatesType {
getDbPropertyWithAction(db, ddlKey, func(enabled bool) {
if enabled {
dbLimiters := q.rateLimiter.GetOrCreateDatabaseLimiters(db.ID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps))
updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{
RateScope: internalpb.RateScope_Database,
OpType: ddl,
IncludeRateTypes: rateTypes,
})
dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToDDL, commonpb.ErrorCode_ForceDeny)
}
})
}
}
}
// forceDenyWriting sets dml rates to 0 to reject all dml requests.
func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error {
log := log.Ctx(context.TODO()).WithRateGroup("quotaCenter.forceDenyWriting", 1.0, 60.0)
@ -1164,6 +1219,8 @@ func (q *QuotaCenter) calculateRates() error {
return err
}
q.calculateDBDDLRates()
// log.Debug("QuotaCenter calculates rate done", zap.Any("rates", q.currentRates))
return nil
}

View File

@ -462,6 +462,7 @@ func TestQuotaCenter(t *testing.T) {
qc := mocks.NewMockQueryCoordClient(t)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{}, nil).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
quotaCenter.clearMetrics()
err = quotaCenter.calculateRates()
@ -1798,3 +1799,146 @@ func TestTORequestLimiter(t *testing.T) {
assert.Equal(t, 1, len(proxyLimit.Codes))
assert.Equal(t, commonpb.ErrorCode_ForceDeny, proxyLimit.Codes[0])
}
func TestDatabaseForceDenyDDL(t *testing.T) {
getQuotaCenter := func() (*QuotaCenter, *mockrootcoord.IMetaTable) {
ctx := context.Background()
qc := mocks.NewMockQueryCoordClient(t)
meta := mockrootcoord.NewIMetaTable(t)
pcm := proxyutil.NewMockProxyClientManager(t)
dc := mocks.NewMockDataCoordClient(t)
core, _ := NewCore(ctx, nil)
core.tsoAllocator = newMockTsoAllocator()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
return quotaCenter, meta
}
t.Run("fail to list database", func(t *testing.T) {
quotaCenter, meta := getQuotaCenter()
meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
quotaCenter.calculateDBDDLRates()
})
t.Run("force deny ddl for database", func(t *testing.T) {
quotaCenter, meta := getQuotaCenter()
meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{
{
ID: 1, Name: "db1", Properties: []*commonpb.KeyValuePair{
{
Key: common.DatabaseForceDenyDDLKey,
Value: "true",
},
},
},
{
ID: 2, Name: "db2", Properties: []*commonpb.KeyValuePair{
{
Key: "aaa",
Value: "true",
},
},
},
{
ID: 3, Name: "db3", Properties: []*commonpb.KeyValuePair{
{
Key: common.DatabaseForceDenyDDLKey,
Value: "100",
},
},
},
}, nil).Once()
quotaCenter.calculateDBDDLRates()
limiters := quotaCenter.rateLimiter.GetDatabaseLimiters(1)
assert.Equal(t, 1, limiters.GetQuotaStates().Len())
assert.True(t, limiters.GetQuotaStates().Contain(milvuspb.QuotaState_DenyToDDL))
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCollection)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLPartition)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLIndex)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCompaction)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLFlush)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
})
t.Run("force deny detail ddl for database", func(t *testing.T) {
quotaCenter, meta := getQuotaCenter()
meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{
{
ID: 1, Name: "foo123", Properties: []*commonpb.KeyValuePair{
{
Key: common.DatabaseForceDenyCollectionDDLKey,
Value: "true",
},
{
Key: common.DatabaseForceDenyPartitionDDLKey,
Value: "true",
},
{
Key: common.DatabaseForceDenyFlushDDLKey,
Value: "true",
},
{
Key: common.DatabaseForceDenyCompactionDDLKey,
Value: "true",
},
{
Key: common.DatabaseForceDenyIndexDDLKey,
Value: "true",
},
},
},
}, nil).Once()
quotaCenter.calculateDBDDLRates()
limiters := quotaCenter.rateLimiter.GetDatabaseLimiters(1)
assert.Equal(t, 1, limiters.GetQuotaStates().Len())
assert.True(t, limiters.GetQuotaStates().Contain(milvuspb.QuotaState_DenyToDDL))
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCollection)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLPartition)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLIndex)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCompaction)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
{
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLFlush)
assert.Equal(t, true, ok)
assert.EqualValues(t, 0.0, limiter.Limit())
}
})
}

View File

@ -97,6 +97,11 @@ func (rln *RateLimiterNode) GetQuotaExceededError(rt internalpb.RateType) error
if errCode, ok := rln.quotaStates.Get(milvuspb.QuotaState_DenyToRead); ok {
return merr.WrapErrServiceQuotaExceeded(ratelimitutil.GetQuotaErrorString(errCode))
}
case internalpb.RateType_DDLCollection, internalpb.RateType_DDLPartition,
internalpb.RateType_DDLIndex, internalpb.RateType_DDLCompaction, internalpb.RateType_DDLFlush:
if errCode, ok := rln.quotaStates.Get(milvuspb.QuotaState_DenyToDDL); ok {
return merr.WrapErrServiceQuotaExceeded(ratelimitutil.GetQuotaErrorString(errCode))
}
}
return merr.WrapErrServiceQuotaExceeded(fmt.Sprintf("rate type: %s", rt.String()))
}

View File

@ -143,6 +143,15 @@ func TestRateLimiterNodeGetQuotaExceededError(t *testing.T) {
assert.True(t, strings.Contains(err.Error(), "disabled"))
})
t.Run("ddl", func(t *testing.T) {
limitNode := NewRateLimiterNode(internalpb.RateScope_Database)
limitNode.quotaStates.Insert(milvuspb.QuotaState_DenyToDDL, commonpb.ErrorCode_ForceDeny)
err := limitNode.GetQuotaExceededError(internalpb.RateType_DDLCollection)
assert.True(t, errors.Is(err, merr.ErrServiceQuotaExceeded))
// reference: ratelimitutil.GetQuotaErrorString(errCode)
assert.True(t, strings.Contains(err.Error(), "disabled"))
})
t.Run("unknown", func(t *testing.T) {
limitNode := NewRateLimiterNode(internalpb.RateScope_Cluster)
err := limitNode.GetQuotaExceededError(internalpb.RateType_DDLCompaction)

View File

@ -164,6 +164,13 @@ const (
DatabaseForceDenyWritingKey = "database.force.deny.writing"
DatabaseForceDenyReadingKey = "database.force.deny.reading"
DatabaseForceDenyDDLKey = "database.force.deny.ddl" // all ddl
DatabaseForceDenyCollectionDDLKey = "database.force.deny.collectionDDL"
DatabaseForceDenyPartitionDDLKey = "database.force.deny.partitionDDL"
DatabaseForceDenyIndexDDLKey = "database.force.deny.index"
DatabaseForceDenyFlushDDLKey = "database.force.deny.flush"
DatabaseForceDenyCompactionDDLKey = "database.force.deny.compaction"
// collection level load properties
CollectionReplicaNumber = "collection.replica.number"
CollectionResourceGroups = "collection.resource_groups"

View File

@ -12,7 +12,7 @@ require (
github.com/expr-lang/expr v1.15.7
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.23
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.24-0.20250319131804-78707a7e3212
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2

View File

@ -503,8 +503,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.23 h1:nTVp61TRKdLoT5BVZF5n8IfhY0h23/kzoGsxm1+odZ8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.23/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.24-0.20250319131804-78707a7e3212 h1:Zi/FbshDaY/hSRIJvTzbnFfvjaf5lYmXMeeJ9+1Z1Wc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.24-0.20250319131804-78707a7e3212/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=