mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
feat: Support enabling dynamic schema on existing collection (#44151)
Related to #44150 This PR make enabling `dynamic schema` feature for an existing collection possible. This related API is to reuse `AlterCollection` and underhood its redirected to `adding nullable json field` --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
005fb467f7
commit
aa4ef9c996
@ -523,6 +523,17 @@ SegmentInternalInterface::bulk_subscript_not_exist_field(
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// for enabling dynamic field, normal json not support default value yet
|
||||||
|
case DataType::JSON: {
|
||||||
|
auto data_ptr = result->mutable_scalars()
|
||||||
|
->mutable_json_data()
|
||||||
|
->mutable_data();
|
||||||
|
|
||||||
|
for (int64_t i = 0; i < count; ++i) {
|
||||||
|
data_ptr->at(i) = field_meta.default_value()->bytes_data();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
default: {
|
default: {
|
||||||
ThrowInfo(DataTypeInvalid,
|
ThrowInfo(DataTypeInvalid,
|
||||||
fmt::format("unsupported default value type {}",
|
fmt::format("unsupported default value type {}",
|
||||||
|
|||||||
@ -27,7 +27,6 @@ import (
|
|||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/common"
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/log"
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -61,24 +60,15 @@ func (t *addCollectionFieldTask) Execute(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// assign field id
|
// assign field id
|
||||||
t.fieldSchema.FieldID = t.nextFieldID(oldColl)
|
t.fieldSchema.FieldID = nextFieldID(oldColl)
|
||||||
|
|
||||||
newField := model.UnmarshalFieldModel(t.fieldSchema)
|
newField := model.UnmarshalFieldModel(t.fieldSchema)
|
||||||
|
|
||||||
ts := t.GetTs()
|
ts := t.GetTs()
|
||||||
|
t.Req.CollectionID = oldColl.CollectionID
|
||||||
return executeAddCollectionFieldTaskSteps(ctx, t.core, oldColl, newField, t.Req, ts)
|
return executeAddCollectionFieldTaskSteps(ctx, t.core, oldColl, newField, t.Req, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *addCollectionFieldTask) nextFieldID(coll *model.Collection) int64 {
|
|
||||||
maxFieldID := int64(common.StartOfUserFieldID)
|
|
||||||
for _, field := range coll.Fields {
|
|
||||||
if field.FieldID > maxFieldID {
|
|
||||||
maxFieldID = field.FieldID
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return maxFieldID + 1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *addCollectionFieldTask) GetLockerKey() LockerKey {
|
func (t *addCollectionFieldTask) GetLockerKey() LockerKey {
|
||||||
collection := t.core.getCollectionIDStr(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), 0)
|
collection := t.core.getCollectionIDStr(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), 0)
|
||||||
return NewLockerKeyChain(
|
return NewLockerKeyChain(
|
||||||
@ -88,17 +78,26 @@ func (t *addCollectionFieldTask) GetLockerKey() LockerKey {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type collInfoProvider interface {
|
||||||
|
GetDbName() string
|
||||||
|
GetCollectionName() string
|
||||||
|
GetCollectionID() int64
|
||||||
|
}
|
||||||
|
|
||||||
func executeAddCollectionFieldTaskSteps(ctx context.Context,
|
func executeAddCollectionFieldTaskSteps(ctx context.Context,
|
||||||
core *Core,
|
core *Core,
|
||||||
col *model.Collection,
|
col *model.Collection,
|
||||||
newField *model.Field,
|
newField *model.Field,
|
||||||
req *milvuspb.AddCollectionFieldRequest,
|
req collInfoProvider,
|
||||||
ts Timestamp,
|
ts Timestamp,
|
||||||
) error {
|
) error {
|
||||||
redoTask := newBaseRedoTask(core.stepExecutor)
|
redoTask := newBaseRedoTask(core.stepExecutor)
|
||||||
|
|
||||||
updatedCollection := col.Clone()
|
updatedCollection := col.Clone()
|
||||||
updatedCollection.Fields = append(updatedCollection.Fields, newField)
|
updatedCollection.Fields = append(updatedCollection.Fields, newField)
|
||||||
|
if newField.IsDynamic {
|
||||||
|
updatedCollection.EnableDynamicField = true
|
||||||
|
}
|
||||||
redoTask.AddSyncStep(&WriteSchemaChangeWALStep{
|
redoTask.AddSyncStep(&WriteSchemaChangeWALStep{
|
||||||
baseStep: baseStep{core: core},
|
baseStep: baseStep{core: core},
|
||||||
collection: updatedCollection,
|
collection: updatedCollection,
|
||||||
@ -112,7 +111,6 @@ func executeAddCollectionFieldTaskSteps(ctx context.Context,
|
|||||||
newField: newField,
|
newField: newField,
|
||||||
})
|
})
|
||||||
|
|
||||||
req.CollectionID = oldColl.CollectionID
|
|
||||||
redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{
|
redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{
|
||||||
baseStep: baseStep{core: core},
|
baseStep: baseStep{core: core},
|
||||||
req: &milvuspb.AlterCollectionRequest{
|
req: &milvuspb.AlterCollectionRequest{
|
||||||
|
|||||||
115
internal/rootcoord/alter_dynamic_schema_task.go
Normal file
115
internal/rootcoord/alter_dynamic_schema_task.go
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package rootcoord
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/common"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||||
|
)
|
||||||
|
|
||||||
|
type alterDynamicFieldTask struct {
|
||||||
|
baseTask
|
||||||
|
Req *milvuspb.AlterCollectionRequest
|
||||||
|
oldColl *model.Collection
|
||||||
|
fieldSchema *schemapb.FieldSchema
|
||||||
|
targetValue bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *alterDynamicFieldTask) Prepare(ctx context.Context) error {
|
||||||
|
if err := CheckMsgType(t.Req.GetBase().GetMsgType(), commonpb.MsgType_AlterCollection); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
oldColl, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), t.ts)
|
||||||
|
if err != nil {
|
||||||
|
log.Ctx(ctx).Warn("get collection failed during alter dynamic schema",
|
||||||
|
zap.String("collectionName", t.Req.GetCollectionName()), zap.Uint64("ts", t.ts))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.oldColl = oldColl
|
||||||
|
|
||||||
|
if len(t.Req.GetProperties()) > 1 {
|
||||||
|
return merr.WrapErrParameterInvalidMsg("cannot alter dynamic schema with other properties")
|
||||||
|
}
|
||||||
|
|
||||||
|
// return nil for no-op
|
||||||
|
if oldColl.EnableDynamicField == t.targetValue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// not support disabling since remove field not support yet.
|
||||||
|
if !t.targetValue {
|
||||||
|
return merr.WrapErrParameterInvalidMsg("dynamic schema cannot supported to be disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// convert to add $meta json field, nullable, default value `{}`
|
||||||
|
t.fieldSchema = &schemapb.FieldSchema{
|
||||||
|
Name: common.MetaFieldName,
|
||||||
|
DataType: schemapb.DataType_JSON,
|
||||||
|
IsDynamic: true,
|
||||||
|
Nullable: true,
|
||||||
|
DefaultValue: &schemapb.ValueField{
|
||||||
|
Data: &schemapb.ValueField_BytesData{
|
||||||
|
BytesData: []byte("{}"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := checkFieldSchema([]*schemapb.FieldSchema{t.fieldSchema}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *alterDynamicFieldTask) Execute(ctx context.Context) error {
|
||||||
|
// return nil for no-op
|
||||||
|
if t.oldColl.EnableDynamicField == t.targetValue {
|
||||||
|
log.Info("dynamic schema is same as target value",
|
||||||
|
zap.Bool("targetValue", t.targetValue),
|
||||||
|
zap.String("collectionName", t.Req.GetCollectionName()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// assign field id
|
||||||
|
t.fieldSchema.FieldID = nextFieldID(t.oldColl)
|
||||||
|
|
||||||
|
// currently only add dynamic field support
|
||||||
|
// TODO check target value to remove field after supported
|
||||||
|
|
||||||
|
newField := model.UnmarshalFieldModel(t.fieldSchema)
|
||||||
|
t.Req.CollectionID = t.oldColl.CollectionID
|
||||||
|
|
||||||
|
ts := t.GetTs()
|
||||||
|
return executeAddCollectionFieldTaskSteps(ctx, t.core, t.oldColl, newField, t.Req, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *alterDynamicFieldTask) GetLockerKey() LockerKey {
|
||||||
|
collection := t.core.getCollectionIDStr(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), 0)
|
||||||
|
return NewLockerKeyChain(
|
||||||
|
NewClusterLockerKey(false),
|
||||||
|
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||||
|
NewCollectionLockerKey(collection, true),
|
||||||
|
)
|
||||||
|
}
|
||||||
251
internal/rootcoord/alter_dynamic_schema_task_test.go
Normal file
251
internal/rootcoord/alter_dynamic_schema_task_test.go
Normal file
@ -0,0 +1,251 @@
|
|||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package rootcoord
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/internal/distributed/streaming"
|
||||||
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
|
"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
|
||||||
|
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/common"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AlterDynamicSchemaTaskSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
meta *mockrootcoord.IMetaTable
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AlterDynamicSchemaTaskSuite) getDisabledCollection() *model.Collection {
|
||||||
|
return &model.Collection{
|
||||||
|
CollectionID: 1,
|
||||||
|
Name: "coll_disabled",
|
||||||
|
Fields: []*model.Field{
|
||||||
|
{
|
||||||
|
Name: "pk",
|
||||||
|
FieldID: 100,
|
||||||
|
IsPrimaryKey: true,
|
||||||
|
DataType: schemapb.DataType_Int64,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vec",
|
||||||
|
FieldID: 101,
|
||||||
|
DataType: schemapb.DataType_FloatVector,
|
||||||
|
TypeParams: []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: common.DimKey,
|
||||||
|
Value: "768",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
EnableDynamicField: false,
|
||||||
|
PhysicalChannelNames: []string{"dml_ch_01", "dml_ch_02"},
|
||||||
|
VirtualChannelNames: []string{"dml_ch_01", "dml_ch_02"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AlterDynamicSchemaTaskSuite) SetupTest() {
|
||||||
|
s.meta = mockrootcoord.NewIMetaTable(s.T())
|
||||||
|
s.meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, "not_existed_coll", mock.Anything).Return(nil, merr.WrapErrCollectionNotFound("not_existed_coll")).Maybe()
|
||||||
|
s.meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, "coll_disabled", mock.Anything).Return(s.getDisabledCollection(), nil).Maybe()
|
||||||
|
s.meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, "coll_enabled", mock.Anything).Return(&model.Collection{
|
||||||
|
CollectionID: 1,
|
||||||
|
Name: "coll_enabled",
|
||||||
|
Fields: []*model.Field{
|
||||||
|
{
|
||||||
|
Name: "pk",
|
||||||
|
FieldID: 100,
|
||||||
|
IsPrimaryKey: true,
|
||||||
|
DataType: schemapb.DataType_Int64,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vec",
|
||||||
|
FieldID: 101,
|
||||||
|
DataType: schemapb.DataType_FloatVector,
|
||||||
|
TypeParams: []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: common.DimKey,
|
||||||
|
Value: "768",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "$meta",
|
||||||
|
IsDynamic: true,
|
||||||
|
DataType: schemapb.DataType_JSON,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
EnableDynamicField: true,
|
||||||
|
}, nil).Maybe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AlterDynamicSchemaTaskSuite) TestPrepare() {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
s.Run("invalid_msg_type", func() {
|
||||||
|
task := &alterDynamicFieldTask{Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}}}
|
||||||
|
err := task.Prepare(ctx)
|
||||||
|
s.Error(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("alter_with_other_properties", func() {
|
||||||
|
task := &alterDynamicFieldTask{Req: &milvuspb.AlterCollectionRequest{
|
||||||
|
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection},
|
||||||
|
Properties: []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: common.EnableDynamicSchemaKey,
|
||||||
|
Value: "true",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "other_keys",
|
||||||
|
Value: "other_values",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
err := task.Prepare(ctx)
|
||||||
|
s.Error(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("disable_dynamic_field_for_disabled_coll", func() {
|
||||||
|
core := newTestCore(withMeta(s.meta))
|
||||||
|
task := &alterDynamicFieldTask{
|
||||||
|
baseTask: newBaseTask(ctx, core),
|
||||||
|
Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_disabled"},
|
||||||
|
targetValue: false,
|
||||||
|
}
|
||||||
|
err := task.Prepare(ctx)
|
||||||
|
s.NoError(err, "disabling dynamic schema on diabled collection shall be a no-op")
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("disable_dynamic_field_for_enabled_coll", func() {
|
||||||
|
core := newTestCore(withMeta(s.meta))
|
||||||
|
task := &alterDynamicFieldTask{
|
||||||
|
baseTask: newBaseTask(ctx, core),
|
||||||
|
Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_enabled"},
|
||||||
|
targetValue: false,
|
||||||
|
}
|
||||||
|
err := task.Prepare(ctx)
|
||||||
|
s.Error(err, "disabling dynamic schema on enabled collection not supported yet")
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("enable_dynamic_field_for_enabled_coll", func() {
|
||||||
|
core := newTestCore(withMeta(s.meta))
|
||||||
|
task := &alterDynamicFieldTask{
|
||||||
|
baseTask: newBaseTask(ctx, core),
|
||||||
|
Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_enabled"},
|
||||||
|
targetValue: true,
|
||||||
|
}
|
||||||
|
err := task.Prepare(ctx)
|
||||||
|
s.NoError(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("collection_not_exist", func() {
|
||||||
|
core := newTestCore(withMeta(s.meta))
|
||||||
|
task := &alterDynamicFieldTask{
|
||||||
|
baseTask: newBaseTask(ctx, core),
|
||||||
|
Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "not_existed_coll"},
|
||||||
|
targetValue: true,
|
||||||
|
}
|
||||||
|
err := task.Prepare(ctx)
|
||||||
|
s.Error(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("normal_case", func() {
|
||||||
|
core := newTestCore(withMeta(s.meta))
|
||||||
|
task := &alterDynamicFieldTask{
|
||||||
|
baseTask: newBaseTask(ctx, core),
|
||||||
|
Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_disabled"},
|
||||||
|
targetValue: true,
|
||||||
|
}
|
||||||
|
err := task.Prepare(ctx)
|
||||||
|
s.NoError(err)
|
||||||
|
s.NotNil(task.fieldSchema)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AlterDynamicSchemaTaskSuite) TestExecute() {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
s.Run("no_op", func() {
|
||||||
|
core := newTestCore(withMeta(s.meta))
|
||||||
|
task := &alterDynamicFieldTask{
|
||||||
|
baseTask: newBaseTask(ctx, core),
|
||||||
|
Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_disabled"},
|
||||||
|
oldColl: s.getDisabledCollection(),
|
||||||
|
targetValue: false,
|
||||||
|
}
|
||||||
|
err := task.Execute(ctx)
|
||||||
|
s.NoError(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("normal_case", func() {
|
||||||
|
b := mock_streaming.NewMockBroadcast(s.T())
|
||||||
|
wal := mock_streaming.NewMockWALAccesser(s.T())
|
||||||
|
wal.EXPECT().Broadcast().Return(b).Maybe()
|
||||||
|
streaming.SetWALForTest(wal)
|
||||||
|
|
||||||
|
b.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{
|
||||||
|
AppendResults: map[string]*types.AppendResult{
|
||||||
|
"dml_ch_01": {TimeTick: 100},
|
||||||
|
"dml_ch_02": {TimeTick: 101},
|
||||||
|
},
|
||||||
|
}, nil).Times(1)
|
||||||
|
|
||||||
|
s.meta.EXPECT().AlterCollection(
|
||||||
|
mock.Anything,
|
||||||
|
mock.Anything,
|
||||||
|
mock.Anything,
|
||||||
|
mock.Anything,
|
||||||
|
mock.Anything,
|
||||||
|
).Return(nil)
|
||||||
|
s.meta.EXPECT().ListAliasesByID(mock.Anything, mock.Anything).Return([]string{})
|
||||||
|
|
||||||
|
broker := newMockBroker()
|
||||||
|
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
alloc := newMockIDAllocator()
|
||||||
|
core := newTestCore(withValidProxyManager(), withMeta(s.meta), withBroker(broker), withIDAllocator(alloc))
|
||||||
|
|
||||||
|
task := &alterDynamicFieldTask{
|
||||||
|
baseTask: newBaseTask(ctx, core),
|
||||||
|
Req: &milvuspb.AlterCollectionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "coll_disabled"},
|
||||||
|
targetValue: true,
|
||||||
|
}
|
||||||
|
err := task.Prepare(ctx)
|
||||||
|
s.NoError(err)
|
||||||
|
err = task.Execute(ctx)
|
||||||
|
s.NoError(err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAlterDynamicSchemaTask(t *testing.T) {
|
||||||
|
suite.Run(t, new(AlterDynamicSchemaTaskSuite))
|
||||||
|
}
|
||||||
@ -1381,23 +1381,38 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
|
|||||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc()
|
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc()
|
||||||
tr := timerecord.NewTimeRecorder("AlterCollection")
|
tr := timerecord.NewTimeRecorder("AlterCollection")
|
||||||
|
|
||||||
log.Ctx(ctx).Info("received request to alter collection",
|
log := log.Ctx(ctx).With(
|
||||||
zap.String("role", typeutil.RootCoordRole),
|
zap.String("role", typeutil.RootCoordRole),
|
||||||
zap.String("name", in.GetCollectionName()),
|
zap.String("name", in.GetCollectionName()),
|
||||||
|
)
|
||||||
|
|
||||||
|
log.Info("received request to alter collection",
|
||||||
zap.Any("props", in.Properties),
|
zap.Any("props", in.Properties),
|
||||||
zap.Any("delete_keys", in.DeleteKeys),
|
zap.Any("delete_keys", in.DeleteKeys),
|
||||||
)
|
)
|
||||||
|
|
||||||
t := &alterCollectionTask{
|
var t task
|
||||||
|
if ok, value, err := common.IsEnableDynamicSchema(in.GetProperties()); ok {
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("failed to check dynamic schema prop kv", zap.Error(err))
|
||||||
|
return merr.Status(err), nil
|
||||||
|
}
|
||||||
|
log.Info("found update dynamic schema prop kv")
|
||||||
|
t = &alterDynamicFieldTask{
|
||||||
baseTask: newBaseTask(ctx, c),
|
baseTask: newBaseTask(ctx, c),
|
||||||
Req: in,
|
Req: in,
|
||||||
|
targetValue: value,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
t = &alterCollectionTask{
|
||||||
|
baseTask: newBaseTask(ctx, c),
|
||||||
|
Req: in,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.scheduler.AddTask(t); err != nil {
|
if err := c.scheduler.AddTask(t); err != nil {
|
||||||
log.Warn("failed to enqueue request to alter collection",
|
log.Warn("failed to enqueue request to alter collection",
|
||||||
zap.String("role", typeutil.RootCoordRole),
|
zap.Error(err))
|
||||||
zap.Error(err),
|
|
||||||
zap.String("name", in.GetCollectionName()))
|
|
||||||
|
|
||||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
|
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
|
||||||
return merr.Status(err), nil
|
return merr.Status(err), nil
|
||||||
@ -1405,9 +1420,7 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
|
|||||||
|
|
||||||
if err := t.WaitToFinish(); err != nil {
|
if err := t.WaitToFinish(); err != nil {
|
||||||
log.Warn("failed to alter collection",
|
log.Warn("failed to alter collection",
|
||||||
zap.String("role", typeutil.RootCoordRole),
|
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.String("name", in.GetCollectionName()),
|
|
||||||
zap.Uint64("ts", t.GetTs()))
|
zap.Uint64("ts", t.GetTs()))
|
||||||
|
|
||||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
|
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
|
||||||
@ -1416,11 +1429,9 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
|
|||||||
|
|
||||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc()
|
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc()
|
||||||
metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
|
metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||||
metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterCollection").Observe(float64(t.queueDur.Milliseconds()))
|
metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterCollection").Observe(float64(t.GetDurationInQueue().Milliseconds()))
|
||||||
|
|
||||||
log.Info("done to alter collection",
|
log.Info("done to alter collection",
|
||||||
zap.String("role", typeutil.RootCoordRole),
|
|
||||||
zap.String("name", in.GetCollectionName()),
|
|
||||||
zap.Uint64("ts", t.GetTs()))
|
zap.Uint64("ts", t.GetTs()))
|
||||||
return merr.Success(), nil
|
return merr.Success(), nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -34,6 +34,7 @@ import (
|
|||||||
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
||||||
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/common"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
|
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
|
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/proto/proxypb"
|
"github.com/milvus-io/milvus/pkg/v2/proto/proxypb"
|
||||||
@ -1516,6 +1517,32 @@ func TestRootCoord_AlterCollection(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("set_dynamic_field_bad_request", func(t *testing.T) {
|
||||||
|
c := newTestCore(withHealthyCode(),
|
||||||
|
withValidScheduler())
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
resp, err := c.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{
|
||||||
|
Properties: []*commonpb.KeyValuePair{
|
||||||
|
{Key: common.EnableDynamicSchemaKey, Value: "abc"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
assert.Error(t, merr.CheckRPCCall(resp, err))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("set_dynamic_field_ok", func(t *testing.T) {
|
||||||
|
c := newTestCore(withHealthyCode(),
|
||||||
|
withValidScheduler())
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
resp, err := c.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{
|
||||||
|
Properties: []*commonpb.KeyValuePair{
|
||||||
|
{Key: common.EnableDynamicSchemaKey, Value: "true"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
assert.NoError(t, merr.CheckRPCCall(resp, err))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRootCoord_CheckHealth(t *testing.T) {
|
func TestRootCoord_CheckHealth(t *testing.T) {
|
||||||
|
|||||||
@ -56,6 +56,7 @@ type task interface {
|
|||||||
NotifyDone(err error)
|
NotifyDone(err error)
|
||||||
IsFinished() bool
|
IsFinished() bool
|
||||||
SetInQueueDuration()
|
SetInQueueDuration()
|
||||||
|
GetDurationInQueue() time.Duration
|
||||||
GetLockerKey() LockerKey
|
GetLockerKey() LockerKey
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,6 +128,10 @@ func (b *baseTask) SetInQueueDuration() {
|
|||||||
b.queueDur = b.tr.ElapseSpan()
|
b.queueDur = b.tr.ElapseSpan()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *baseTask) GetDurationInQueue() time.Duration {
|
||||||
|
return b.queueDur
|
||||||
|
}
|
||||||
|
|
||||||
func (b *baseTask) IsFinished() bool {
|
func (b *baseTask) IsFinished() bool {
|
||||||
return b.isFinished.Load()
|
return b.isFinished.Load()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
"github.com/milvus-io/milvus/internal/json"
|
"github.com/milvus-io/milvus/internal/json"
|
||||||
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
"github.com/milvus-io/milvus/internal/types"
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/common"
|
"github.com/milvus-io/milvus/pkg/v2/common"
|
||||||
@ -387,7 +388,11 @@ func checkFieldSchema(fieldSchemas []*schemapb.FieldSchema) error {
|
|||||||
return merr.WrapErrParameterInvalidMsg(msg)
|
return merr.WrapErrParameterInvalidMsg(msg)
|
||||||
}
|
}
|
||||||
dtype := fieldSchema.GetDataType()
|
dtype := fieldSchema.GetDataType()
|
||||||
if dtype == schemapb.DataType_Array || dtype == schemapb.DataType_JSON || typeutil.IsVectorType(dtype) {
|
if dtype == schemapb.DataType_Array || typeutil.IsVectorType(dtype) {
|
||||||
|
msg := fmt.Sprintf("type not support default_value, type:%s, name:%s", fieldSchema.GetDataType().String(), fieldSchema.GetName())
|
||||||
|
return merr.WrapErrParameterInvalidMsg(msg)
|
||||||
|
}
|
||||||
|
if dtype == schemapb.DataType_JSON && !fieldSchema.IsDynamic {
|
||||||
msg := fmt.Sprintf("type not support default_value, type:%s, name:%s", fieldSchema.GetDataType().String(), fieldSchema.GetName())
|
msg := fmt.Sprintf("type not support default_value, type:%s, name:%s", fieldSchema.GetDataType().String(), fieldSchema.GetName())
|
||||||
return merr.WrapErrParameterInvalidMsg(msg)
|
return merr.WrapErrParameterInvalidMsg(msg)
|
||||||
}
|
}
|
||||||
@ -444,6 +449,19 @@ func checkFieldSchema(fieldSchemas []*schemapb.FieldSchema) error {
|
|||||||
msg := fmt.Sprintf("the length (%d) of string exceeds max length (%d)", defaultValueLength, maxLength)
|
msg := fmt.Sprintf("the length (%d) of string exceeds max length (%d)", defaultValueLength, maxLength)
|
||||||
return merr.WrapErrParameterInvalid("valid length string", "string length exceeds max length", msg)
|
return merr.WrapErrParameterInvalid("valid length string", "string length exceeds max length", msg)
|
||||||
}
|
}
|
||||||
|
case *schemapb.ValueField_BytesData:
|
||||||
|
if dtype != schemapb.DataType_JSON {
|
||||||
|
return errTypeMismatch(fieldSchema.GetName(), dtype.String(), "DataType_SJON")
|
||||||
|
}
|
||||||
|
defVal := fieldSchema.GetDefaultValue().GetBytesData()
|
||||||
|
jsonData := make(map[string]interface{})
|
||||||
|
if err := json.Unmarshal(defVal, &jsonData); err != nil {
|
||||||
|
log.Info("invalid default json value, milvus only support json map",
|
||||||
|
zap.ByteString("data", defVal),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
return merr.WrapErrParameterInvalidMsg(err.Error())
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
panic("default value unsupport data type")
|
panic("default value unsupport data type")
|
||||||
}
|
}
|
||||||
@ -537,3 +555,13 @@ func validateStructArrayFieldDataType(fieldSchemas []*schemapb.StructArrayFieldS
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func nextFieldID(coll *model.Collection) int64 {
|
||||||
|
maxFieldID := int64(common.StartOfUserFieldID)
|
||||||
|
for _, field := range coll.Fields {
|
||||||
|
if field.FieldID > maxFieldID {
|
||||||
|
maxFieldID = field.FieldID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return maxFieldID + 1
|
||||||
|
}
|
||||||
|
|||||||
@ -229,6 +229,7 @@ const (
|
|||||||
ReplicateIDKey = "replicate.id"
|
ReplicateIDKey = "replicate.id"
|
||||||
ReplicateEndTSKey = "replicate.endTS"
|
ReplicateEndTSKey = "replicate.endTS"
|
||||||
IndexNonEncoding = "index.nonEncoding"
|
IndexNonEncoding = "index.nonEncoding"
|
||||||
|
EnableDynamicSchemaKey = `dynamicfield.enabled`
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -476,6 +477,16 @@ func GetReplicateEndTS(kvs []*commonpb.KeyValuePair) (uint64, bool) {
|
|||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func IsEnableDynamicSchema(kvs []*commonpb.KeyValuePair) (found bool, value bool, err error) {
|
||||||
|
for _, kv := range kvs {
|
||||||
|
if kv.GetKey() == EnableDynamicSchemaKey {
|
||||||
|
value, err = strconv.ParseBool(kv.GetValue())
|
||||||
|
return true, value, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
func ValidateAutoIndexMmapConfig(autoIndexConfigEnable, isVectorField bool, indexParams map[string]string) error {
|
func ValidateAutoIndexMmapConfig(autoIndexConfigEnable, isVectorField bool, indexParams map[string]string) error {
|
||||||
if !autoIndexConfigEnable {
|
if !autoIndexConfigEnable {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -258,3 +258,33 @@ func TestReplicateProperty(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIsEnableDynamicSchema(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
tag string
|
||||||
|
input []*commonpb.KeyValuePair
|
||||||
|
expectFound bool
|
||||||
|
expectValue bool
|
||||||
|
expectError bool
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []testCase{
|
||||||
|
{tag: "no_params", expectFound: false},
|
||||||
|
{tag: "dynamicfield_true", input: []*commonpb.KeyValuePair{{Key: EnableDynamicSchemaKey, Value: "true"}}, expectFound: true, expectValue: true},
|
||||||
|
{tag: "dynamicfield_false", input: []*commonpb.KeyValuePair{{Key: EnableDynamicSchemaKey, Value: "false"}}, expectFound: true, expectValue: false},
|
||||||
|
{tag: "bad_kv_value", input: []*commonpb.KeyValuePair{{Key: EnableDynamicSchemaKey, Value: "abc"}}, expectFound: true, expectError: true},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.tag, func(t *testing.T) {
|
||||||
|
found, value, err := IsEnableDynamicSchema(tc.input)
|
||||||
|
if tc.expectError {
|
||||||
|
assert.Error(t, err)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
assert.Equal(t, tc.expectFound, found)
|
||||||
|
assert.Equal(t, tc.expectValue, value)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user