mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
https://github.com/milvus-io/milvus/issues/44053 Signed-off-by: junjie.jiang <junjie.jiang@zilliz.com>
276 lines
9.0 KiB
Go
276 lines
9.0 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rootcoord
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"go.uber.org/zap"
|
|
"google.golang.org/protobuf/types/known/fieldmaskpb"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/internal/distributed/streaming"
|
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
|
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
func callAlterCollection(ctx context.Context, c *Core, broadcaster broadcaster.BroadcastAPI, coll *model.Collection, dbName string, collectionName string) error {
|
|
// build new collection schema.
|
|
schema := &schemapb.CollectionSchema{
|
|
Name: coll.Name,
|
|
Description: coll.Description,
|
|
AutoID: coll.AutoID,
|
|
Fields: model.MarshalFieldModels(coll.Fields),
|
|
StructArrayFields: model.MarshalStructArrayFieldModels(coll.StructArrayFields),
|
|
Functions: model.MarshalFunctionModels(coll.Functions),
|
|
EnableDynamicField: coll.EnableDynamicField,
|
|
Properties: coll.Properties,
|
|
Version: coll.SchemaVersion + 1,
|
|
}
|
|
|
|
cacheExpirations, err := c.getCacheExpireForCollection(ctx, dbName, collectionName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
header := &messagespb.AlterCollectionMessageHeader{
|
|
DbId: coll.DBID,
|
|
CollectionId: coll.CollectionID,
|
|
UpdateMask: &fieldmaskpb.FieldMask{
|
|
Paths: []string{message.FieldMaskCollectionSchema},
|
|
},
|
|
CacheExpirations: cacheExpirations,
|
|
}
|
|
body := &messagespb.AlterCollectionMessageBody{
|
|
Updates: &messagespb.AlterCollectionMessageUpdates{
|
|
Schema: schema,
|
|
},
|
|
}
|
|
|
|
channels := make([]string, 0, len(coll.VirtualChannelNames)+1)
|
|
channels = append(channels, streaming.WAL().ControlChannel())
|
|
channels = append(channels, coll.VirtualChannelNames...)
|
|
msg := message.NewAlterCollectionMessageBuilderV2().
|
|
WithHeader(header).
|
|
WithBody(body).
|
|
WithBroadcast(channels).
|
|
MustBuildBroadcast()
|
|
if _, err := broadcaster.Broadcast(ctx, msg); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func alterFunctionGenNewCollection(ctx context.Context, fSchema *schemapb.FunctionSchema, collection *model.Collection) error {
|
|
if fSchema == nil {
|
|
return fmt.Errorf("Function schema is empty")
|
|
}
|
|
var oldFuncSchema *model.Function
|
|
newFuncs := []*model.Function{}
|
|
for _, f := range collection.Functions {
|
|
if f.Name == fSchema.Name {
|
|
oldFuncSchema = f
|
|
} else {
|
|
newFuncs = append(newFuncs, f)
|
|
}
|
|
}
|
|
if oldFuncSchema == nil {
|
|
err := fmt.Errorf("Function %s not exists", fSchema.Name)
|
|
log.Ctx(ctx).Error("Alter function failed:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
fSchema.Id = oldFuncSchema.ID
|
|
|
|
fieldMapping := map[string]*model.Field{}
|
|
for _, field := range collection.Fields {
|
|
fieldMapping[field.Name] = field
|
|
}
|
|
|
|
// reset output field info
|
|
for _, name := range oldFuncSchema.OutputFieldNames {
|
|
field, exists := fieldMapping[name]
|
|
if !exists {
|
|
return fmt.Errorf("Old version function's output field %s not exists", name)
|
|
}
|
|
field.IsFunctionOutput = false
|
|
}
|
|
|
|
for _, name := range fSchema.InputFieldNames {
|
|
field, exists := fieldMapping[name]
|
|
if !exists {
|
|
err := fmt.Errorf("function's input field %s not exists", name)
|
|
log.Ctx(ctx).Error("Incorrect function configuration:", zap.Error(err))
|
|
return err
|
|
}
|
|
fSchema.InputFieldIds = append(fSchema.InputFieldIds, field.FieldID)
|
|
}
|
|
for _, name := range fSchema.OutputFieldNames {
|
|
field, exists := fieldMapping[name]
|
|
if !exists {
|
|
err := fmt.Errorf("function's output field %s not exists", name)
|
|
log.Ctx(ctx).Error("Incorrect function configuration:", zap.Error(err))
|
|
return err
|
|
}
|
|
if field.IsFunctionOutput {
|
|
err := fmt.Errorf("function's output field %s is already of other functions", name)
|
|
log.Ctx(ctx).Error("Incorrect function configuration: ", zap.Error(err))
|
|
return err
|
|
}
|
|
fSchema.OutputFieldIds = append(fSchema.OutputFieldIds, field.FieldID)
|
|
field.IsFunctionOutput = true
|
|
}
|
|
newFunc := model.UnmarshalFunctionModel(fSchema)
|
|
newFuncs = append(newFuncs, newFunc)
|
|
collection.Functions = newFuncs
|
|
return nil
|
|
}
|
|
|
|
func (c *Core) broadcastAlterCollectionForAlterFunction(ctx context.Context, req *milvuspb.AlterCollectionFunctionRequest) error {
|
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer broadcaster.Close()
|
|
|
|
oldColl, err := c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
newColl := oldColl.Clone()
|
|
if err := alterFunctionGenNewCollection(ctx, req.FunctionSchema, newColl); err != nil {
|
|
return err
|
|
}
|
|
|
|
return callAlterCollection(ctx, c, broadcaster, newColl, req.GetDbName(), req.GetCollectionName())
|
|
}
|
|
|
|
func (c *Core) broadcastAlterCollectionForDropFunction(ctx context.Context, req *milvuspb.DropCollectionFunctionRequest) error {
|
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer broadcaster.Close()
|
|
|
|
oldColl, err := c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var needDelFunc *model.Function
|
|
for _, f := range oldColl.Functions {
|
|
if f.Name == req.FunctionName {
|
|
needDelFunc = f
|
|
break
|
|
}
|
|
}
|
|
if needDelFunc == nil {
|
|
return nil
|
|
}
|
|
|
|
newColl := oldColl.Clone()
|
|
|
|
newFuncs := []*model.Function{}
|
|
for _, f := range newColl.Functions {
|
|
if f.Name != needDelFunc.Name {
|
|
newFuncs = append(newFuncs, f)
|
|
}
|
|
}
|
|
newColl.Functions = newFuncs
|
|
|
|
fieldMapping := map[int64]*model.Field{}
|
|
for _, field := range newColl.Fields {
|
|
fieldMapping[field.FieldID] = field
|
|
}
|
|
for _, id := range needDelFunc.OutputFieldIDs {
|
|
field, exists := fieldMapping[id]
|
|
if !exists {
|
|
return fmt.Errorf("function's output field %d not exists", id)
|
|
}
|
|
field.IsFunctionOutput = false
|
|
}
|
|
return callAlterCollection(ctx, c, broadcaster, newColl, req.GetDbName(), req.GetCollectionName())
|
|
}
|
|
|
|
func (c *Core) broadcastAlterCollectionForAddFunction(ctx context.Context, req *milvuspb.AddCollectionFunctionRequest) error {
|
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer broadcaster.Close()
|
|
|
|
oldColl, err := c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
newColl := oldColl.Clone()
|
|
fSchema := req.FunctionSchema
|
|
if fSchema == nil {
|
|
return merr.WrapErrParameterInvalidMsg("Function schema is empty")
|
|
}
|
|
|
|
nextFunctionID := int64(StartOfUserFunctionID)
|
|
for _, f := range newColl.Functions {
|
|
if f.Name == fSchema.Name {
|
|
return merr.WrapErrParameterInvalidMsg("function name already exists: %s", f.Name)
|
|
}
|
|
nextFunctionID = max(nextFunctionID, f.ID+1)
|
|
}
|
|
fSchema.Id = nextFunctionID
|
|
|
|
fieldMapping := map[string]*model.Field{}
|
|
for _, field := range newColl.Fields {
|
|
fieldMapping[field.Name] = field
|
|
}
|
|
for _, name := range fSchema.InputFieldNames {
|
|
field, exists := fieldMapping[name]
|
|
if !exists {
|
|
err := fmt.Errorf("function's input field %s not exists", name)
|
|
log.Ctx(ctx).Error("Incorrect function configuration:", zap.Error(err))
|
|
return err
|
|
}
|
|
fSchema.InputFieldIds = append(fSchema.InputFieldIds, field.FieldID)
|
|
}
|
|
for _, name := range fSchema.OutputFieldNames {
|
|
field, exists := fieldMapping[name]
|
|
if !exists {
|
|
err := fmt.Errorf("function's output field %s not exists", name)
|
|
log.Ctx(ctx).Error("Incorrect function configuration:", zap.Error(err))
|
|
return err
|
|
}
|
|
if field.IsFunctionOutput {
|
|
err := fmt.Errorf("function's output field %s is already of other functions", name)
|
|
log.Ctx(ctx).Error("Incorrect function configuration: ", zap.Error(err))
|
|
return err
|
|
}
|
|
fSchema.OutputFieldIds = append(fSchema.OutputFieldIds, field.FieldID)
|
|
field.IsFunctionOutput = true
|
|
}
|
|
newFunc := model.UnmarshalFunctionModel(fSchema)
|
|
newColl.Functions = append(newColl.Functions, newFunc)
|
|
return callAlterCollection(ctx, c, broadcaster, newColl, req.GetDbName(), req.GetCollectionName())
|
|
}
|