diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index a23d0140ea..6c48f8ed25 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -523,6 +523,17 @@ SegmentInternalInterface::bulk_subscript_not_exist_field( } break; } + // for enabling dynamic field, normal json not support default value yet + case DataType::JSON: { + auto data_ptr = result->mutable_scalars() + ->mutable_json_data() + ->mutable_data(); + + for (int64_t i = 0; i < count; ++i) { + data_ptr->at(i) = field_meta.default_value()->bytes_data(); + } + break; + } default: { ThrowInfo(DataTypeInvalid, fmt::format("unsupported default value type {}", diff --git a/internal/rootcoord/add_field_task.go b/internal/rootcoord/add_field_task.go index a80e80dd97..164cf95abf 100644 --- a/internal/rootcoord/add_field_task.go +++ b/internal/rootcoord/add_field_task.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/util/proxyutil" - "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" ) @@ -61,24 +60,15 @@ func (t *addCollectionFieldTask) Execute(ctx context.Context) error { } // assign field id - t.fieldSchema.FieldID = t.nextFieldID(oldColl) + t.fieldSchema.FieldID = nextFieldID(oldColl) newField := model.UnmarshalFieldModel(t.fieldSchema) ts := t.GetTs() + t.Req.CollectionID = oldColl.CollectionID return executeAddCollectionFieldTaskSteps(ctx, t.core, oldColl, newField, t.Req, ts) } -func (t *addCollectionFieldTask) nextFieldID(coll *model.Collection) int64 { - maxFieldID := int64(common.StartOfUserFieldID) - for _, field := range coll.Fields { - if field.FieldID > maxFieldID { - maxFieldID = field.FieldID - } - } - return maxFieldID + 1 -} - func (t *addCollectionFieldTask) GetLockerKey() LockerKey { collection := t.core.getCollectionIDStr(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), 0) return NewLockerKeyChain( @@ -88,17 +78,26 @@ func (t *addCollectionFieldTask) GetLockerKey() LockerKey { ) } +type collInfoProvider interface { + GetDbName() string + GetCollectionName() string + GetCollectionID() int64 +} + func executeAddCollectionFieldTaskSteps(ctx context.Context, core *Core, col *model.Collection, newField *model.Field, - req *milvuspb.AddCollectionFieldRequest, + req collInfoProvider, ts Timestamp, ) error { redoTask := newBaseRedoTask(core.stepExecutor) updatedCollection := col.Clone() updatedCollection.Fields = append(updatedCollection.Fields, newField) + if newField.IsDynamic { + updatedCollection.EnableDynamicField = true + } redoTask.AddSyncStep(&WriteSchemaChangeWALStep{ baseStep: baseStep{core: core}, collection: updatedCollection, @@ -112,7 +111,6 @@ func executeAddCollectionFieldTaskSteps(ctx context.Context, newField: newField, }) - req.CollectionID = oldColl.CollectionID redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{ baseStep: baseStep{core: core}, req: &milvuspb.AlterCollectionRequest{ diff --git a/internal/rootcoord/alter_dynamic_schema_task.go b/internal/rootcoord/alter_dynamic_schema_task.go new file mode 100644 index 0000000000..5f92820b2b --- /dev/null +++ b/internal/rootcoord/alter_dynamic_schema_task.go @@ -0,0 +1,115 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/merr" +) + +type alterDynamicFieldTask struct { + baseTask + Req *milvuspb.AlterCollectionRequest + oldColl *model.Collection + fieldSchema *schemapb.FieldSchema + targetValue bool +} + +func (t *alterDynamicFieldTask) Prepare(ctx context.Context) error { + if err := CheckMsgType(t.Req.GetBase().GetMsgType(), commonpb.MsgType_AlterCollection); err != nil { + return err + } + + oldColl, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), t.ts) + if err != nil { + log.Ctx(ctx).Warn("get collection failed during alter dynamic schema", + zap.String("collectionName", t.Req.GetCollectionName()), zap.Uint64("ts", t.ts)) + return err + } + t.oldColl = oldColl + + if len(t.Req.GetProperties()) > 1 { + return merr.WrapErrParameterInvalidMsg("cannot alter dynamic schema with other properties") + } + + // return nil for no-op + if oldColl.EnableDynamicField == t.targetValue { + return nil + } + + // not support disabling since remove field not support yet. + if !t.targetValue { + return merr.WrapErrParameterInvalidMsg("dynamic schema cannot supported to be disabled") + } + + // convert to add $meta json field, nullable, default value `{}` + t.fieldSchema = &schemapb.FieldSchema{ + Name: common.MetaFieldName, + DataType: schemapb.DataType_JSON, + IsDynamic: true, + Nullable: true, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_BytesData{ + BytesData: []byte("{}"), + }, + }, + } + + if err := checkFieldSchema([]*schemapb.FieldSchema{t.fieldSchema}); err != nil { + return err + } + return nil +} + +func (t *alterDynamicFieldTask) Execute(ctx context.Context) error { + // return nil for no-op + if t.oldColl.EnableDynamicField == t.targetValue { + log.Info("dynamic schema is same as target value", + zap.Bool("targetValue", t.targetValue), + zap.String("collectionName", t.Req.GetCollectionName())) + return nil + } + // assign field id + t.fieldSchema.FieldID = nextFieldID(t.oldColl) + + // currently only add dynamic field support + // TODO check target value to remove field after supported + + newField := model.UnmarshalFieldModel(t.fieldSchema) + t.Req.CollectionID = t.oldColl.CollectionID + + ts := t.GetTs() + return executeAddCollectionFieldTaskSteps(ctx, t.core, t.oldColl, newField, t.Req, ts) +} + +func (t *alterDynamicFieldTask) GetLockerKey() LockerKey { + collection := t.core.getCollectionIDStr(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), 0) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collection, true), + ) +} diff --git a/internal/rootcoord/alter_dynamic_schema_task_test.go b/internal/rootcoord/alter_dynamic_schema_task_test.go new file mode 100644 index 0000000000..857377e5fa --- /dev/null +++ b/internal/rootcoord/alter_dynamic_schema_task_test.go @@ -0,0 +1,251 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/distributed/streaming" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming" + mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" + "github.com/milvus-io/milvus/pkg/v2/util/merr" +) + +type AlterDynamicSchemaTaskSuite struct { + suite.Suite + meta *mockrootcoord.IMetaTable +} + +func (s *AlterDynamicSchemaTaskSuite) getDisabledCollection() *model.Collection { + return &model.Collection{ + CollectionID: 1, + Name: "coll_disabled", + Fields: []*model.Field{ + { + Name: "pk", + FieldID: 100, + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + Name: "vec", + FieldID: 101, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "768", + }, + }, + }, + }, + EnableDynamicField: false, + PhysicalChannelNames: []string{"dml_ch_01", "dml_ch_02"}, + VirtualChannelNames: []string{"dml_ch_01", "dml_ch_02"}, + } +} + +func (s *AlterDynamicSchemaTaskSuite) SetupTest() { + s.meta = mockrootcoord.NewIMetaTable(s.T()) + s.meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, "not_existed_coll", mock.Anything).Return(nil, merr.WrapErrCollectionNotFound("not_existed_coll")).Maybe() + s.meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, "coll_disabled", mock.Anything).Return(s.getDisabledCollection(), nil).Maybe() + s.meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, "coll_enabled", mock.Anything).Return(&model.Collection{ + CollectionID: 1, + Name: "coll_enabled", + Fields: []*model.Field{ + { + Name: "pk", + FieldID: 100, + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + Name: "vec", + FieldID: 101, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "768", + }, + }, + }, + { + Name: "$meta", + IsDynamic: true, + DataType: schemapb.DataType_JSON, + }, + }, + EnableDynamicField: true, + }, nil).Maybe() +} + +func (s *AlterDynamicSchemaTaskSuite) TestPrepare() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("invalid_msg_type", func() { + task := &alterDynamicFieldTask{Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}}} + err := task.Prepare(ctx) + s.Error(err) + }) + + s.Run("alter_with_other_properties", func() { + task := &alterDynamicFieldTask{Req: &milvuspb.AlterCollectionRequest{ + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.EnableDynamicSchemaKey, + Value: "true", + }, + { + Key: "other_keys", + Value: "other_values", + }, + }, + }} + err := task.Prepare(ctx) + s.Error(err) + }) + + s.Run("disable_dynamic_field_for_disabled_coll", func() { + core := newTestCore(withMeta(s.meta)) + task := &alterDynamicFieldTask{ + baseTask: newBaseTask(ctx, core), + Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_disabled"}, + targetValue: false, + } + err := task.Prepare(ctx) + s.NoError(err, "disabling dynamic schema on diabled collection shall be a no-op") + }) + + s.Run("disable_dynamic_field_for_enabled_coll", func() { + core := newTestCore(withMeta(s.meta)) + task := &alterDynamicFieldTask{ + baseTask: newBaseTask(ctx, core), + Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_enabled"}, + targetValue: false, + } + err := task.Prepare(ctx) + s.Error(err, "disabling dynamic schema on enabled collection not supported yet") + }) + + s.Run("enable_dynamic_field_for_enabled_coll", func() { + core := newTestCore(withMeta(s.meta)) + task := &alterDynamicFieldTask{ + baseTask: newBaseTask(ctx, core), + Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_enabled"}, + targetValue: true, + } + err := task.Prepare(ctx) + s.NoError(err) + }) + + s.Run("collection_not_exist", func() { + core := newTestCore(withMeta(s.meta)) + task := &alterDynamicFieldTask{ + baseTask: newBaseTask(ctx, core), + Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "not_existed_coll"}, + targetValue: true, + } + err := task.Prepare(ctx) + s.Error(err) + }) + + s.Run("normal_case", func() { + core := newTestCore(withMeta(s.meta)) + task := &alterDynamicFieldTask{ + baseTask: newBaseTask(ctx, core), + Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_disabled"}, + targetValue: true, + } + err := task.Prepare(ctx) + s.NoError(err) + s.NotNil(task.fieldSchema) + }) +} + +func (s *AlterDynamicSchemaTaskSuite) TestExecute() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("no_op", func() { + core := newTestCore(withMeta(s.meta)) + task := &alterDynamicFieldTask{ + baseTask: newBaseTask(ctx, core), + Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_disabled"}, + oldColl: s.getDisabledCollection(), + targetValue: false, + } + err := task.Execute(ctx) + s.NoError(err) + }) + + s.Run("normal_case", func() { + b := mock_streaming.NewMockBroadcast(s.T()) + wal := mock_streaming.NewMockWALAccesser(s.T()) + wal.EXPECT().Broadcast().Return(b).Maybe() + streaming.SetWALForTest(wal) + + b.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{ + AppendResults: map[string]*types.AppendResult{ + "dml_ch_01": {TimeTick: 100}, + "dml_ch_02": {TimeTick: 101}, + }, + }, nil).Times(1) + + s.meta.EXPECT().AlterCollection( + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(nil) + s.meta.EXPECT().ListAliasesByID(mock.Anything, mock.Anything).Return([]string{}) + + broker := newMockBroker() + broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { + return nil + } + alloc := newMockIDAllocator() + core := newTestCore(withValidProxyManager(), withMeta(s.meta), withBroker(broker), withIDAllocator(alloc)) + + task := &alterDynamicFieldTask{ + baseTask: newBaseTask(ctx, core), + Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_disabled"}, + targetValue: true, + } + err := task.Prepare(ctx) + s.NoError(err) + err = task.Execute(ctx) + s.NoError(err) + }) +} + +func TestAlterDynamicSchemaTask(t *testing.T) { + suite.Run(t, new(AlterDynamicSchemaTaskSuite)) +} diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 5453ff53e9..c44b364df6 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1381,23 +1381,38 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("AlterCollection") - log.Ctx(ctx).Info("received request to alter collection", + log := log.Ctx(ctx).With( zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), + ) + + log.Info("received request to alter collection", zap.Any("props", in.Properties), zap.Any("delete_keys", in.DeleteKeys), ) - t := &alterCollectionTask{ - baseTask: newBaseTask(ctx, c), - Req: in, + var t task + if ok, value, err := common.IsEnableDynamicSchema(in.GetProperties()); ok { + if err != nil { + log.Warn("failed to check dynamic schema prop kv", zap.Error(err)) + return merr.Status(err), nil + } + log.Info("found update dynamic schema prop kv") + t = &alterDynamicFieldTask{ + baseTask: newBaseTask(ctx, c), + Req: in, + targetValue: value, + } + } else { + t = &alterCollectionTask{ + baseTask: newBaseTask(ctx, c), + Req: in, + } } if err := c.scheduler.AddTask(t); err != nil { log.Warn("failed to enqueue request to alter collection", - zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("name", in.GetCollectionName())) + zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc() return merr.Status(err), nil @@ -1405,9 +1420,7 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection if err := t.WaitToFinish(); err != nil { log.Warn("failed to alter collection", - zap.String("role", typeutil.RootCoordRole), zap.Error(err), - zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc() @@ -1416,11 +1429,9 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterCollection").Observe(float64(t.queueDur.Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterCollection").Observe(float64(t.GetDurationInQueue().Milliseconds())) log.Info("done to alter collection", - zap.String("role", typeutil.RootCoordRole), - zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index c311d727eb..a580dd8cb9 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -34,6 +34,7 @@ import ( mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/proxypb" @@ -1516,6 +1517,32 @@ func TestRootCoord_AlterCollection(t *testing.T) { assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) + + t.Run("set_dynamic_field_bad_request", func(t *testing.T) { + c := newTestCore(withHealthyCode(), + withValidScheduler()) + + ctx := context.Background() + resp, err := c.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{ + Properties: []*commonpb.KeyValuePair{ + {Key: common.EnableDynamicSchemaKey, Value: "abc"}, + }, + }) + assert.Error(t, merr.CheckRPCCall(resp, err)) + }) + + t.Run("set_dynamic_field_ok", func(t *testing.T) { + c := newTestCore(withHealthyCode(), + withValidScheduler()) + + ctx := context.Background() + resp, err := c.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{ + Properties: []*commonpb.KeyValuePair{ + {Key: common.EnableDynamicSchemaKey, Value: "true"}, + }, + }) + assert.NoError(t, merr.CheckRPCCall(resp, err)) + }) } func TestRootCoord_CheckHealth(t *testing.T) { diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index dc075afe17..61c4f202a0 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -56,6 +56,7 @@ type task interface { NotifyDone(err error) IsFinished() bool SetInQueueDuration() + GetDurationInQueue() time.Duration GetLockerKey() LockerKey } @@ -127,6 +128,10 @@ func (b *baseTask) SetInQueueDuration() { b.queueDur = b.tr.ElapseSpan() } +func (b *baseTask) GetDurationInQueue() time.Duration { + return b.queueDur +} + func (b *baseTask) IsFinished() bool { return b.isFinished.Load() } diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index 4634340557..aaca696ea6 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/v2/common" @@ -387,7 +388,11 @@ func checkFieldSchema(fieldSchemas []*schemapb.FieldSchema) error { return merr.WrapErrParameterInvalidMsg(msg) } dtype := fieldSchema.GetDataType() - if dtype == schemapb.DataType_Array || dtype == schemapb.DataType_JSON || typeutil.IsVectorType(dtype) { + if dtype == schemapb.DataType_Array || typeutil.IsVectorType(dtype) { + msg := fmt.Sprintf("type not support default_value, type:%s, name:%s", fieldSchema.GetDataType().String(), fieldSchema.GetName()) + return merr.WrapErrParameterInvalidMsg(msg) + } + if dtype == schemapb.DataType_JSON && !fieldSchema.IsDynamic { msg := fmt.Sprintf("type not support default_value, type:%s, name:%s", fieldSchema.GetDataType().String(), fieldSchema.GetName()) return merr.WrapErrParameterInvalidMsg(msg) } @@ -444,6 +449,19 @@ func checkFieldSchema(fieldSchemas []*schemapb.FieldSchema) error { msg := fmt.Sprintf("the length (%d) of string exceeds max length (%d)", defaultValueLength, maxLength) return merr.WrapErrParameterInvalid("valid length string", "string length exceeds max length", msg) } + case *schemapb.ValueField_BytesData: + if dtype != schemapb.DataType_JSON { + return errTypeMismatch(fieldSchema.GetName(), dtype.String(), "DataType_SJON") + } + defVal := fieldSchema.GetDefaultValue().GetBytesData() + jsonData := make(map[string]interface{}) + if err := json.Unmarshal(defVal, &jsonData); err != nil { + log.Info("invalid default json value, milvus only support json map", + zap.ByteString("data", defVal), + zap.Error(err), + ) + return merr.WrapErrParameterInvalidMsg(err.Error()) + } default: panic("default value unsupport data type") } @@ -537,3 +555,13 @@ func validateStructArrayFieldDataType(fieldSchemas []*schemapb.StructArrayFieldS } return nil } + +func nextFieldID(coll *model.Collection) int64 { + maxFieldID := int64(common.StartOfUserFieldID) + for _, field := range coll.Fields { + if field.FieldID > maxFieldID { + maxFieldID = field.FieldID + } + } + return maxFieldID + 1 +} diff --git a/pkg/common/common.go b/pkg/common/common.go index 26afaa3947..a7db5931d7 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -229,6 +229,7 @@ const ( ReplicateIDKey = "replicate.id" ReplicateEndTSKey = "replicate.endTS" IndexNonEncoding = "index.nonEncoding" + EnableDynamicSchemaKey = `dynamicfield.enabled` ) const ( @@ -476,6 +477,16 @@ func GetReplicateEndTS(kvs []*commonpb.KeyValuePair) (uint64, bool) { return 0, false } +func IsEnableDynamicSchema(kvs []*commonpb.KeyValuePair) (found bool, value bool, err error) { + for _, kv := range kvs { + if kv.GetKey() == EnableDynamicSchemaKey { + value, err = strconv.ParseBool(kv.GetValue()) + return true, value, err + } + } + return false, false, nil +} + func ValidateAutoIndexMmapConfig(autoIndexConfigEnable, isVectorField bool, indexParams map[string]string) error { if !autoIndexConfigEnable { return nil diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index 422c020589..868090b659 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -258,3 +258,33 @@ func TestReplicateProperty(t *testing.T) { } }) } + +func TestIsEnableDynamicSchema(t *testing.T) { + type testCase struct { + tag string + input []*commonpb.KeyValuePair + expectFound bool + expectValue bool + expectError bool + } + + cases := []testCase{ + {tag: "no_params", expectFound: false}, + {tag: "dynamicfield_true", input: []*commonpb.KeyValuePair{{Key: EnableDynamicSchemaKey, Value: "true"}}, expectFound: true, expectValue: true}, + {tag: "dynamicfield_false", input: []*commonpb.KeyValuePair{{Key: EnableDynamicSchemaKey, Value: "false"}}, expectFound: true, expectValue: false}, + {tag: "bad_kv_value", input: []*commonpb.KeyValuePair{{Key: EnableDynamicSchemaKey, Value: "abc"}}, expectFound: true, expectError: true}, + } + + for _, tc := range cases { + t.Run(tc.tag, func(t *testing.T) { + found, value, err := IsEnableDynamicSchema(tc.input) + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tc.expectFound, found) + assert.Equal(t, tc.expectValue, value) + }) + } +}