diff --git a/go.mod b/go.mod index 23c5c4b754..23406bef31 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 github.com/minio/minio-go/v7 v7.0.73 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index 4ce7ad6a3f..f318b36214 100644 --- a/go.sum +++ b/go.sum @@ -632,6 +632,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 h1:cFRrdFZwhFHv33pue1z8beYSvrXDYFSFsCuvXGX3DHE= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 h1:5/35+F32fs6ifVzI1e+VkUNpK0gWyXQSdZVnmNUFrrg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 136850c414..ba38b80ca4 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -317,12 +317,33 @@ func UpdateParams(index *model.Index, from []*commonpb.KeyValuePair, updates []* }) } +func DeleteParams(index *model.Index, from []*commonpb.KeyValuePair, deletes []string) []*commonpb.KeyValuePair { + params := make(map[string]string) + for _, param := range from { + params[param.GetKey()] = param.GetValue() + } + + // delete the params + for _, key := range deletes { + delete(params, key) + } + + return lo.MapToSlice(params, func(k string, v string) *commonpb.KeyValuePair { + return &commonpb.KeyValuePair{ + Key: k, + Value: v, + } + }) +} + func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) (*commonpb.Status, error) { log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), zap.String("indexName", req.GetIndexName()), ) - log.Info("received AlterIndex request", zap.Any("params", req.GetParams())) + log.Info("received AlterIndex request", + zap.Any("params", req.GetParams()), + zap.Any("deletekeys", req.GetDeleteKeys())) if req.IndexName == "" { return merr.Status(merr.WrapErrParameterInvalidMsg("index name is empty")), nil @@ -339,22 +360,44 @@ func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) return merr.Status(err), nil } - for _, index := range indexes { - // update user index params - newUserIndexParams := UpdateParams(index, index.UserIndexParams, req.GetParams()) - log.Info("alter index user index params", - zap.String("indexName", index.IndexName), - zap.Any("params", newUserIndexParams), - ) - index.UserIndexParams = newUserIndexParams + if len(req.GetDeleteKeys()) > 0 && len(req.GetParams()) > 0 { + return merr.Status(merr.WrapErrParameterInvalidMsg("cannot provide both DeleteKeys and ExtraParams")), nil + } - // update index params - newIndexParams := UpdateParams(index, index.IndexParams, req.GetParams()) - log.Info("alter index index params", - zap.String("indexName", index.IndexName), - zap.Any("params", newIndexParams), - ) - index.IndexParams = newIndexParams + for _, index := range indexes { + if len(req.GetParams()) > 0 { + // update user index params + newUserIndexParams := UpdateParams(index, index.UserIndexParams, req.GetParams()) + log.Info("alter index user index params", + zap.String("indexName", index.IndexName), + zap.Any("params", newUserIndexParams), + ) + index.UserIndexParams = newUserIndexParams + + // update index params + newIndexParams := UpdateParams(index, index.IndexParams, req.GetParams()) + log.Info("alter index index params", + zap.String("indexName", index.IndexName), + zap.Any("params", newIndexParams), + ) + index.IndexParams = newIndexParams + } else if len(req.GetDeleteKeys()) > 0 { + // delete user index params + newUserIndexParams := DeleteParams(index, index.UserIndexParams, req.GetDeleteKeys()) + log.Info("alter index user deletekeys", + zap.String("indexName", index.IndexName), + zap.Any("params", newUserIndexParams), + ) + index.UserIndexParams = newUserIndexParams + + // delete index params + newIndexParams := DeleteParams(index, index.IndexParams, req.GetDeleteKeys()) + log.Info("alter index index deletekeys", + zap.String("indexName", index.IndexName), + zap.Any("params", newIndexParams), + ) + index.IndexParams = newIndexParams + } if err := ValidateIndexParams(index); err != nil { return merr.Status(err), nil diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index c529f8529f..4923f576da 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -637,6 +637,41 @@ func TestServer_AlterIndex(t *testing.T) { req.Params[0].Value = "true" }) + t.Run("delete_params", func(t *testing.T) { + deleteReq := &indexpb.AlterIndexRequest{ + CollectionID: collID, + IndexName: indexName, + DeleteKeys: []string{common.MmapEnabledKey}, + } + resp, err := s.AlterIndex(ctx, deleteReq) + assert.NoError(t, merr.CheckRPCCall(resp, err)) + + describeResp, err := s.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ + CollectionID: collID, + IndexName: indexName, + Timestamp: createTS, + }) + assert.NoError(t, merr.CheckRPCCall(describeResp, err)) + for _, param := range describeResp.IndexInfos[0].GetUserIndexParams() { + assert.NotEqual(t, common.MmapEnabledKey, param.GetKey()) + } + }) + t.Run("update_and_delete_params", func(t *testing.T) { + updateAndDeleteReq := &indexpb.AlterIndexRequest{ + CollectionID: collID, + IndexName: indexName, + Params: []*commonpb.KeyValuePair{ + { + Key: common.MmapEnabledKey, + Value: "true", + }, + }, + DeleteKeys: []string{common.MmapEnabledKey}, + } + resp, err := s.AlterIndex(ctx, updateAndDeleteReq) + assert.ErrorIs(t, merr.CheckRPCCall(resp, err), merr.ErrParameterInvalid) + }) + t.Run("success", func(t *testing.T) { resp, err := s.AlterIndex(ctx, req) assert.NoError(t, merr.CheckRPCCall(resp, err)) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 06eac6a255..9f64dbb722 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -450,6 +450,10 @@ func (m *mockRootCoordClient) AlterCollection(ctx context.Context, request *milv panic("not implemented") // TODO: Implement } +func (m *mockRootCoordClient) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + func (m *mockRootCoordClient) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { panic("not implemented") // TODO: Implement } diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index fc17566af1..ece75e3cd9 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -696,6 +696,10 @@ func (s *Server) AlterCollection(ctx context.Context, request *milvuspb.AlterCol return s.proxy.AlterCollection(ctx, request) } +func (s *Server) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + return s.proxy.AlterCollectionField(ctx, request) +} + // CreatePartition notifies Proxy to create a partition func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { return s.proxy.CreatePartition(ctx, request) diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 98da081920..942f366990 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -240,6 +240,17 @@ func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCol }) } +func (c *Client) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + request = typeutil.Clone(request) + commonpbutil.UpdateMsgBase( + request.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) { + return client.AlterCollectionField(ctx, request) + }) +} + // CreatePartition create partition func (c *Client) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { in = typeutil.Clone(in) diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index c1e0241a5e..028a9178e6 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -530,6 +530,10 @@ func (s *Server) AlterCollection(ctx context.Context, request *milvuspb.AlterCol return s.rootCoord.AlterCollection(ctx, request) } +func (s *Server) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + return s.rootCoord.AlterCollectionField(ctx, request) +} + func (s *Server) RenameCollection(ctx context.Context, request *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { return s.rootCoord.RenameCollection(ctx, request) } diff --git a/internal/mocks/mock_rootcoord.go b/internal/mocks/mock_rootcoord.go index 5dedc5b14a..0ecf1b9ca0 100644 --- a/internal/mocks/mock_rootcoord.go +++ b/internal/mocks/mock_rootcoord.go @@ -272,6 +272,65 @@ func (_c *RootCoord_AlterCollection_Call) RunAndReturn(run func(context.Context, return _c } +// AlterCollectionField provides a mock function with given fields: _a0, _a1 +func (_m *RootCoord) AlterCollectionField(_a0 context.Context, _a1 *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for AlterCollectionField") + } + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionFieldRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionFieldRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RootCoord_AlterCollectionField_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollectionField' +type RootCoord_AlterCollectionField_Call struct { + *mock.Call +} + +// AlterCollectionField is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.AlterCollectionFieldRequest +func (_e *RootCoord_Expecter) AlterCollectionField(_a0 interface{}, _a1 interface{}) *RootCoord_AlterCollectionField_Call { + return &RootCoord_AlterCollectionField_Call{Call: _e.mock.On("AlterCollectionField", _a0, _a1)} +} + +func (_c *RootCoord_AlterCollectionField_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterCollectionFieldRequest)) *RootCoord_AlterCollectionField_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionFieldRequest)) + }) + return _c +} + +func (_c *RootCoord_AlterCollectionField_Call) Return(_a0 *commonpb.Status, _a1 error) *RootCoord_AlterCollectionField_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *RootCoord_AlterCollectionField_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error)) *RootCoord_AlterCollectionField_Call { + _c.Call.Return(run) + return _c +} + // AlterDatabase provides a mock function with given fields: _a0, _a1 func (_m *RootCoord) AlterDatabase(_a0 context.Context, _a1 *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_rootcoord_client.go b/internal/mocks/mock_rootcoord_client.go index a04c42ba9e..6542d4ea64 100644 --- a/internal/mocks/mock_rootcoord_client.go +++ b/internal/mocks/mock_rootcoord_client.go @@ -329,6 +329,80 @@ func (_c *MockRootCoordClient_AlterCollection_Call) RunAndReturn(run func(contex return _c } +// AlterCollectionField provides a mock function with given fields: ctx, in, opts +func (_m *MockRootCoordClient) AlterCollectionField(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for AlterCollectionField") + } + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionFieldRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionFieldRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionFieldRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRootCoordClient_AlterCollectionField_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollectionField' +type MockRootCoordClient_AlterCollectionField_Call struct { + *mock.Call +} + +// AlterCollectionField is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.AlterCollectionFieldRequest +// - opts ...grpc.CallOption +func (_e *MockRootCoordClient_Expecter) AlterCollectionField(ctx interface{}, in interface{}, opts ...interface{}) *MockRootCoordClient_AlterCollectionField_Call { + return &MockRootCoordClient_AlterCollectionField_Call{Call: _e.mock.On("AlterCollectionField", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockRootCoordClient_AlterCollectionField_Call) Run(run func(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption)) *MockRootCoordClient_AlterCollectionField_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionFieldRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockRootCoordClient_AlterCollectionField_Call) Return(_a0 *commonpb.Status, _a1 error) *MockRootCoordClient_AlterCollectionField_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockRootCoordClient_AlterCollectionField_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionFieldRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockRootCoordClient_AlterCollectionField_Call { + _c.Call.Return(run) + return _c +} + // AlterDatabase provides a mock function with given fields: ctx, in, opts func (_m *MockRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 7377954eba..adcd0aed7b 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -138,6 +138,7 @@ message AlterIndexRequest { int64 collectionID = 1; string index_name = 2; repeated common.KeyValuePair params = 3; + repeated string delete_keys = 4; } message GetIndexInfoRequest { diff --git a/internal/proto/root_coord.proto b/internal/proto/root_coord.proto index ebbc813f71..3efe6fe734 100644 --- a/internal/proto/root_coord.proto +++ b/internal/proto/root_coord.proto @@ -64,7 +64,8 @@ service RootCoord { rpc ShowCollections(milvus.ShowCollectionsRequest) returns (milvus.ShowCollectionsResponse) {} rpc AlterCollection(milvus.AlterCollectionRequest) returns (common.Status) {} - + + rpc AlterCollectionField(milvus.AlterCollectionFieldRequest) returns (common.Status) {} /** * @brief This method is used to create partition * @@ -244,5 +245,4 @@ message CollectionInfoOnPChannel { message PartitionInfoOnPChannel { int64 partition_id = 1; -} - +} \ No newline at end of file diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index c00161cf23..2a0b41166e 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -162,6 +162,17 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String())) case commonpb.MsgType_DropDatabase: globalMetaCache.RemoveDatabase(ctx, request.GetDbName()) + case commonpb.MsgType_AlterCollection, commonpb.MsgType_AlterCollectionField: + if request.CollectionID != UniqueID(0) { + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, 0, false) + for _, name := range aliasName { + globalMetaCache.DeprecateShardCache(request.GetDbName(), name) + } + } + if collectionName != "" { + globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) + } + log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String())) default: log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String())) if request.CollectionID != UniqueID(0) { @@ -1307,6 +1318,72 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC return act.result, nil } +func (node *Proxy) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil + } + + ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterCollectionField") + defer sp.End() + method := "AlterCollectionField" + tr := timerecord.NewTimeRecorder(method) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() + + act := &alterCollectionFieldTask{ + ctx: ctx, + Condition: NewTaskCondition(ctx), + AlterCollectionFieldRequest: request, + rootCoord: node.rootCoord, + queryCoord: node.queryCoord, + dataCoord: node.dataCoord, + } + + log := log.Ctx(ctx).With( + zap.String("role", typeutil.ProxyRole), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("fieldName", request.FieldName), + zap.Any("props", request.Properties)) + + log.Info(rpcReceived(method)) + + if err := node.sched.ddQueue.Enqueue(act); err != nil { + log.Warn( + rpcFailedToEnqueue(method), + zap.Error(err)) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() + return merr.Status(err), nil + } + + log.Debug( + rpcEnqueued(method), + zap.Uint64("BeginTs", act.BeginTs()), + zap.Uint64("EndTs", act.EndTs()), + zap.Uint64("timestamp", request.Base.Timestamp)) + + if err := act.WaitToFinish(); err != nil { + log.Warn( + rpcFailedToWaitToFinish(method), + zap.Error(err), + zap.Uint64("BeginTs", act.BeginTs()), + zap.Uint64("EndTs", act.EndTs())) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() + return merr.Status(err), nil + } + + log.Info( + rpcDone(method), + zap.Uint64("BeginTs", act.BeginTs()), + zap.Uint64("EndTs", act.EndTs())) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + return act.result, nil +} + // CreatePartition create a partition in specific collection. func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(node.GetStateCode()); err != nil { diff --git a/internal/proxy/rootcoord_mock_test.go b/internal/proxy/rootcoord_mock_test.go index 915b679db7..513c941200 100644 --- a/internal/proxy/rootcoord_mock_test.go +++ b/internal/proxy/rootcoord_mock_test.go @@ -1095,6 +1095,10 @@ func (coord *RootCoordMock) AlterCollection(ctx context.Context, request *milvus return &commonpb.Status{}, nil } +func (coord *RootCoordMock) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, nil +} + func (coord *RootCoordMock) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, nil } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 05c881d61c..e0734354b1 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -89,6 +89,7 @@ const ( DescribeAliasTaskName = "DescribeAliasTask" ListAliasesTaskName = "ListAliasesTask" AlterCollectionTaskName = "AlterCollectionTask" + AlterCollectionFieldTaskName = "AlterCollectionFieldTask" UpsertTaskName = "UpsertTask" CreateResourceGroupTaskName = "CreateResourceGroupTask" UpdateResourceGroupsTaskName = "UpdateResourceGroupsTask" @@ -953,6 +954,15 @@ func hasLazyLoadProp(props ...*commonpb.KeyValuePair) bool { return false } +func hasPropInDeletekeys(keys []string) string { + for _, key := range keys { + if key == common.MmapEnabledKey || key == common.LazyLoadEnableKey { + return key + } + } + return "" +} + func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, props ...*commonpb.KeyValuePair) (bool, error) { iso, err := common.IsPartitionKeyIsolationKvEnabled(props...) if err != nil { @@ -980,19 +990,37 @@ func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, p } func (t *alterCollectionTask) PreExecute(ctx context.Context) error { + if len(t.GetProperties()) > 0 && len(t.GetDeleteKeys()) > 0 { + return merr.WrapErrParameterInvalidMsg("cannot provide both DeleteKeys and ExtraParams") + } + collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) if err != nil { return err } t.CollectionID = collectionID - if hasMmapProp(t.Properties...) || hasLazyLoadProp(t.Properties...) { - loaded, err := isCollectionLoaded(ctx, t.queryCoord, t.CollectionID) - if err != nil { - return err + + if len(t.GetProperties()) > 0 { + if hasMmapProp(t.Properties...) || hasLazyLoadProp(t.Properties...) { + loaded, err := isCollectionLoaded(ctx, t.queryCoord, t.CollectionID) + if err != nil { + return err + } + if loaded { + return merr.WrapErrCollectionLoaded(t.CollectionName, "can not alter mmap properties if collection loaded") + } } - if loaded { - return merr.WrapErrCollectionLoaded(t.CollectionName, "can not alter mmap properties if collection loaded") + } else if len(t.GetDeleteKeys()) > 0 { + key := hasPropInDeletekeys(t.DeleteKeys) + if key != "" { + loaded, err := isCollectionLoaded(ctx, t.queryCoord, t.CollectionID) + if err != nil { + return err + } + if loaded { + return merr.WrapErrCollectionLoaded(" %s %s can not delete mmap properties if collection loaded", t.CollectionName, key) + } } } @@ -1065,6 +1093,91 @@ func (t *alterCollectionTask) PostExecute(ctx context.Context) error { return nil } +type alterCollectionFieldTask struct { + baseTask + Condition + *milvuspb.AlterCollectionFieldRequest + ctx context.Context + rootCoord types.RootCoordClient + result *commonpb.Status + queryCoord types.QueryCoordClient + dataCoord types.DataCoordClient +} + +func (t *alterCollectionFieldTask) TraceCtx() context.Context { + return t.ctx +} + +func (t *alterCollectionFieldTask) ID() UniqueID { + return t.Base.MsgID +} + +func (t *alterCollectionFieldTask) SetID(uid UniqueID) { + t.Base.MsgID = uid +} + +func (t *alterCollectionFieldTask) Name() string { + return AlterCollectionTaskName +} + +func (t *alterCollectionFieldTask) Type() commonpb.MsgType { + return t.Base.MsgType +} + +func (t *alterCollectionFieldTask) BeginTs() Timestamp { + return t.Base.Timestamp +} + +func (t *alterCollectionFieldTask) EndTs() Timestamp { + return t.Base.Timestamp +} + +func (t *alterCollectionFieldTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts +} + +func (t *alterCollectionFieldTask) OnEnqueue() error { + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_AlterCollectionField + t.Base.SourceID = paramtable.GetNodeID() + return nil +} + +func (t *alterCollectionFieldTask) PreExecute(ctx context.Context) error { + collSchema, err := globalMetaCache.GetCollectionSchema(ctx, t.GetDbName(), t.CollectionName) + if err != nil { + return err + } + + IsStringType := false + fieldName := "" + var dataType int32 + for _, field := range collSchema.Fields { + if field.GetName() == t.FieldName && (typeutil.IsStringType(field.DataType) || typeutil.IsArrayContainStringElementType(field.DataType, field.ElementType)) { + IsStringType = true + fieldName = field.GetName() + dataType = int32(field.DataType) + } + } + if !IsStringType { + return merr.WrapErrParameterInvalid(fieldName, "%s can not modify the maxlength for non-string types", schemapb.DataType_name[dataType]) + } + + return nil +} + +func (t *alterCollectionFieldTask) Execute(ctx context.Context) error { + var err error + t.result, err = t.rootCoord.AlterCollectionField(ctx, t.AlterCollectionFieldRequest) + return merr.CheckRPCCall(t.result, err) +} + +func (t *alterCollectionFieldTask) PostExecute(ctx context.Context) error { + return nil +} + type createPartitionTask struct { baseTask Condition diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 914b752807..6fc646e39e 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -612,9 +612,21 @@ func (t *alterIndexTask) OnEnqueue() error { } func (t *alterIndexTask) PreExecute(ctx context.Context) error { - for _, param := range t.req.GetExtraParams() { - if !indexparams.IsConfigableIndexParam(param.GetKey()) { - return merr.WrapErrParameterInvalidMsg("%s is not configable index param", param.GetKey()) + if len(t.req.GetDeleteKeys()) > 0 && len(t.req.GetExtraParams()) > 0 { + return merr.WrapErrParameterInvalidMsg("cannot provide both DeleteKeys and ExtraParams") + } + + if len(t.req.GetExtraParams()) > 0 { + for _, param := range t.req.GetExtraParams() { + if !indexparams.IsConfigableIndexParam(param.GetKey()) { + return merr.WrapErrParameterInvalidMsg("%s is not configable index param in extraParams", param.GetKey()) + } + } + } else if len(t.req.GetDeleteKeys()) > 0 { + for _, param := range t.req.GetDeleteKeys() { + if !indexparams.IsConfigableIndexParam(param) { + return merr.WrapErrParameterInvalidMsg("%s is not configable index param in deleteKeys", param) + } } } @@ -656,6 +668,7 @@ func (t *alterIndexTask) Execute(ctx context.Context) error { zap.String("collection", t.req.GetCollectionName()), zap.String("indexName", t.req.GetIndexName()), zap.Any("params", t.req.GetExtraParams()), + zap.Any("deletekeys", t.req.GetDeleteKeys()), ) log.Info("alter index") @@ -665,6 +678,7 @@ func (t *alterIndexTask) Execute(ctx context.Context) error { CollectionID: t.collectionID, IndexName: t.req.GetIndexName(), Params: t.req.GetExtraParams(), + DeleteKeys: t.req.GetDeleteKeys(), } t.result, err = t.datacoord.AlterIndex(ctx, req) if err = merr.CheckRPCCall(t.result, err); err != nil { diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index dc9c326224..af9f7f4bd5 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/common" @@ -48,8 +49,12 @@ func (a *alterCollectionTask) Prepare(ctx context.Context) error { func (a *alterCollectionTask) Execute(ctx context.Context) error { // Now we only support alter properties of collection - if a.Req.GetProperties() == nil { - return errors.New("only support alter collection properties, but collection properties is empty") + if a.Req.GetProperties() == nil && a.Req.GetDeleteKeys() == nil { + return errors.New("The collection properties to alter and keys to delete must not be empty at the same time") + } + + if len(a.Req.GetProperties()) > 0 && len(a.Req.GetDeleteKeys()) > 0 { + return errors.New("can not provide properties and deletekeys at the same time") } oldColl, err := a.core.meta.GetCollectionByName(ctx, a.Req.GetDbName(), a.Req.GetCollectionName(), a.ts) @@ -59,13 +64,16 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { return err } - if ContainsKeyPairArray(a.Req.GetProperties(), oldColl.Properties) { - log.Info("skip to alter collection due to no changes were detected in the properties", zap.Int64("collectionID", oldColl.CollectionID)) - return nil - } - newColl := oldColl.Clone() - newColl.Properties = MergeProperties(oldColl.Properties, a.Req.GetProperties()) + if len(a.Req.Properties) > 0 { + if ContainsKeyPairArray(a.Req.GetProperties(), oldColl.Properties) { + log.Info("skip to alter collection due to no changes were detected in the properties", zap.Int64("collectionID", oldColl.CollectionID)) + return nil + } + newColl.Properties = MergeProperties(oldColl.Properties, a.Req.GetProperties()) + } else if len(a.Req.DeleteKeys) > 0 { + newColl.Properties = DeleteProperties(oldColl.Properties, a.Req.GetDeleteKeys()) + } ts := a.GetTs() redoTask := newBaseRedoTask(a.core.stepExecutor) @@ -133,3 +141,145 @@ func (a *alterCollectionTask) GetLockerKey() LockerKey { NewCollectionLockerKey(collection, true), ) } + +func DeleteProperties(oldProps []*commonpb.KeyValuePair, deleteKeys []string) []*commonpb.KeyValuePair { + propsMap := make(map[string]string) + for _, prop := range oldProps { + propsMap[prop.Key] = prop.Value + } + for _, key := range deleteKeys { + delete(propsMap, key) + } + propKV := make([]*commonpb.KeyValuePair, 0, len(propsMap)) + for key, value := range propsMap { + propKV = append(propKV, &commonpb.KeyValuePair{Key: key, Value: value}) + } + log.Info("Alter Collection Drop Properties", zap.Any("newProperties", propKV)) + return propKV +} + +type alterCollectionFieldTask struct { + baseTask + Req *milvuspb.AlterCollectionFieldRequest +} + +func (a *alterCollectionFieldTask) Prepare(ctx context.Context) error { + if a.Req.GetCollectionName() == "" { + return fmt.Errorf("alter collection field failed, collection name does not exists") + } + + if a.Req.GetFieldName() == "" { + return fmt.Errorf("alter collection field failed, filed name does not exists") + } + + err := IsValidateUpdatedFieldProps(a.Req.GetProperties()) + if err != nil { + return err + } + + return nil +} + +func (a *alterCollectionFieldTask) Execute(ctx context.Context) error { + if a.Req.GetProperties() == nil { + return errors.New("only support alter collection properties, but collection field properties is empty") + } + + oldColl, err := a.core.meta.GetCollectionByName(ctx, a.Req.GetDbName(), a.Req.GetCollectionName(), a.ts) + if err != nil { + log.Warn("get collection failed during changing collection state", + zap.String("collectionName", a.Req.GetCollectionName()), + zap.String("fieldName", a.Req.GetFieldName()), + zap.Uint64("ts", a.ts)) + return err + } + + newColl := oldColl.Clone() + err = UpdateFieldProperties(newColl, a.Req.GetFieldName(), a.Req.GetProperties()) + if err != nil { + return err + } + ts := a.GetTs() + redoTask := newBaseRedoTask(a.core.stepExecutor) + redoTask.AddSyncStep(&AlterCollectionStep{ + baseStep: baseStep{core: a.core}, + oldColl: oldColl, + newColl: newColl, + ts: ts, + }) + + redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{ + baseStep: baseStep{core: a.core}, + req: &milvuspb.AlterCollectionRequest{ + Base: a.Req.Base, + DbName: a.Req.DbName, + CollectionName: a.Req.CollectionName, + CollectionID: oldColl.CollectionID, + }, + core: a.core, + }) + collectionNames := []string{} + redoTask.AddSyncStep(&expireCacheStep{ + baseStep: baseStep{core: a.core}, + dbName: a.Req.GetDbName(), + collectionNames: append(collectionNames, a.Req.GetCollectionName()), + collectionID: oldColl.CollectionID, + opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollectionField)}, + }) + + return redoTask.Execute(ctx) +} + +var allowedProps = []string{ + common.MaxLengthKey, + common.MmapEnabledKey, +} + +func IsKeyAllowed(key string) bool { + for _, allowedKey := range allowedProps { + if key == allowedKey { + return true + } + } + return false +} + +func IsValidateUpdatedFieldProps(updatedProps []*commonpb.KeyValuePair) error { + for _, prop := range updatedProps { + if !IsKeyAllowed(prop.Key) { + return merr.WrapErrParameterInvalidMsg("%s does not allow update in collection field param", prop.Key) + } + } + return nil +} + +func UpdateFieldProperties(coll *model.Collection, fieldName string, updatedProps []*commonpb.KeyValuePair) error { + for i, field := range coll.Fields { + if field.Name == fieldName { + coll.Fields[i].TypeParams = UpdateFieldPropertyParams(field.TypeParams, updatedProps) + return nil + } + } + return merr.WrapErrParameterInvalidMsg("field %s does not exist in collection", fieldName) +} + +func UpdateFieldPropertyParams(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { + props := make(map[string]string) + for _, prop := range oldProps { + props[prop.Key] = prop.Value + } + log.Info("UpdateFieldPropertyParams", zap.Any("oldprops", props), zap.Any("newprops", updatedProps)) + for _, prop := range updatedProps { + props[prop.Key] = prop.Value + } + log.Info("UpdateFieldPropertyParams", zap.Any("newprops", props)) + propKV := make([]*commonpb.KeyValuePair, 0) + for key, value := range props { + propKV = append(propKV, &commonpb.KeyValuePair{ + Key: key, + Value: value, + }) + } + + return propKV +} diff --git a/internal/rootcoord/alter_collection_task_test.go b/internal/rootcoord/alter_collection_task_test.go index 716f2aa28b..e2b53e958d 100644 --- a/internal/rootcoord/alter_collection_task_test.go +++ b/internal/rootcoord/alter_collection_task_test.go @@ -310,4 +310,42 @@ func Test_alterCollectionTask_Execute(t *testing.T) { Value: "true", }) }) + + t.Run("test delete collection props", func(t *testing.T) { + coll := &model.Collection{ + Properties: []*commonpb.KeyValuePair{ + { + Key: common.CollectionTTLConfigKey, + Value: "1", + }, + { + Key: common.CollectionAutoCompactionKey, + Value: "true", + }, + }, + } + + deleteKeys := []string{common.CollectionTTLConfigKey} + coll.Properties = DeleteProperties(coll.Properties, deleteKeys) + assert.NotContains(t, coll.Properties, &commonpb.KeyValuePair{ + Key: common.CollectionTTLConfigKey, + Value: "1", + }) + + assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{ + Key: common.CollectionAutoCompactionKey, + Value: "true", + }) + + deleteKeys = []string{"nonexistent.key"} + coll.Properties = DeleteProperties(coll.Properties, deleteKeys) + assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{ + Key: common.CollectionAutoCompactionKey, + Value: "true", + }) + + deleteKeys = []string{common.CollectionAutoCompactionKey} + coll.Properties = DeleteProperties(coll.Properties, deleteKeys) + assert.Empty(t, coll.Properties) + }) } diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index c4811b3550..6c6731d7c3 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -225,7 +225,11 @@ func (b *ServerBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID } func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { - log.Info("broadcasting request to alter collection", zap.String("collectionName", req.GetCollectionName()), zap.Int64("collectionID", req.GetCollectionID()), zap.Any("props", req.GetProperties())) + log.Info("broadcasting request to alter collection", + zap.String("collectionName", req.GetCollectionName()), + zap.Int64("collectionID", req.GetCollectionID()), + zap.Any("props", req.GetProperties()), + zap.Any("deleteKeys", req.GetDeleteKeys())) colMeta, err := b.s.meta.GetCollectionByID(ctx, req.GetDbName(), req.GetCollectionID(), typeutil.MaxTimestamp, false) if err != nil { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index ed07f88a8c..be996c5d67 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1356,7 +1356,9 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection log.Ctx(ctx).Info("received request to alter collection", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), - zap.Any("props", in.Properties)) + zap.Any("props", in.Properties), + zap.Any("delete_keys", in.DeleteKeys), + ) t := &alterCollectionTask{ baseTask: newBaseTask(ctx, c), @@ -1395,6 +1397,58 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection return merr.Success(), nil } +func (c *Core) AlterCollectionField(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { + if err := merr.CheckHealthy(c.GetStateCode()); err != nil { + return merr.Status(err), nil + } + + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.TotalLabel).Inc() + tr := timerecord.NewTimeRecorder("AlterCollectionField") + + log.Ctx(ctx).Info("received request to alter collection field", + zap.String("role", typeutil.RootCoordRole), + zap.String("name", in.GetCollectionName()), + zap.String("fieldName", in.GetFieldName()), + zap.Any("props", in.Properties), + ) + + t := &alterCollectionFieldTask{ + baseTask: newBaseTask(ctx, c), + Req: in, + } + + if err := c.scheduler.AddTask(t); err != nil { + log.Warn("failed to enqueue request to alter collection field", + zap.String("role", typeutil.RootCoordRole), + zap.Error(err), + zap.String("name", in.GetCollectionName()), + zap.String("fieldName", in.GetFieldName())) + + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.FailLabel).Inc() + return merr.Status(err), nil + } + + if err := t.WaitToFinish(); err != nil { + log.Warn("failed to alter collection", + zap.String("role", typeutil.RootCoordRole), + zap.Error(err), + zap.String("name", in.GetCollectionName()), + zap.Uint64("ts", t.GetTs())) + + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.FailLabel).Inc() + return merr.Status(err), nil + } + + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.SuccessLabel).Inc() + metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollectionField").Observe(float64(tr.ElapseSpan().Milliseconds())) + + log.Info("done to alter collection field", + zap.String("role", typeutil.RootCoordRole), + zap.String("name", in.GetCollectionName()), + zap.String("fieldName", in.GetFieldName())) + return merr.Success(), nil +} + func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil diff --git a/internal/util/mock/grpc_rootcoord_client.go b/internal/util/mock/grpc_rootcoord_client.go index ea75308bb9..abf856d0be 100644 --- a/internal/util/mock/grpc_rootcoord_client.go +++ b/internal/util/mock/grpc_rootcoord_client.go @@ -262,6 +262,10 @@ func (m *GrpcRootCoordClient) AlterCollection(ctx context.Context, in *milvuspb. return &commonpb.Status{}, m.Err } +func (m *GrpcRootCoordClient) AlterCollectionField(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, m.Err +} + func (m *GrpcRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } diff --git a/pkg/go.mod b/pkg/go.mod index cdf06aac4d..cb6eb14de4 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -14,7 +14,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.7 - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 github.com/nats-io/nats-server/v2 v2.10.12 github.com/nats-io/nats.go v1.34.1 github.com/panjf2000/ants/v2 v2.7.2 diff --git a/pkg/go.sum b/pkg/go.sum index 15529f4160..04a38ef236 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -488,8 +488,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 h1:cFRrdFZwhFHv33pue1z8beYSvrXDYFSFsCuvXGX3DHE= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 h1:5/35+F32fs6ifVzI1e+VkUNpK0gWyXQSdZVnmNUFrrg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index d69687603f..a3f4a5a9d9 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -560,6 +560,15 @@ func IsStringType(dataType schemapb.DataType) bool { } } +func IsArrayContainStringElementType(dataType schemapb.DataType, elementType schemapb.DataType) bool { + if IsArrayType(dataType) { + if elementType == schemapb.DataType_String || elementType == schemapb.DataType_VarChar { + return true + } + } + return false +} + func IsVariableDataType(dataType schemapb.DataType) bool { return IsStringType(dataType) || IsArrayType(dataType) || IsJSONType(dataType) }