From db05d4f976df9cb9a1bf011c964b24f8751a4e29 Mon Sep 17 00:00:00 2001 From: Xianhui Lin <35839735+JsDove@users.noreply.github.com> Date: Wed, 11 Dec 2024 10:20:42 +0800 Subject: [PATCH] enhance: alterindex & altercollection supports altering properties (#37437) enhance : 1. alterindex delete properties We have introduced a new parameter deleteKeys to the alterindex functionality, which allows for the deletion of properties within an index. This enhancement provides users with the flexibility to manage index properties more effectively by removing specific keys as needed. 2. altercollection delete properties We have introduced a new parameter deleteKeys to the altercollection functionality, which allows for the deletion of properties within an collection. This enhancement provides users with the flexibility to manage collection properties more effectively by removing specific keys as needed. 3.support altercollectionfield We currently support modifying the fieldparams of a field in a collection using altercollectionfield, which only allows changes to the max-length attribute. Key Points: - New Parameter - deleteKeys: This new parameter enables the deletion of specified properties from an index. By passing a list of keys to deleteKeys, users can remove the corresponding properties from the index. - Mutual Exclusivity: The deleteKeys parameter cannot be used in conjunction with the extraParams parameter. Users must choose one parameter to pass based on their requirement. If deleteKeys is provided, it indicates an intent to delete properties; if extraParams is provided, it signifies the addition or update of properties. issue: https://github.com/milvus-io/milvus/issues/37436 --------- Signed-off-by: Xianhui.Lin --- go.mod | 2 +- go.sum | 2 + internal/datacoord/index_service.go | 75 ++++++-- internal/datacoord/index_service_test.go | 35 ++++ internal/datacoord/mock_test.go | 4 + internal/distributed/proxy/service.go | 4 + .../distributed/rootcoord/client/client.go | 11 ++ internal/distributed/rootcoord/service.go | 4 + internal/mocks/mock_rootcoord.go | 59 +++++++ internal/mocks/mock_rootcoord_client.go | 74 ++++++++ internal/proto/index_coord.proto | 1 + internal/proto/root_coord.proto | 6 +- internal/proxy/impl.go | 77 ++++++++ internal/proxy/rootcoord_mock_test.go | 4 + internal/proxy/task.go | 125 ++++++++++++- internal/proxy/task_index.go | 20 ++- internal/rootcoord/alter_collection_task.go | 166 +++++++++++++++++- .../rootcoord/alter_collection_task_test.go | 38 ++++ internal/rootcoord/broker.go | 6 +- internal/rootcoord/root_coord.go | 56 +++++- internal/util/mock/grpc_rootcoord_client.go | 4 + pkg/go.mod | 2 +- pkg/go.sum | 4 +- pkg/util/typeutil/schema.go | 9 + 24 files changed, 746 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index 23c5c4b754..23406bef31 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 github.com/minio/minio-go/v7 v7.0.73 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index 4ce7ad6a3f..f318b36214 100644 --- a/go.sum +++ b/go.sum @@ -632,6 +632,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 h1:cFRrdFZwhFHv33pue1z8beYSvrXDYFSFsCuvXGX3DHE= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 h1:5/35+F32fs6ifVzI1e+VkUNpK0gWyXQSdZVnmNUFrrg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 136850c414..ba38b80ca4 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -317,12 +317,33 @@ func UpdateParams(index *model.Index, from []*commonpb.KeyValuePair, updates []* }) } +func DeleteParams(index *model.Index, from []*commonpb.KeyValuePair, deletes []string) []*commonpb.KeyValuePair { + params := make(map[string]string) + for _, param := range from { + params[param.GetKey()] = param.GetValue() + } + + // delete the params + for _, key := range deletes { + delete(params, key) + } + + return lo.MapToSlice(params, func(k string, v string) *commonpb.KeyValuePair { + return &commonpb.KeyValuePair{ + Key: k, + Value: v, + } + }) +} + func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) (*commonpb.Status, error) { log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), zap.String("indexName", req.GetIndexName()), ) - log.Info("received AlterIndex request", zap.Any("params", req.GetParams())) + log.Info("received AlterIndex request", + zap.Any("params", req.GetParams()), + zap.Any("deletekeys", req.GetDeleteKeys())) if req.IndexName == "" { return merr.Status(merr.WrapErrParameterInvalidMsg("index name is empty")), nil @@ -339,22 +360,44 @@ func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) return merr.Status(err), nil } - for _, index := range indexes { - // update user index params - newUserIndexParams := UpdateParams(index, index.UserIndexParams, req.GetParams()) - log.Info("alter index user index params", - zap.String("indexName", index.IndexName), - zap.Any("params", newUserIndexParams), - ) - index.UserIndexParams = newUserIndexParams + if len(req.GetDeleteKeys()) > 0 && len(req.GetParams()) > 0 { + return merr.Status(merr.WrapErrParameterInvalidMsg("cannot provide both DeleteKeys and ExtraParams")), nil + } - // update index params - newIndexParams := UpdateParams(index, index.IndexParams, req.GetParams()) - log.Info("alter index index params", - zap.String("indexName", index.IndexName), - zap.Any("params", newIndexParams), - ) - index.IndexParams = newIndexParams + for _, index := range indexes { + if len(req.GetParams()) > 0 { + // update user index params + newUserIndexParams := UpdateParams(index, index.UserIndexParams, req.GetParams()) + log.Info("alter index user index params", + zap.String("indexName", index.IndexName), + zap.Any("params", newUserIndexParams), + ) + index.UserIndexParams = newUserIndexParams + + // update index params + newIndexParams := UpdateParams(index, index.IndexParams, req.GetParams()) + log.Info("alter index index params", + zap.String("indexName", index.IndexName), + zap.Any("params", newIndexParams), + ) + index.IndexParams = newIndexParams + } else if len(req.GetDeleteKeys()) > 0 { + // delete user index params + newUserIndexParams := DeleteParams(index, index.UserIndexParams, req.GetDeleteKeys()) + log.Info("alter index user deletekeys", + zap.String("indexName", index.IndexName), + zap.Any("params", newUserIndexParams), + ) + index.UserIndexParams = newUserIndexParams + + // delete index params + newIndexParams := DeleteParams(index, index.IndexParams, req.GetDeleteKeys()) + log.Info("alter index index deletekeys", + zap.String("indexName", index.IndexName), + zap.Any("params", newIndexParams), + ) + index.IndexParams = newIndexParams + } if err := ValidateIndexParams(index); err != nil { return merr.Status(err), nil diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index c529f8529f..4923f576da 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -637,6 +637,41 @@ func TestServer_AlterIndex(t *testing.T) { req.Params[0].Value = "true" }) + t.Run("delete_params", func(t *testing.T) { + deleteReq := &indexpb.AlterIndexRequest{ + CollectionID: collID, + IndexName: indexName, + DeleteKeys: []string{common.MmapEnabledKey}, + } + resp, err := s.AlterIndex(ctx, deleteReq) + assert.NoError(t, merr.CheckRPCCall(resp, err)) + + describeResp, err := s.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ + CollectionID: collID, + IndexName: indexName, + Timestamp: createTS, + }) + assert.NoError(t, merr.CheckRPCCall(describeResp, err)) + for _, param := range describeResp.IndexInfos[0].GetUserIndexParams() { + assert.NotEqual(t, common.MmapEnabledKey, param.GetKey()) + } + }) + t.Run("update_and_delete_params", func(t *testing.T) { + updateAndDeleteReq := &indexpb.AlterIndexRequest{ + CollectionID: collID, + IndexName: indexName, + Params: []*commonpb.KeyValuePair{ + { + Key: common.MmapEnabledKey, + Value: "true", + }, + }, + DeleteKeys: []string{common.MmapEnabledKey}, + } + resp, err := s.AlterIndex(ctx, updateAndDeleteReq) + assert.ErrorIs(t, merr.CheckRPCCall(resp, err), merr.ErrParameterInvalid) + }) + t.Run("success", func(t *testing.T) { resp, err := s.AlterIndex(ctx, req) assert.NoError(t, merr.CheckRPCCall(resp, err)) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 06eac6a255..9f64dbb722 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -450,6 +450,10 @@ func (m *mockRootCoordClient) AlterCollection(ctx context.Context, request *milv panic("not implemented") // TODO: Implement } +func (m *mockRootCoordClient) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + func (m *mockRootCoordClient) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { panic("not implemented") // TODO: Implement } diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index fc17566af1..ece75e3cd9 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -696,6 +696,10 @@ func (s *Server) AlterCollection(ctx context.Context, request *milvuspb.AlterCol return s.proxy.AlterCollection(ctx, request) } +func (s *Server) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + return s.proxy.AlterCollectionField(ctx, request) +} + // CreatePartition notifies Proxy to create a partition func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { return s.proxy.CreatePartition(ctx, request) diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 98da081920..942f366990 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -240,6 +240,17 @@ func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCol }) } +func (c *Client) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + request = typeutil.Clone(request) + commonpbutil.UpdateMsgBase( + request.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) { + return client.AlterCollectionField(ctx, request) + }) +} + // CreatePartition create partition func (c *Client) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { in = typeutil.Clone(in) diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index c1e0241a5e..028a9178e6 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -530,6 +530,10 @@ func (s *Server) AlterCollection(ctx context.Context, request *milvuspb.AlterCol return s.rootCoord.AlterCollection(ctx, request) } +func (s *Server) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + return s.rootCoord.AlterCollectionField(ctx, request) +} + func (s *Server) RenameCollection(ctx context.Context, request *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { return s.rootCoord.RenameCollection(ctx, request) } diff --git a/internal/mocks/mock_rootcoord.go b/internal/mocks/mock_rootcoord.go index 5dedc5b14a..0ecf1b9ca0 100644 --- a/internal/mocks/mock_rootcoord.go +++ b/internal/mocks/mock_rootcoord.go @@ -272,6 +272,65 @@ func (_c *RootCoord_AlterCollection_Call) RunAndReturn(run func(context.Context, return _c } +// AlterCollectionField provides a mock function with given fields: _a0, _a1 +func (_m *RootCoord) AlterCollectionField(_a0 context.Context, _a1 *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for AlterCollectionField") + } + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionFieldRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionFieldRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RootCoord_AlterCollectionField_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollectionField' +type RootCoord_AlterCollectionField_Call struct { + *mock.Call +} + +// AlterCollectionField is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.AlterCollectionFieldRequest +func (_e *RootCoord_Expecter) AlterCollectionField(_a0 interface{}, _a1 interface{}) *RootCoord_AlterCollectionField_Call { + return &RootCoord_AlterCollectionField_Call{Call: _e.mock.On("AlterCollectionField", _a0, _a1)} +} + +func (_c *RootCoord_AlterCollectionField_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterCollectionFieldRequest)) *RootCoord_AlterCollectionField_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionFieldRequest)) + }) + return _c +} + +func (_c *RootCoord_AlterCollectionField_Call) Return(_a0 *commonpb.Status, _a1 error) *RootCoord_AlterCollectionField_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *RootCoord_AlterCollectionField_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error)) *RootCoord_AlterCollectionField_Call { + _c.Call.Return(run) + return _c +} + // AlterDatabase provides a mock function with given fields: _a0, _a1 func (_m *RootCoord) AlterDatabase(_a0 context.Context, _a1 *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_rootcoord_client.go b/internal/mocks/mock_rootcoord_client.go index a04c42ba9e..6542d4ea64 100644 --- a/internal/mocks/mock_rootcoord_client.go +++ b/internal/mocks/mock_rootcoord_client.go @@ -329,6 +329,80 @@ func (_c *MockRootCoordClient_AlterCollection_Call) RunAndReturn(run func(contex return _c } +// AlterCollectionField provides a mock function with given fields: ctx, in, opts +func (_m *MockRootCoordClient) AlterCollectionField(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for AlterCollectionField") + } + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionFieldRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionFieldRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionFieldRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRootCoordClient_AlterCollectionField_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollectionField' +type MockRootCoordClient_AlterCollectionField_Call struct { + *mock.Call +} + +// AlterCollectionField is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.AlterCollectionFieldRequest +// - opts ...grpc.CallOption +func (_e *MockRootCoordClient_Expecter) AlterCollectionField(ctx interface{}, in interface{}, opts ...interface{}) *MockRootCoordClient_AlterCollectionField_Call { + return &MockRootCoordClient_AlterCollectionField_Call{Call: _e.mock.On("AlterCollectionField", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockRootCoordClient_AlterCollectionField_Call) Run(run func(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption)) *MockRootCoordClient_AlterCollectionField_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionFieldRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockRootCoordClient_AlterCollectionField_Call) Return(_a0 *commonpb.Status, _a1 error) *MockRootCoordClient_AlterCollectionField_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockRootCoordClient_AlterCollectionField_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionFieldRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockRootCoordClient_AlterCollectionField_Call { + _c.Call.Return(run) + return _c +} + // AlterDatabase provides a mock function with given fields: ctx, in, opts func (_m *MockRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 7377954eba..adcd0aed7b 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -138,6 +138,7 @@ message AlterIndexRequest { int64 collectionID = 1; string index_name = 2; repeated common.KeyValuePair params = 3; + repeated string delete_keys = 4; } message GetIndexInfoRequest { diff --git a/internal/proto/root_coord.proto b/internal/proto/root_coord.proto index ebbc813f71..3efe6fe734 100644 --- a/internal/proto/root_coord.proto +++ b/internal/proto/root_coord.proto @@ -64,7 +64,8 @@ service RootCoord { rpc ShowCollections(milvus.ShowCollectionsRequest) returns (milvus.ShowCollectionsResponse) {} rpc AlterCollection(milvus.AlterCollectionRequest) returns (common.Status) {} - + + rpc AlterCollectionField(milvus.AlterCollectionFieldRequest) returns (common.Status) {} /** * @brief This method is used to create partition * @@ -244,5 +245,4 @@ message CollectionInfoOnPChannel { message PartitionInfoOnPChannel { int64 partition_id = 1; -} - +} \ No newline at end of file diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index c00161cf23..2a0b41166e 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -162,6 +162,17 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String())) case commonpb.MsgType_DropDatabase: globalMetaCache.RemoveDatabase(ctx, request.GetDbName()) + case commonpb.MsgType_AlterCollection, commonpb.MsgType_AlterCollectionField: + if request.CollectionID != UniqueID(0) { + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, 0, false) + for _, name := range aliasName { + globalMetaCache.DeprecateShardCache(request.GetDbName(), name) + } + } + if collectionName != "" { + globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) + } + log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String())) default: log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String())) if request.CollectionID != UniqueID(0) { @@ -1307,6 +1318,72 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC return act.result, nil } +func (node *Proxy) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil + } + + ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterCollectionField") + defer sp.End() + method := "AlterCollectionField" + tr := timerecord.NewTimeRecorder(method) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() + + act := &alterCollectionFieldTask{ + ctx: ctx, + Condition: NewTaskCondition(ctx), + AlterCollectionFieldRequest: request, + rootCoord: node.rootCoord, + queryCoord: node.queryCoord, + dataCoord: node.dataCoord, + } + + log := log.Ctx(ctx).With( + zap.String("role", typeutil.ProxyRole), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("fieldName", request.FieldName), + zap.Any("props", request.Properties)) + + log.Info(rpcReceived(method)) + + if err := node.sched.ddQueue.Enqueue(act); err != nil { + log.Warn( + rpcFailedToEnqueue(method), + zap.Error(err)) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() + return merr.Status(err), nil + } + + log.Debug( + rpcEnqueued(method), + zap.Uint64("BeginTs", act.BeginTs()), + zap.Uint64("EndTs", act.EndTs()), + zap.Uint64("timestamp", request.Base.Timestamp)) + + if err := act.WaitToFinish(); err != nil { + log.Warn( + rpcFailedToWaitToFinish(method), + zap.Error(err), + zap.Uint64("BeginTs", act.BeginTs()), + zap.Uint64("EndTs", act.EndTs())) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() + return merr.Status(err), nil + } + + log.Info( + rpcDone(method), + zap.Uint64("BeginTs", act.BeginTs()), + zap.Uint64("EndTs", act.EndTs())) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + return act.result, nil +} + // CreatePartition create a partition in specific collection. func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(node.GetStateCode()); err != nil { diff --git a/internal/proxy/rootcoord_mock_test.go b/internal/proxy/rootcoord_mock_test.go index 915b679db7..513c941200 100644 --- a/internal/proxy/rootcoord_mock_test.go +++ b/internal/proxy/rootcoord_mock_test.go @@ -1095,6 +1095,10 @@ func (coord *RootCoordMock) AlterCollection(ctx context.Context, request *milvus return &commonpb.Status{}, nil } +func (coord *RootCoordMock) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, nil +} + func (coord *RootCoordMock) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, nil } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 05c881d61c..e0734354b1 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -89,6 +89,7 @@ const ( DescribeAliasTaskName = "DescribeAliasTask" ListAliasesTaskName = "ListAliasesTask" AlterCollectionTaskName = "AlterCollectionTask" + AlterCollectionFieldTaskName = "AlterCollectionFieldTask" UpsertTaskName = "UpsertTask" CreateResourceGroupTaskName = "CreateResourceGroupTask" UpdateResourceGroupsTaskName = "UpdateResourceGroupsTask" @@ -953,6 +954,15 @@ func hasLazyLoadProp(props ...*commonpb.KeyValuePair) bool { return false } +func hasPropInDeletekeys(keys []string) string { + for _, key := range keys { + if key == common.MmapEnabledKey || key == common.LazyLoadEnableKey { + return key + } + } + return "" +} + func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, props ...*commonpb.KeyValuePair) (bool, error) { iso, err := common.IsPartitionKeyIsolationKvEnabled(props...) if err != nil { @@ -980,19 +990,37 @@ func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, p } func (t *alterCollectionTask) PreExecute(ctx context.Context) error { + if len(t.GetProperties()) > 0 && len(t.GetDeleteKeys()) > 0 { + return merr.WrapErrParameterInvalidMsg("cannot provide both DeleteKeys and ExtraParams") + } + collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) if err != nil { return err } t.CollectionID = collectionID - if hasMmapProp(t.Properties...) || hasLazyLoadProp(t.Properties...) { - loaded, err := isCollectionLoaded(ctx, t.queryCoord, t.CollectionID) - if err != nil { - return err + + if len(t.GetProperties()) > 0 { + if hasMmapProp(t.Properties...) || hasLazyLoadProp(t.Properties...) { + loaded, err := isCollectionLoaded(ctx, t.queryCoord, t.CollectionID) + if err != nil { + return err + } + if loaded { + return merr.WrapErrCollectionLoaded(t.CollectionName, "can not alter mmap properties if collection loaded") + } } - if loaded { - return merr.WrapErrCollectionLoaded(t.CollectionName, "can not alter mmap properties if collection loaded") + } else if len(t.GetDeleteKeys()) > 0 { + key := hasPropInDeletekeys(t.DeleteKeys) + if key != "" { + loaded, err := isCollectionLoaded(ctx, t.queryCoord, t.CollectionID) + if err != nil { + return err + } + if loaded { + return merr.WrapErrCollectionLoaded(" %s %s can not delete mmap properties if collection loaded", t.CollectionName, key) + } } } @@ -1065,6 +1093,91 @@ func (t *alterCollectionTask) PostExecute(ctx context.Context) error { return nil } +type alterCollectionFieldTask struct { + baseTask + Condition + *milvuspb.AlterCollectionFieldRequest + ctx context.Context + rootCoord types.RootCoordClient + result *commonpb.Status + queryCoord types.QueryCoordClient + dataCoord types.DataCoordClient +} + +func (t *alterCollectionFieldTask) TraceCtx() context.Context { + return t.ctx +} + +func (t *alterCollectionFieldTask) ID() UniqueID { + return t.Base.MsgID +} + +func (t *alterCollectionFieldTask) SetID(uid UniqueID) { + t.Base.MsgID = uid +} + +func (t *alterCollectionFieldTask) Name() string { + return AlterCollectionTaskName +} + +func (t *alterCollectionFieldTask) Type() commonpb.MsgType { + return t.Base.MsgType +} + +func (t *alterCollectionFieldTask) BeginTs() Timestamp { + return t.Base.Timestamp +} + +func (t *alterCollectionFieldTask) EndTs() Timestamp { + return t.Base.Timestamp +} + +func (t *alterCollectionFieldTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts +} + +func (t *alterCollectionFieldTask) OnEnqueue() error { + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_AlterCollectionField + t.Base.SourceID = paramtable.GetNodeID() + return nil +} + +func (t *alterCollectionFieldTask) PreExecute(ctx context.Context) error { + collSchema, err := globalMetaCache.GetCollectionSchema(ctx, t.GetDbName(), t.CollectionName) + if err != nil { + return err + } + + IsStringType := false + fieldName := "" + var dataType int32 + for _, field := range collSchema.Fields { + if field.GetName() == t.FieldName && (typeutil.IsStringType(field.DataType) || typeutil.IsArrayContainStringElementType(field.DataType, field.ElementType)) { + IsStringType = true + fieldName = field.GetName() + dataType = int32(field.DataType) + } + } + if !IsStringType { + return merr.WrapErrParameterInvalid(fieldName, "%s can not modify the maxlength for non-string types", schemapb.DataType_name[dataType]) + } + + return nil +} + +func (t *alterCollectionFieldTask) Execute(ctx context.Context) error { + var err error + t.result, err = t.rootCoord.AlterCollectionField(ctx, t.AlterCollectionFieldRequest) + return merr.CheckRPCCall(t.result, err) +} + +func (t *alterCollectionFieldTask) PostExecute(ctx context.Context) error { + return nil +} + type createPartitionTask struct { baseTask Condition diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 914b752807..6fc646e39e 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -612,9 +612,21 @@ func (t *alterIndexTask) OnEnqueue() error { } func (t *alterIndexTask) PreExecute(ctx context.Context) error { - for _, param := range t.req.GetExtraParams() { - if !indexparams.IsConfigableIndexParam(param.GetKey()) { - return merr.WrapErrParameterInvalidMsg("%s is not configable index param", param.GetKey()) + if len(t.req.GetDeleteKeys()) > 0 && len(t.req.GetExtraParams()) > 0 { + return merr.WrapErrParameterInvalidMsg("cannot provide both DeleteKeys and ExtraParams") + } + + if len(t.req.GetExtraParams()) > 0 { + for _, param := range t.req.GetExtraParams() { + if !indexparams.IsConfigableIndexParam(param.GetKey()) { + return merr.WrapErrParameterInvalidMsg("%s is not configable index param in extraParams", param.GetKey()) + } + } + } else if len(t.req.GetDeleteKeys()) > 0 { + for _, param := range t.req.GetDeleteKeys() { + if !indexparams.IsConfigableIndexParam(param) { + return merr.WrapErrParameterInvalidMsg("%s is not configable index param in deleteKeys", param) + } } } @@ -656,6 +668,7 @@ func (t *alterIndexTask) Execute(ctx context.Context) error { zap.String("collection", t.req.GetCollectionName()), zap.String("indexName", t.req.GetIndexName()), zap.Any("params", t.req.GetExtraParams()), + zap.Any("deletekeys", t.req.GetDeleteKeys()), ) log.Info("alter index") @@ -665,6 +678,7 @@ func (t *alterIndexTask) Execute(ctx context.Context) error { CollectionID: t.collectionID, IndexName: t.req.GetIndexName(), Params: t.req.GetExtraParams(), + DeleteKeys: t.req.GetDeleteKeys(), } t.result, err = t.datacoord.AlterIndex(ctx, req) if err = merr.CheckRPCCall(t.result, err); err != nil { diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index dc9c326224..af9f7f4bd5 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/common" @@ -48,8 +49,12 @@ func (a *alterCollectionTask) Prepare(ctx context.Context) error { func (a *alterCollectionTask) Execute(ctx context.Context) error { // Now we only support alter properties of collection - if a.Req.GetProperties() == nil { - return errors.New("only support alter collection properties, but collection properties is empty") + if a.Req.GetProperties() == nil && a.Req.GetDeleteKeys() == nil { + return errors.New("The collection properties to alter and keys to delete must not be empty at the same time") + } + + if len(a.Req.GetProperties()) > 0 && len(a.Req.GetDeleteKeys()) > 0 { + return errors.New("can not provide properties and deletekeys at the same time") } oldColl, err := a.core.meta.GetCollectionByName(ctx, a.Req.GetDbName(), a.Req.GetCollectionName(), a.ts) @@ -59,13 +64,16 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { return err } - if ContainsKeyPairArray(a.Req.GetProperties(), oldColl.Properties) { - log.Info("skip to alter collection due to no changes were detected in the properties", zap.Int64("collectionID", oldColl.CollectionID)) - return nil - } - newColl := oldColl.Clone() - newColl.Properties = MergeProperties(oldColl.Properties, a.Req.GetProperties()) + if len(a.Req.Properties) > 0 { + if ContainsKeyPairArray(a.Req.GetProperties(), oldColl.Properties) { + log.Info("skip to alter collection due to no changes were detected in the properties", zap.Int64("collectionID", oldColl.CollectionID)) + return nil + } + newColl.Properties = MergeProperties(oldColl.Properties, a.Req.GetProperties()) + } else if len(a.Req.DeleteKeys) > 0 { + newColl.Properties = DeleteProperties(oldColl.Properties, a.Req.GetDeleteKeys()) + } ts := a.GetTs() redoTask := newBaseRedoTask(a.core.stepExecutor) @@ -133,3 +141,145 @@ func (a *alterCollectionTask) GetLockerKey() LockerKey { NewCollectionLockerKey(collection, true), ) } + +func DeleteProperties(oldProps []*commonpb.KeyValuePair, deleteKeys []string) []*commonpb.KeyValuePair { + propsMap := make(map[string]string) + for _, prop := range oldProps { + propsMap[prop.Key] = prop.Value + } + for _, key := range deleteKeys { + delete(propsMap, key) + } + propKV := make([]*commonpb.KeyValuePair, 0, len(propsMap)) + for key, value := range propsMap { + propKV = append(propKV, &commonpb.KeyValuePair{Key: key, Value: value}) + } + log.Info("Alter Collection Drop Properties", zap.Any("newProperties", propKV)) + return propKV +} + +type alterCollectionFieldTask struct { + baseTask + Req *milvuspb.AlterCollectionFieldRequest +} + +func (a *alterCollectionFieldTask) Prepare(ctx context.Context) error { + if a.Req.GetCollectionName() == "" { + return fmt.Errorf("alter collection field failed, collection name does not exists") + } + + if a.Req.GetFieldName() == "" { + return fmt.Errorf("alter collection field failed, filed name does not exists") + } + + err := IsValidateUpdatedFieldProps(a.Req.GetProperties()) + if err != nil { + return err + } + + return nil +} + +func (a *alterCollectionFieldTask) Execute(ctx context.Context) error { + if a.Req.GetProperties() == nil { + return errors.New("only support alter collection properties, but collection field properties is empty") + } + + oldColl, err := a.core.meta.GetCollectionByName(ctx, a.Req.GetDbName(), a.Req.GetCollectionName(), a.ts) + if err != nil { + log.Warn("get collection failed during changing collection state", + zap.String("collectionName", a.Req.GetCollectionName()), + zap.String("fieldName", a.Req.GetFieldName()), + zap.Uint64("ts", a.ts)) + return err + } + + newColl := oldColl.Clone() + err = UpdateFieldProperties(newColl, a.Req.GetFieldName(), a.Req.GetProperties()) + if err != nil { + return err + } + ts := a.GetTs() + redoTask := newBaseRedoTask(a.core.stepExecutor) + redoTask.AddSyncStep(&AlterCollectionStep{ + baseStep: baseStep{core: a.core}, + oldColl: oldColl, + newColl: newColl, + ts: ts, + }) + + redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{ + baseStep: baseStep{core: a.core}, + req: &milvuspb.AlterCollectionRequest{ + Base: a.Req.Base, + DbName: a.Req.DbName, + CollectionName: a.Req.CollectionName, + CollectionID: oldColl.CollectionID, + }, + core: a.core, + }) + collectionNames := []string{} + redoTask.AddSyncStep(&expireCacheStep{ + baseStep: baseStep{core: a.core}, + dbName: a.Req.GetDbName(), + collectionNames: append(collectionNames, a.Req.GetCollectionName()), + collectionID: oldColl.CollectionID, + opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollectionField)}, + }) + + return redoTask.Execute(ctx) +} + +var allowedProps = []string{ + common.MaxLengthKey, + common.MmapEnabledKey, +} + +func IsKeyAllowed(key string) bool { + for _, allowedKey := range allowedProps { + if key == allowedKey { + return true + } + } + return false +} + +func IsValidateUpdatedFieldProps(updatedProps []*commonpb.KeyValuePair) error { + for _, prop := range updatedProps { + if !IsKeyAllowed(prop.Key) { + return merr.WrapErrParameterInvalidMsg("%s does not allow update in collection field param", prop.Key) + } + } + return nil +} + +func UpdateFieldProperties(coll *model.Collection, fieldName string, updatedProps []*commonpb.KeyValuePair) error { + for i, field := range coll.Fields { + if field.Name == fieldName { + coll.Fields[i].TypeParams = UpdateFieldPropertyParams(field.TypeParams, updatedProps) + return nil + } + } + return merr.WrapErrParameterInvalidMsg("field %s does not exist in collection", fieldName) +} + +func UpdateFieldPropertyParams(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { + props := make(map[string]string) + for _, prop := range oldProps { + props[prop.Key] = prop.Value + } + log.Info("UpdateFieldPropertyParams", zap.Any("oldprops", props), zap.Any("newprops", updatedProps)) + for _, prop := range updatedProps { + props[prop.Key] = prop.Value + } + log.Info("UpdateFieldPropertyParams", zap.Any("newprops", props)) + propKV := make([]*commonpb.KeyValuePair, 0) + for key, value := range props { + propKV = append(propKV, &commonpb.KeyValuePair{ + Key: key, + Value: value, + }) + } + + return propKV +} diff --git a/internal/rootcoord/alter_collection_task_test.go b/internal/rootcoord/alter_collection_task_test.go index 716f2aa28b..e2b53e958d 100644 --- a/internal/rootcoord/alter_collection_task_test.go +++ b/internal/rootcoord/alter_collection_task_test.go @@ -310,4 +310,42 @@ func Test_alterCollectionTask_Execute(t *testing.T) { Value: "true", }) }) + + t.Run("test delete collection props", func(t *testing.T) { + coll := &model.Collection{ + Properties: []*commonpb.KeyValuePair{ + { + Key: common.CollectionTTLConfigKey, + Value: "1", + }, + { + Key: common.CollectionAutoCompactionKey, + Value: "true", + }, + }, + } + + deleteKeys := []string{common.CollectionTTLConfigKey} + coll.Properties = DeleteProperties(coll.Properties, deleteKeys) + assert.NotContains(t, coll.Properties, &commonpb.KeyValuePair{ + Key: common.CollectionTTLConfigKey, + Value: "1", + }) + + assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{ + Key: common.CollectionAutoCompactionKey, + Value: "true", + }) + + deleteKeys = []string{"nonexistent.key"} + coll.Properties = DeleteProperties(coll.Properties, deleteKeys) + assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{ + Key: common.CollectionAutoCompactionKey, + Value: "true", + }) + + deleteKeys = []string{common.CollectionAutoCompactionKey} + coll.Properties = DeleteProperties(coll.Properties, deleteKeys) + assert.Empty(t, coll.Properties) + }) } diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index c4811b3550..6c6731d7c3 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -225,7 +225,11 @@ func (b *ServerBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID } func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { - log.Info("broadcasting request to alter collection", zap.String("collectionName", req.GetCollectionName()), zap.Int64("collectionID", req.GetCollectionID()), zap.Any("props", req.GetProperties())) + log.Info("broadcasting request to alter collection", + zap.String("collectionName", req.GetCollectionName()), + zap.Int64("collectionID", req.GetCollectionID()), + zap.Any("props", req.GetProperties()), + zap.Any("deleteKeys", req.GetDeleteKeys())) colMeta, err := b.s.meta.GetCollectionByID(ctx, req.GetDbName(), req.GetCollectionID(), typeutil.MaxTimestamp, false) if err != nil { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index ed07f88a8c..be996c5d67 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1356,7 +1356,9 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection log.Ctx(ctx).Info("received request to alter collection", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), - zap.Any("props", in.Properties)) + zap.Any("props", in.Properties), + zap.Any("delete_keys", in.DeleteKeys), + ) t := &alterCollectionTask{ baseTask: newBaseTask(ctx, c), @@ -1395,6 +1397,58 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection return merr.Success(), nil } +func (c *Core) AlterCollectionField(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + if err := merr.CheckHealthy(c.GetStateCode()); err != nil { + return merr.Status(err), nil + } + + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.TotalLabel).Inc() + tr := timerecord.NewTimeRecorder("AlterCollectionField") + + log.Ctx(ctx).Info("received request to alter collection field", + zap.String("role", typeutil.RootCoordRole), + zap.String("name", in.GetCollectionName()), + zap.String("fieldName", in.GetFieldName()), + zap.Any("props", in.Properties), + ) + + t := &alterCollectionFieldTask{ + baseTask: newBaseTask(ctx, c), + Req: in, + } + + if err := c.scheduler.AddTask(t); err != nil { + log.Warn("failed to enqueue request to alter collection field", + zap.String("role", typeutil.RootCoordRole), + zap.Error(err), + zap.String("name", in.GetCollectionName()), + zap.String("fieldName", in.GetFieldName())) + + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.FailLabel).Inc() + return merr.Status(err), nil + } + + if err := t.WaitToFinish(); err != nil { + log.Warn("failed to alter collection", + zap.String("role", typeutil.RootCoordRole), + zap.Error(err), + zap.String("name", in.GetCollectionName()), + zap.Uint64("ts", t.GetTs())) + + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.FailLabel).Inc() + return merr.Status(err), nil + } + + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.SuccessLabel).Inc() + metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollectionField").Observe(float64(tr.ElapseSpan().Milliseconds())) + + log.Info("done to alter collection field", + zap.String("role", typeutil.RootCoordRole), + zap.String("name", in.GetCollectionName()), + zap.String("fieldName", in.GetFieldName())) + return merr.Success(), nil +} + func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil diff --git a/internal/util/mock/grpc_rootcoord_client.go b/internal/util/mock/grpc_rootcoord_client.go index ea75308bb9..abf856d0be 100644 --- a/internal/util/mock/grpc_rootcoord_client.go +++ b/internal/util/mock/grpc_rootcoord_client.go @@ -262,6 +262,10 @@ func (m *GrpcRootCoordClient) AlterCollection(ctx context.Context, in *milvuspb. return &commonpb.Status{}, m.Err } +func (m *GrpcRootCoordClient) AlterCollectionField(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, m.Err +} + func (m *GrpcRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } diff --git a/pkg/go.mod b/pkg/go.mod index cdf06aac4d..cb6eb14de4 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -14,7 +14,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.7 - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 github.com/nats-io/nats-server/v2 v2.10.12 github.com/nats-io/nats.go v1.34.1 github.com/panjf2000/ants/v2 v2.7.2 diff --git a/pkg/go.sum b/pkg/go.sum index 15529f4160..04a38ef236 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -488,8 +488,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 h1:cFRrdFZwhFHv33pue1z8beYSvrXDYFSFsCuvXGX3DHE= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 h1:5/35+F32fs6ifVzI1e+VkUNpK0gWyXQSdZVnmNUFrrg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index d69687603f..a3f4a5a9d9 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -560,6 +560,15 @@ func IsStringType(dataType schemapb.DataType) bool { } } +func IsArrayContainStringElementType(dataType schemapb.DataType, elementType schemapb.DataType) bool { + if IsArrayType(dataType) { + if elementType == schemapb.DataType_String || elementType == schemapb.DataType_VarChar { + return true + } + } + return false +} + func IsVariableDataType(dataType schemapb.DataType) bool { return IsStringType(dataType) || IsArrayType(dataType) || IsJSONType(dataType) }