enhance: Alter collection description (#41558)

issue: https://github.com/milvus-io/milvus/issues/41557

Signed-off-by: yhmo <yihua.mo@zilliz.com>
This commit is contained in:
groot 2025-05-12 14:16:55 +08:00 committed by GitHub
parent 52950ce392
commit 1574673a8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 88 additions and 80 deletions

View File

@ -90,6 +90,21 @@ func (a *alterCollectionTask) GetLockerKey() LockerKey {
)
}
func getCollectionDescription(props ...*commonpb.KeyValuePair) (bool, string, []*commonpb.KeyValuePair) {
hasDesc := false
desc := ""
newProperties := make([]*commonpb.KeyValuePair, 0, len(props))
for _, p := range props {
if p.GetKey() == common.CollectionDescription {
hasDesc = true
desc = p.GetValue()
} else {
newProperties = append(newProperties, p)
}
}
return hasDesc, desc, newProperties
}
func getConsistencyLevel(props ...*commonpb.KeyValuePair) (bool, commonpb.ConsistencyLevel) {
for _, p := range props {
if p.GetKey() == common.ConsistencyLevel {
@ -122,7 +137,12 @@ func executeAlterCollectionTaskSteps(ctx context.Context,
if ok, level := getConsistencyLevel(newProperties...); ok {
newColl.ConsistencyLevel = level
}
newColl.Properties = newProperties
if ok, desc, props := getCollectionDescription(newProperties...); ok {
newColl.Description = desc
newColl.Properties = props
} else {
newColl.Properties = newProperties
}
tso, err := core.tsoAllocator.GenerateTSO(1)
if err == nil {
newColl.UpdateTimestamp = tso

View File

@ -220,75 +220,6 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
assert.NoError(t, err)
})
t.Run("alter successfully2", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&model.Collection{
CollectionID: int64(1),
Name: "cn",
DBName: "foo",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
PhysicalChannelNames: []string{"by-dev-rootcoord-dml_1"},
}, nil)
meta.On("AlterCollection",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil)
meta.On("ListAliasesByID", mock.Anything, mock.Anything).Return([]string{})
broker := newMockBroker()
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
return nil
}
packChan := make(chan *msgstream.ConsumeMsgPack, 10)
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withTtSynchronizer(ticker), withInvalidTsoAllocator())
newPros := append(properties, &commonpb.KeyValuePair{
Key: common.ReplicateEndTSKey,
Value: "10000",
})
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection},
CollectionName: "cn",
Properties: newPros,
},
}
unmarshalFactory := &msgstream.ProtoUDFactory{}
unmarshalDispatcher := unmarshalFactory.NewUnmarshalDispatcher()
err := task.Execute(context.Background())
assert.NoError(t, err)
time.Sleep(time.Second)
select {
case pack := <-packChan:
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].GetType())
tsMsg, err := pack.Msgs[0].Unmarshal(unmarshalDispatcher)
require.NoError(t, err)
replicateMsg := tsMsg.(*msgstream.ReplicateMsg)
assert.Equal(t, "foo", replicateMsg.ReplicateMsg.GetDatabase())
assert.Equal(t, "cn", replicateMsg.ReplicateMsg.GetCollection())
assert.True(t, replicateMsg.ReplicateMsg.GetIsEnd())
default:
assert.Fail(t, "no message sent")
}
})
t.Run("test update collection props", func(t *testing.T) {
coll := &model.Collection{
Properties: []*commonpb.KeyValuePair{
@ -386,7 +317,9 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
assert.Empty(t, coll.Properties)
})
t.Run("alter successfully3", func(t *testing.T) {
testFunc := func(t *testing.T, oldProps []*commonpb.KeyValuePair,
newProps []*commonpb.KeyValuePair, deleteKeys []string,
) chan *msgstream.ConsumeMsgPack {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything,
@ -394,9 +327,11 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(&model.Collection{
CollectionID: int64(1),
Name: "cn",
DBName: "foo",
CollectionID: int64(1),
Name: "cn",
DBName: "foo",
Properties: oldProps,
PhysicalChannelNames: []string{"by-dev-rootcoord-dml_1"},
}, nil)
meta.On("AlterCollection",
mock.Anything,
@ -405,6 +340,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
mock.Anything,
).Return(nil)
meta.On("ListAliasesByID", mock.Anything, mock.Anything).Return([]string{})
broker := newMockBroker()
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
return nil
@ -412,22 +348,73 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
packChan := make(chan *msgstream.ConsumeMsgPack, 10)
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withTtSynchronizer(ticker), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection},
CollectionName: "cn",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ConsistencyLevel,
Value: "1",
},
},
Properties: newProps,
DeleteKeys: deleteKeys,
},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
return packChan
}
t.Run("alter successfully2", func(t *testing.T) {
oldProps := []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
}
newProps := append(properties, &commonpb.KeyValuePair{
Key: common.ReplicateEndTSKey,
Value: "10000",
})
packChan := testFunc(t, oldProps, newProps, nil)
unmarshalFactory := &msgstream.ProtoUDFactory{}
unmarshalDispatcher := unmarshalFactory.NewUnmarshalDispatcher()
time.Sleep(time.Second)
select {
case pack := <-packChan:
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].GetType())
tsMsg, err := pack.Msgs[0].Unmarshal(unmarshalDispatcher)
require.NoError(t, err)
replicateMsg := tsMsg.(*msgstream.ReplicateMsg)
assert.Equal(t, "foo", replicateMsg.ReplicateMsg.GetDatabase())
assert.Equal(t, "cn", replicateMsg.ReplicateMsg.GetCollection())
assert.True(t, replicateMsg.ReplicateMsg.GetIsEnd())
default:
assert.Fail(t, "no message sent")
}
})
t.Run("alter successfully3", func(t *testing.T) {
newProps := []*commonpb.KeyValuePair{
{
Key: common.ConsistencyLevel,
Value: "1",
},
}
testFunc(t, nil, newProps, nil)
})
t.Run("alter successfully4", func(t *testing.T) {
newProps := []*commonpb.KeyValuePair{
{
Key: common.CollectionDescription,
Value: "abc",
},
}
testFunc(t, nil, newProps, nil)
})
t.Run("alter successfully5", func(t *testing.T) {
testFunc(t, nil, nil, []string{common.CollectionDescription})
})
}

View File

@ -166,6 +166,7 @@ const (
const (
CollectionTTLConfigKey = "collection.ttl.seconds"
CollectionAutoCompactionKey = "collection.autocompaction.enabled"
CollectionDescription = "collection.description"
// rate limit
CollectionInsertRateMaxKey = "collection.insertRate.max.mb"