package rootcoord import ( "context" "github.com/cockroachdb/errors" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/fieldmaskpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/timestamptz" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // broadcastAlterCollectionForAddField broadcasts the put collection message for add field. func (c *Core) broadcastAlterCollectionForAddField(ctx context.Context, req *milvuspb.AddCollectionFieldRequest) error { broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { return err } defer broadcaster.Close() // check if the collection is created. coll, err := c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp) if err != nil { return err } // check if the field schema is illegal. fieldSchema := &schemapb.FieldSchema{} if err = proto.Unmarshal(req.Schema, fieldSchema); err != nil { return errors.Wrap(err, "failed to unmarshal field schema") } if err := checkFieldSchema([]*schemapb.FieldSchema{fieldSchema}); err != nil { return errors.Wrap(err, "failed to check field schema") } if fieldSchema.GetDataType() == schemapb.DataType_Timestamptz { timezone, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, coll.Properties) if !exist { timezone = common.DefaultTimezone } timestamptz.CheckAndRewriteTimestampTzDefaultValueForFieldSchema(fieldSchema, timezone) } // check if the field already exists for _, field := range coll.Fields { if field.Name == fieldSchema.Name { // TODO: idempotency check here. return merr.WrapErrParameterInvalidMsg("field already exists, name: %s", fieldSchema.Name) } } // assign a new field id. fieldSchema.FieldID = nextFieldID(coll) // build new collection schema. schema := &schemapb.CollectionSchema{ Name: coll.Name, Description: coll.Description, AutoID: coll.AutoID, Fields: model.MarshalFieldModels(coll.Fields), StructArrayFields: model.MarshalStructArrayFieldModels(coll.StructArrayFields), Functions: model.MarshalFunctionModels(coll.Functions), EnableDynamicField: coll.EnableDynamicField, Properties: coll.Properties, Version: coll.SchemaVersion + 1, } schema.Fields = append(schema.Fields, fieldSchema) cacheExpirations, err := c.getCacheExpireForCollection(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { return err } channels := make([]string, 0, len(coll.VirtualChannelNames)+1) channels = append(channels, streaming.WAL().ControlChannel()) channels = append(channels, coll.VirtualChannelNames...) // broadcast the put collection v2 message. msg := message.NewAlterCollectionMessageBuilderV2(). WithHeader(&messagespb.AlterCollectionMessageHeader{ DbId: coll.DBID, CollectionId: coll.CollectionID, UpdateMask: &fieldmaskpb.FieldMask{ Paths: []string{message.FieldMaskCollectionSchema}, }, CacheExpirations: cacheExpirations, }). WithBody(&messagespb.AlterCollectionMessageBody{ Updates: &messagespb.AlterCollectionMessageUpdates{ Schema: schema, }, }). WithBroadcast(channels). MustBuildBroadcast() if _, err := broadcaster.Broadcast(ctx, msg); err != nil { return err } return nil }