From 104d0966b7df80a251437bed74f28fa47e138397 Mon Sep 17 00:00:00 2001 From: Patrick Weizhi Xu Date: Thu, 11 Jul 2024 19:01:35 +0800 Subject: [PATCH] feat: support partition key isolation (#34336) issue: #34332 --------- Signed-off-by: Patrick Weizhi Xu --- internal/core/src/index/VectorDiskIndex.cpp | 4 + internal/core/src/indexbuilder/index_c.cpp | 3 + internal/datacoord/task_index.go | 92 ++++---- internal/datacoord/task_scheduler_test.go | 106 +++++++++ internal/indexnode/task_index.go | 78 +++--- internal/proto/index_cgo_msg.proto | 1 + internal/proto/index_coord.proto | 1 + internal/proxy/impl.go | 1 + internal/proxy/meta_cache.go | 52 ++-- internal/proxy/rootcoord_mock_test.go | 3 + internal/proxy/task.go | 88 +++++++ internal/proxy/task_search.go | 31 ++- internal/proxy/task_search_test.go | 34 ++- internal/proxy/task_test.go | 223 +++++++++++++++++- internal/rootcoord/alter_collection_task.go | 11 + .../rootcoord/alter_collection_task_test.go | 56 ++++- internal/util/exprutil/expr_checker.go | 92 ++++++++ internal/util/exprutil/expr_checker_test.go | 207 ++++++++++++++++ pkg/common/common.go | 31 ++- pkg/common/common_test.go | 61 +++++ 20 files changed, 1063 insertions(+), 112 deletions(-) diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp index 29ce43b99d..b9e35b7b4c 100644 --- a/internal/core/src/index/VectorDiskIndex.cpp +++ b/internal/core/src/index/VectorDiskIndex.cpp @@ -217,6 +217,8 @@ VectorDiskAnnIndex::BuildV2(const Config& config) { if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) { build_config[VEC_OPT_FIELDS_PATH] = file_manager_->CacheOptFieldToDisk(opt_fields.value()); + // `partition_key_isolation` is already in the config, so it falls through + // into the index Build call directly } build_config.erase("insert_files"); @@ -264,6 +266,8 @@ VectorDiskAnnIndex::Build(const Config& config) { if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) { build_config[VEC_OPT_FIELDS_PATH] = file_manager_->CacheOptFieldToDisk(opt_fields.value()); + // `partition_key_isolation` is already in the config, so it falls through + // into the index Build call directly } build_config.erase("insert_files"); diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index 84f781e589..48e461fd01 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -142,6 +142,9 @@ get_config(std::unique_ptr& info) { if (info->opt_fields().size()) { config["opt_fields"] = get_opt_field(info->opt_fields()); } + if (info->partition_key_isolation()) { + config["partition_key_isolation"] = info->partition_key_isolation(); + } return config; } diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index effefe00ee..eb9c207204 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -109,6 +109,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule } // vector index build needs information of optional scalar fields data optionalFields := make([]*indexpb.OptionalFieldInfo, 0) + partitionKeyIsolation := false if Params.CommonCfg.EnableMaterializedView.GetAsBool() && isOptionalScalarFieldSupported(indexType) { collInfo, err := dependency.handler.GetCollection(ctx, segIndex.CollectionID) if err != nil || collInfo == nil { @@ -128,6 +129,13 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule FieldType: int32(partitionKeyField.DataType), DataIds: getBinLogIDs(segment, partitionKeyField.FieldID), }) + iso, isoErr := common.IsPartitionKeyIsolationPropEnabled(collInfo.Properties) + if isoErr != nil { + log.Ctx(ctx).Warn("failed to parse partition key isolation", zap.Error(isoErr)) + } + if iso { + partitionKeyIsolation = true + } } } } @@ -209,50 +217,52 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule } it.req = &indexpb.CreateJobRequest{ - ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), - IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), - BuildID: it.taskID, - IndexVersion: segIndex.IndexVersion + 1, - StorageConfig: storageConfig, - IndexParams: indexParams, - TypeParams: typeParams, - NumRows: segIndex.NumRows, - CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), - CollectionID: segment.GetCollectionID(), - PartitionID: segment.GetPartitionID(), - SegmentID: segment.GetID(), - FieldID: fieldID, - FieldName: field.GetName(), - FieldType: field.GetDataType(), - StorePath: storePath, - StoreVersion: segment.GetStorageVersion(), - IndexStorePath: indexStorePath, - Dim: int64(dim), - DataIds: binlogIDs, - OptionalScalarFields: optionalFields, - Field: field, + ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), + IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), + BuildID: it.taskID, + IndexVersion: segIndex.IndexVersion + 1, + StorageConfig: storageConfig, + IndexParams: indexParams, + TypeParams: typeParams, + NumRows: segIndex.NumRows, + CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), + CollectionID: segment.GetCollectionID(), + PartitionID: segment.GetPartitionID(), + SegmentID: segment.GetID(), + FieldID: fieldID, + FieldName: field.GetName(), + FieldType: field.GetDataType(), + StorePath: storePath, + StoreVersion: segment.GetStorageVersion(), + IndexStorePath: indexStorePath, + Dim: int64(dim), + DataIds: binlogIDs, + OptionalScalarFields: optionalFields, + Field: field, + PartitionKeyIsolation: partitionKeyIsolation, } } else { it.req = &indexpb.CreateJobRequest{ - ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), - IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), - BuildID: it.taskID, - IndexVersion: segIndex.IndexVersion + 1, - StorageConfig: storageConfig, - IndexParams: indexParams, - TypeParams: typeParams, - NumRows: segIndex.NumRows, - CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), - CollectionID: segment.GetCollectionID(), - PartitionID: segment.GetPartitionID(), - SegmentID: segment.GetID(), - FieldID: fieldID, - FieldName: field.GetName(), - FieldType: field.GetDataType(), - Dim: int64(dim), - DataIds: binlogIDs, - OptionalScalarFields: optionalFields, - Field: field, + ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), + IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), + BuildID: it.taskID, + IndexVersion: segIndex.IndexVersion + 1, + StorageConfig: storageConfig, + IndexParams: indexParams, + TypeParams: typeParams, + NumRows: segIndex.NumRows, + CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), + CollectionID: segment.GetCollectionID(), + PartitionID: segment.GetPartitionID(), + SegmentID: segment.GetID(), + FieldID: fieldID, + FieldName: field.GetName(), + FieldType: field.GetDataType(), + Dim: int64(dim), + DataIds: binlogIDs, + OptionalScalarFields: optionalFields, + Field: field, + PartitionKeyIsolation: partitionKeyIsolation, } } diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index 760fbace1a..c92bba778a 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -1749,5 +1749,111 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { waitTaskDoneFunc(scheduler) resetMetaFunc() }) + + s.Run("enqueue partitionKeyIsolation is false when schema is not set", func() { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.Equal(in.GetIndexRequest().PartitionKeyIsolation, false) + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + taskID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + }, + } + scheduler.enqueue(t) + waitTaskDoneFunc(scheduler) + resetMetaFunc() + }) scheduler.Stop() + + isoCollInfo := &collectionInfo{ + ID: collID, + Schema: &schemapb.CollectionSchema{ + Name: "coll", + Fields: fieldsSchema, + EnableDynamicField: false, + }, + Properties: map[string]string{ + common.PartitionKeyIsolationKey: "false", + }, + } + handler_isolation := NewNMockHandler(s.T()) + handler_isolation.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(isoCollInfo, nil) + + scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation) + scheduler_isolation.Start() + + s.Run("enqueue partitionKeyIsolation is false when MV not enabled", func() { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.Equal(in.GetIndexRequest().PartitionKeyIsolation, false) + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + taskID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + }, + } + scheduler_isolation.enqueue(t) + waitTaskDoneFunc(scheduler_isolation) + resetMetaFunc() + }) + + s.Run("enqueue partitionKeyIsolation is true when MV enabled", func() { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "true" + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.Equal(in.GetIndexRequest().PartitionKeyIsolation, true) + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + taskID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + }, + } + scheduler_isolation.enqueue(t) + waitTaskDoneFunc(scheduler_isolation) + resetMetaFunc() + }) + + s.Run("enqueue partitionKeyIsolation is invalid when MV is enabled", func() { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "invalid" + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.Equal(in.GetIndexRequest().PartitionKeyIsolation, false) + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + taskID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + }, + } + scheduler_isolation.enqueue(t) + waitTaskDoneFunc(scheduler_isolation) + resetMetaFunc() + }) + scheduler_isolation.Stop() } diff --git a/internal/indexnode/task_index.go b/internal/indexnode/task_index.go index c3f94a0ce7..c650e0cbf4 100644 --- a/internal/indexnode/task_index.go +++ b/internal/indexnode/task_index.go @@ -149,25 +149,26 @@ func (it *indexBuildTaskV2) Execute(ctx context.Context) error { } buildIndexParams := &indexcgopb.BuildIndexInfo{ - ClusterID: it.req.GetClusterID(), - BuildID: it.req.GetBuildID(), - CollectionID: it.req.GetCollectionID(), - PartitionID: it.req.GetPartitionID(), - SegmentID: it.req.GetSegmentID(), - IndexVersion: it.req.GetIndexVersion(), - CurrentIndexVersion: it.req.GetCurrentIndexVersion(), - NumRows: it.req.GetNumRows(), - Dim: it.req.GetDim(), - IndexFilePrefix: it.req.GetIndexFilePrefix(), - InsertFiles: it.req.GetDataPaths(), - FieldSchema: it.req.GetField(), - StorageConfig: storageConfig, - IndexParams: mapToKVPairs(it.newIndexParams), - TypeParams: mapToKVPairs(it.newTypeParams), - StorePath: it.req.GetStorePath(), - StoreVersion: it.req.GetStoreVersion(), - IndexStorePath: it.req.GetIndexStorePath(), - OptFields: optFields, + ClusterID: it.req.GetClusterID(), + BuildID: it.req.GetBuildID(), + CollectionID: it.req.GetCollectionID(), + PartitionID: it.req.GetPartitionID(), + SegmentID: it.req.GetSegmentID(), + IndexVersion: it.req.GetIndexVersion(), + CurrentIndexVersion: it.req.GetCurrentIndexVersion(), + NumRows: it.req.GetNumRows(), + Dim: it.req.GetDim(), + IndexFilePrefix: it.req.GetIndexFilePrefix(), + InsertFiles: it.req.GetDataPaths(), + FieldSchema: it.req.GetField(), + StorageConfig: storageConfig, + IndexParams: mapToKVPairs(it.newIndexParams), + TypeParams: mapToKVPairs(it.newTypeParams), + StorePath: it.req.GetStorePath(), + StoreVersion: it.req.GetStoreVersion(), + IndexStorePath: it.req.GetIndexStorePath(), + OptFields: optFields, + PartitionKeyIsolation: it.req.GetPartitionKeyIsolation(), } var err error @@ -451,25 +452,26 @@ func (it *indexBuildTask) Execute(ctx context.Context) error { } buildIndexParams := &indexcgopb.BuildIndexInfo{ - ClusterID: it.req.GetClusterID(), - BuildID: it.req.GetBuildID(), - CollectionID: it.req.GetCollectionID(), - PartitionID: it.req.GetPartitionID(), - SegmentID: it.req.GetSegmentID(), - IndexVersion: it.req.GetIndexVersion(), - CurrentIndexVersion: it.req.GetCurrentIndexVersion(), - NumRows: it.req.GetNumRows(), - Dim: it.req.GetDim(), - IndexFilePrefix: it.req.GetIndexFilePrefix(), - InsertFiles: it.req.GetDataPaths(), - FieldSchema: it.req.GetField(), - StorageConfig: storageConfig, - IndexParams: mapToKVPairs(it.newIndexParams), - TypeParams: mapToKVPairs(it.newTypeParams), - StorePath: it.req.GetStorePath(), - StoreVersion: it.req.GetStoreVersion(), - IndexStorePath: it.req.GetIndexStorePath(), - OptFields: optFields, + ClusterID: it.req.GetClusterID(), + BuildID: it.req.GetBuildID(), + CollectionID: it.req.GetCollectionID(), + PartitionID: it.req.GetPartitionID(), + SegmentID: it.req.GetSegmentID(), + IndexVersion: it.req.GetIndexVersion(), + CurrentIndexVersion: it.req.GetCurrentIndexVersion(), + NumRows: it.req.GetNumRows(), + Dim: it.req.GetDim(), + IndexFilePrefix: it.req.GetIndexFilePrefix(), + InsertFiles: it.req.GetDataPaths(), + FieldSchema: it.req.GetField(), + StorageConfig: storageConfig, + IndexParams: mapToKVPairs(it.newIndexParams), + TypeParams: mapToKVPairs(it.newTypeParams), + StorePath: it.req.GetStorePath(), + StoreVersion: it.req.GetStoreVersion(), + IndexStorePath: it.req.GetIndexStorePath(), + OptFields: optFields, + PartitionKeyIsolation: it.req.GetPartitionKeyIsolation(), } log.Info("debug create index", zap.Any("buildIndexParams", buildIndexParams)) diff --git a/internal/proto/index_cgo_msg.proto b/internal/proto/index_cgo_msg.proto index 688f871f55..1808546166 100644 --- a/internal/proto/index_cgo_msg.proto +++ b/internal/proto/index_cgo_msg.proto @@ -79,4 +79,5 @@ message BuildIndexInfo { int64 store_version = 17; string index_store_path = 18; repeated OptionalFieldInfo opt_fields = 19; + bool partition_key_isolation = 20; } diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 0c331f8bba..21b94e9541 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -287,6 +287,7 @@ message CreateJobRequest { repeated int64 data_ids = 23; repeated OptionalFieldInfo optional_scalar_fields = 24; schema.FieldSchema field = 25; + bool partition_key_isolation = 26; } message QueryJobsRequest { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 1a5791d4ca..447629b06d 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1235,6 +1235,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC AlterCollectionRequest: request, rootCoord: node.rootCoord, queryCoord: node.queryCoord, + dataCoord: node.dataCoord, } log := log.Ctx(ctx).With( diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index b9d9b4a1bc..4b87c4064f 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -95,19 +95,21 @@ type Cache interface { AllocID(ctx context.Context) (int64, error) } type collectionBasicInfo struct { - collID typeutil.UniqueID - createdTimestamp uint64 - createdUtcTimestamp uint64 - consistencyLevel commonpb.ConsistencyLevel + collID typeutil.UniqueID + createdTimestamp uint64 + createdUtcTimestamp uint64 + consistencyLevel commonpb.ConsistencyLevel + partitionKeyIsolation bool } type collectionInfo struct { - collID typeutil.UniqueID - schema *schemaInfo - partInfo *partitionInfos - createdTimestamp uint64 - createdUtcTimestamp uint64 - consistencyLevel commonpb.ConsistencyLevel + collID typeutil.UniqueID + schema *schemaInfo + partInfo *partitionInfos + createdTimestamp uint64 + createdUtcTimestamp uint64 + consistencyLevel commonpb.ConsistencyLevel + partitionKeyIsolation bool } type databaseInfo struct { @@ -185,10 +187,11 @@ type partitionInfo struct { func (info *collectionInfo) getBasicInfo() *collectionBasicInfo { // Do a deep copy for all fields. basicInfo := &collectionBasicInfo{ - collID: info.collID, - createdTimestamp: info.createdTimestamp, - createdUtcTimestamp: info.createdUtcTimestamp, - consistencyLevel: info.consistencyLevel, + collID: info.collID, + createdTimestamp: info.createdTimestamp, + createdUtcTimestamp: info.createdUtcTimestamp, + consistencyLevel: info.consistencyLevel, + partitionKeyIsolation: info.partitionKeyIsolation, } return basicInfo @@ -385,14 +388,20 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string, m.collInfo[database] = make(map[string]*collectionInfo) } + isolation, err := common.IsPartitionKeyIsolationKvEnabled(collection.Properties...) + if err != nil { + return nil, err + } + schemaInfo := newSchemaInfo(collection.Schema) m.collInfo[database][collectionName] = &collectionInfo{ - collID: collection.CollectionID, - schema: schemaInfo, - partInfo: parsePartitionsInfo(infos, schemaInfo.hasPartitionKeyField), - createdTimestamp: collection.CreatedTimestamp, - createdUtcTimestamp: collection.CreatedUtcTimestamp, - consistencyLevel: collection.ConsistencyLevel, + collID: collection.CollectionID, + schema: schemaInfo, + partInfo: parsePartitionsInfo(infos, schemaInfo.hasPartitionKeyField), + createdTimestamp: collection.CreatedTimestamp, + createdUtcTimestamp: collection.CreatedUtcTimestamp, + consistencyLevel: collection.ConsistencyLevel, + partitionKeyIsolation: isolation, } log.Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName), zap.Int64("collectionID", collection.CollectionID)) @@ -714,6 +723,7 @@ func (m *MetaCache) describeCollection(ctx context.Context, database, collection CreatedUtcTimestamp: coll.CreatedUtcTimestamp, ConsistencyLevel: coll.ConsistencyLevel, DbName: coll.GetDbName(), + Properties: coll.Properties, } for _, field := range coll.Schema.Fields { if field.FieldID >= common.StartOfUserFieldID { @@ -795,7 +805,7 @@ func parsePartitionsInfo(infos []*partitionInfo, hasPartitionKey bool) *partitio } index, err := strconv.ParseInt(splits[len(splits)-1], 10, 64) if err != nil { - log.Info("partition group not in partitionKey pattern", zap.String("parititonName", partitionName), zap.Error(err)) + log.Info("partition group not in partitionKey pattern", zap.String("partitionName", partitionName), zap.Error(err)) return result } partitionNames[index] = partitionName diff --git a/internal/proxy/rootcoord_mock_test.go b/internal/proxy/rootcoord_mock_test.go index 4d6d4a4392..97f466f2e9 100644 --- a/internal/proxy/rootcoord_mock_test.go +++ b/internal/proxy/rootcoord_mock_test.go @@ -51,6 +51,7 @@ type collectionMeta struct { physicalChannelNames []string createdTimestamp uint64 createdUtcTimestamp uint64 + properties []*commonpb.KeyValuePair } type partitionMeta struct { @@ -385,6 +386,7 @@ func (coord *RootCoordMock) CreateCollection(ctx context.Context, req *milvuspb. physicalChannelNames: physicalChannelNames, createdTimestamp: ts, createdUtcTimestamp: ts, + properties: req.GetProperties(), } coord.partitionMtx.Lock() @@ -528,6 +530,7 @@ func (coord *RootCoordMock) DescribeCollection(ctx context.Context, req *milvusp PhysicalChannelNames: meta.physicalChannelNames, CreatedTimestamp: meta.createdUtcTimestamp, CreatedUtcTimestamp: meta.createdUtcTimestamp, + Properties: meta.properties, }, nil } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index f796e8f622..004a0b4731 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -331,6 +331,11 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error { return err } + hasPartitionKey := hasParitionKeyModeField(t.schema) + if _, err := validatePartitionKeyIsolation(t.CollectionName, hasPartitionKey, t.GetProperties()...); err != nil { + return err + } + // validate clustering key if err := t.validateClusteringKey(); err != nil { return err @@ -841,6 +846,7 @@ type alterCollectionTask struct { rootCoord types.RootCoordClient result *commonpb.Status queryCoord types.QueryCoordClient + dataCoord types.DataCoordClient } func (t *alterCollectionTask) TraceCtx() context.Context { @@ -900,6 +906,32 @@ func hasLazyLoadProp(props ...*commonpb.KeyValuePair) bool { return false } +func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, props ...*commonpb.KeyValuePair) (bool, error) { + iso, err := common.IsPartitionKeyIsolationKvEnabled(props...) + if err != nil { + return false, err + } + + // partition key isolation is not set, skip + if !iso { + return false, nil + } + + if !isPartitionKeyEnabled { + return false, merr.WrapErrCollectionIllegalSchema(colName, + "partition key isolation mode is enabled but no partition key field is set. Please set the partition key first") + } + + if !paramtable.Get().CommonCfg.EnableMaterializedView.GetAsBool() { + return false, merr.WrapErrCollectionIllegalSchema(colName, + "partition key isolation mode is enabled but current Milvus does not support it. Please contact us") + } + + log.Info("validated with partition key isolation", zap.String("collectionName", colName)) + + return true, nil +} + func (t *alterCollectionTask) PreExecute(ctx context.Context) error { t.Base.MsgType = commonpb.MsgType_AlterCollection t.Base.SourceID = paramtable.GetNodeID() @@ -920,6 +952,62 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error { } } + isPartitionKeyMode, err := isPartitionKeyMode(ctx, t.GetDbName(), t.CollectionName) + if err != nil { + return err + } + // check if the new partition key isolation is valid to use + newIsoValue, err := validatePartitionKeyIsolation(t.CollectionName, isPartitionKeyMode, t.Properties...) + if err != nil { + return err + } + collBasicInfo, err := globalMetaCache.GetCollectionInfo(t.ctx, t.GetDbName(), t.CollectionName, t.CollectionID) + if err != nil { + return err + } + oldIsoValue := collBasicInfo.partitionKeyIsolation + + log.Info("alter collection pre check with partition key isolation", + zap.String("collectionName", t.CollectionName), + zap.Bool("isPartitionKeyMode", isPartitionKeyMode), + zap.Bool("newIsoValue", newIsoValue), + zap.Bool("oldIsoValue", oldIsoValue)) + + // if the isolation flag in properties is not set, meta cache will assign partitionKeyIsolation in collection info to false + // - None|false -> false, skip + // - None|false -> true, check if the collection has vector index + // - true -> false, check if the collection has vector index + // - false -> true, check if the collection has vector index + // - true -> true, skip + if oldIsoValue != newIsoValue { + collSchema, err := globalMetaCache.GetCollectionSchema(ctx, t.GetDbName(), t.CollectionName) + if err != nil { + return err + } + + hasVecIndex := false + indexName := "" + indexResponse, err := t.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ + CollectionID: t.CollectionID, + IndexName: "", + }) + if err != nil { + return merr.WrapErrServiceInternal("describe index failed", err.Error()) + } + for _, index := range indexResponse.IndexInfos { + for _, field := range collSchema.Fields { + if index.FieldID == field.FieldID && typeutil.IsVectorType(field.DataType) { + hasVecIndex = true + indexName = field.GetName() + } + } + } + if hasVecIndex { + return merr.WrapErrIndexDuplicate(indexName, + "can not alter partition key isolation mode if the collection already has a vector index. Please drop the index first") + } + } + return nil } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index e0ba357950..45a2af64d6 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -297,7 +297,7 @@ func (t *searchTask) checkNq(ctx context.Context) (int64, error) { return nq, nil } -func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask) error { +func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask, plan *planpb.PlanNode) error { if t.enableMaterializedView { partitionKeyFieldSchema, err := typeutil.GetPartitionKeyFieldSchema(t.schema.CollectionSchema) if err != nil { @@ -305,7 +305,26 @@ func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask) error { return err } if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyFieldSchema) { + collInfo, colErr := globalMetaCache.GetCollectionInfo(t.ctx, t.request.GetDbName(), t.collectionName, t.CollectionID) + if colErr != nil { + log.Warn("failed to get collection info", zap.Error(colErr)) + return err + } + + if collInfo.partitionKeyIsolation { + expr, err := exprutil.ParseExprFromPlan(plan) + if err != nil { + log.Warn("failed to parse expr from plan during MV", zap.Error(err)) + return err + } + err = exprutil.ValidatePartitionKeyIsolation(expr) + if err != nil { + return err + } + } queryInfo.MaterializedViewInvolved = true + } else { + return errors.New("partition key field data type is not supported in materialized view") } } return nil @@ -350,7 +369,10 @@ func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error { if len(partitionIDs) > 0 { internalSubReq.PartitionIDs = partitionIDs t.partitionIDsSet.Upsert(partitionIDs...) - setQueryInfoIfMvEnable(queryInfo, t) + mvErr := setQueryInfoIfMvEnable(queryInfo, t, plan) + if mvErr != nil { + return mvErr + } } } else { internalSubReq.PartitionIDs = t.SearchRequest.GetPartitionIDs() @@ -406,7 +428,10 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error { } if len(partitionIDs) > 0 { t.SearchRequest.PartitionIDs = partitionIDs - setQueryInfoIfMvEnable(queryInfo, t) + mvErr := setQueryInfoIfMvEnable(queryInfo, t, plan) + if mvErr != nil { + return mvErr + } } } diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 2397c3270a..14b1b54da2 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -2628,11 +2628,12 @@ func (s *MaterializedViewTestSuite) TearDownSuite() { func (s *MaterializedViewTestSuite) SetupTest() { s.mockMetaCache = NewMockCache(s.T()) - s.mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.colID, nil).Maybe() + s.mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.colID, nil) s.mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( &collectionBasicInfo{ - collID: s.colID, - }, nil).Maybe() + collID: s.colID, + partitionKeyIsolation: true, + }, nil) globalMetaCache = s.mockMetaCache } @@ -2731,6 +2732,33 @@ func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarChar() { s.Equal(true, task.queryInfos[0].MaterializedViewInvolved) } +func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarCharWithIsolation() { + task := s.getSearchTask() + task.enableMaterializedView = true + task.request.Dsl = testVarCharField + " == \"a\"" + schema := ConstructCollectionSchemaWithPartitionKey(s.colName, s.fieldName2Types, testInt64Field, testVarCharField, false) + schemaInfo := newSchemaInfo(schema) + s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil) + s.mockMetaCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"partition_1", "partition_2"}, nil) + s.mockMetaCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"partition_1": 1, "partition_2": 2}, nil) + err := task.PreExecute(s.ctx) + s.NoError(err) + s.NotZero(len(task.queryInfos)) + s.Equal(true, task.queryInfos[0].MaterializedViewInvolved) +} + +func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarCharWithIsolationInvalid() { + task := s.getSearchTask() + task.enableMaterializedView = true + task.request.Dsl = testVarCharField + " in [\"a\", \"b\"]" + schema := ConstructCollectionSchemaWithPartitionKey(s.colName, s.fieldName2Types, testInt64Field, testVarCharField, false) + schemaInfo := newSchemaInfo(schema) + s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil) + s.mockMetaCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"partition_1", "partition_2"}, nil) + s.mockMetaCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"partition_1": 1, "partition_2": 2}, nil) + s.ErrorContains(task.PreExecute(s.ctx), "partition key isolation does not support IN") +} + func TestMaterializedView(t *testing.T) { suite.Run(t, new(MaterializedViewTestSuite)) } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 17644e99d9..54fda37a13 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -1033,7 +1033,7 @@ func TestHasCollectionTask(t *testing.T) { err = task.Execute(ctx) assert.NoError(t, err) assert.Equal(t, false, task.result.Value) - // createCollection in RootCood and fill GlobalMetaCache + // createIsoCollection in RootCood and fill GlobalMetaCache rc.CreateCollection(ctx, createColReq) globalMetaCache.GetCollectionID(ctx, GetCurDBNameFromContextOrDefault(ctx), collectionName) @@ -3642,3 +3642,224 @@ func TestAlterCollectionCheckLoaded(t *testing.T) { err = task.PreExecute(context.Background()) assert.Equal(t, merr.Code(merr.ErrCollectionLoaded), merr.Code(err)) } + +func TestTaskPartitionKeyIsolation(t *testing.T) { + rc := NewRootCoordMock() + defer rc.Close() + dc := NewDataCoordMock() + defer dc.Close() + qc := getQueryCoordClient() + defer qc.Close() + ctx := context.Background() + mgr := newShardClientMgr() + err := InitMetaCache(ctx, rc, qc, mgr) + assert.NoError(t, err) + shardsNum := common.DefaultShardsNum + prefix := "TestPartitionKeyIsolation" + collectionName := prefix + funcutil.GenRandomStr() + + getSchema := func(colName string, hasPartitionKey bool) *schemapb.CollectionSchema { + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["fvec_field"] = schemapb.DataType_FloatVector + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + fieldName2Type["int64_field"] = schemapb.DataType_Int64 + schema := constructCollectionSchemaByDataType(colName, fieldName2Type, "int64_field", false) + if hasPartitionKey { + partitionKeyField := &schemapb.FieldSchema{ + Name: "partition_key_field", + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, + } + fieldName2Type["partition_key_field"] = schemapb.DataType_Int64 + schema.Fields = append(schema.Fields, partitionKeyField) + } + return schema + } + + getCollectionTask := func(colName string, isIso bool, marshaledSchema []byte) *createCollectionTask { + isoStr := "false" + if isIso { + isoStr = "true" + } + + return &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + Timestamp: Timestamp(time.Now().UnixNano()), + }, + DbName: "", + CollectionName: colName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + Properties: []*commonpb.KeyValuePair{{Key: common.PartitionKeyIsolationKey, Value: isoStr}}, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + } + + createIsoCollection := func(colName string, hasPartitionKey bool, isIsolation bool, isIsoNil bool) { + isoStr := "false" + if isIsolation { + isoStr = "true" + } + schema := getSchema(colName, hasPartitionKey) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + createColReq := &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropCollection, + MsgID: 100, + Timestamp: 100, + }, + DbName: dbName, + CollectionName: colName, + Schema: marshaledSchema, + ShardsNum: 1, + Properties: []*commonpb.KeyValuePair{{Key: common.PartitionKeyIsolationKey, Value: isoStr}}, + } + if isIsoNil { + createColReq.Properties = nil + } + + stats, err := rc.CreateCollection(ctx, createColReq) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, stats.ErrorCode) + } + + getAlterCollectionTask := func(colName string, isIsolation bool) *alterCollectionTask { + isoStr := "false" + if isIsolation { + isoStr = "true" + } + + return &alterCollectionTask{ + AlterCollectionRequest: &milvuspb.AlterCollectionRequest{ + Base: &commonpb.MsgBase{}, + CollectionName: colName, + Properties: []*commonpb.KeyValuePair{{Key: common.PartitionKeyIsolationKey, Value: isoStr}}, + }, + queryCoord: qc, + dataCoord: dc, + } + } + + t.Run("create collection valid", func(t *testing.T) { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + schema := getSchema(collectionName, true) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := getCollectionTask(collectionName, true, marshaledSchema) + err = createCollectionTask.PreExecute(ctx) + assert.NoError(t, err) + err = createCollectionTask.Execute(ctx) + assert.NoError(t, err) + }) + + t.Run("create collection without isolation", func(t *testing.T) { + schema := getSchema(collectionName, true) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := getCollectionTask(collectionName, false, marshaledSchema) + err = createCollectionTask.PreExecute(ctx) + assert.NoError(t, err) + err = createCollectionTask.Execute(ctx) + assert.NoError(t, err) + }) + + t.Run("create collection isolation but no partition key", func(t *testing.T) { + schema := getSchema(collectionName, false) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := getCollectionTask(collectionName, true, marshaledSchema) + assert.ErrorContains(t, createCollectionTask.PreExecute(ctx), "partition key isolation mode is enabled but no partition key field is set") + }) + + t.Run("create collection with isolation and partition key but MV is not enabled", func(t *testing.T) { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + schema := getSchema(collectionName, true) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := getCollectionTask(collectionName, true, marshaledSchema) + assert.ErrorContains(t, createCollectionTask.PreExecute(ctx), "partition key isolation mode is enabled but current Milvus does not support it") + }) + + t.Run("alter collection from valid", func(t *testing.T) { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + colName := collectionName + "AlterValid" + createIsoCollection(colName, true, false, false) + alterTask := getAlterCollectionTask(colName, true) + err := alterTask.PreExecute(ctx) + assert.NoError(t, err) + }) + + t.Run("alter collection without isolation", func(t *testing.T) { + colName := collectionName + "AlterNoIso" + createIsoCollection(colName, true, false, true) + alterTask := alterCollectionTask{ + AlterCollectionRequest: &milvuspb.AlterCollectionRequest{ + Base: &commonpb.MsgBase{}, + CollectionName: colName, + Properties: nil, + }, + queryCoord: qc, + } + err := alterTask.PreExecute(ctx) + assert.NoError(t, err) + }) + + t.Run("alter collection isolation but no partition key", func(t *testing.T) { + colName := collectionName + "AlterNoPartkey" + createIsoCollection(colName, false, false, false) + alterTask := getAlterCollectionTask(colName, true) + assert.ErrorContains(t, alterTask.PreExecute(ctx), "partition key isolation mode is enabled but no partition key field is set") + }) + + t.Run("alter collection with isolation and partition key but MV is not enabled", func(t *testing.T) { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + colName := collectionName + "AlterNoMv" + createIsoCollection(colName, true, false, false) + alterTask := getAlterCollectionTask(colName, true) + assert.ErrorContains(t, alterTask.PreExecute(ctx), "partition key isolation mode is enabled but current Milvus does not support it") + }) + + t.Run("alter collection with vec index and isolation", func(t *testing.T) { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + colName := collectionName + "AlterVecIndex" + createIsoCollection(colName, true, true, false) + resp, err := rc.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{DbName: dbName, CollectionName: colName}) + assert.NoError(t, err) + var vecFieldID int64 = 0 + for _, field := range resp.Schema.Fields { + if field.DataType == schemapb.DataType_FloatVector { + vecFieldID = field.FieldID + break + } + } + assert.NotEqual(t, vecFieldID, int64(0)) + dc.DescribeIndexFunc = func(ctx context.Context, request *indexpb.DescribeIndexRequest, opts ...grpc.CallOption) (*indexpb.DescribeIndexResponse, error) { + return &indexpb.DescribeIndexResponse{ + Status: merr.Success(), + IndexInfos: []*indexpb.IndexInfo{ + { + FieldID: vecFieldID, + }, + }, + }, nil + } + alterTask := getAlterCollectionTask(colName, false) + assert.ErrorContains(t, alterTask.PreExecute(ctx), + "can not alter partition key isolation mode if the collection already has a vector index. Please drop the index first") + }) +} diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 22f2e9b58f..779e631bcf 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/log" ) @@ -74,6 +75,16 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { core: a.core, }) + // properties needs to be refreshed in the cache + aliases := a.core.meta.ListAliasesByID(oldColl.CollectionID) + redoTask.AddSyncStep(&expireCacheStep{ + baseStep: baseStep{core: a.core}, + dbName: a.Req.GetDbName(), + collectionNames: append(aliases, a.Req.GetCollectionName()), + collectionID: oldColl.CollectionID, + opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)}, + }) + return redoTask.Execute(ctx) } diff --git a/internal/rootcoord/alter_collection_task_test.go b/internal/rootcoord/alter_collection_task_test.go index 583c7e0ce0..3252534986 100644 --- a/internal/rootcoord/alter_collection_task_test.go +++ b/internal/rootcoord/alter_collection_task_test.go @@ -92,8 +92,9 @@ func Test_alterCollectionTask_Execute(t *testing.T) { mock.Anything, mock.Anything, ).Return(errors.New("err")) + meta.On("ListAliasesByID", mock.Anything).Return([]string{}) - core := newTestCore(withMeta(meta)) + core := newTestCore(withValidProxyManager(), withMeta(meta)) task := &alterCollectionTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ @@ -121,13 +122,49 @@ func Test_alterCollectionTask_Execute(t *testing.T) { mock.Anything, mock.Anything, ).Return(nil) + meta.On("ListAliasesByID", mock.Anything).Return([]string{}) broker := newMockBroker() broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { return errors.New("err") } - core := newTestCore(withMeta(meta), withBroker(broker)) + core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker)) + task := &alterCollectionTask{ + baseTask: newBaseTask(context.Background(), core), + Req: &milvuspb.AlterCollectionRequest{ + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, + CollectionName: "cn", + Properties: properties, + }, + } + + err := task.Execute(context.Background()) + assert.Error(t, err) + }) + + t.Run("expire cache failed", func(t *testing.T) { + meta := mockrootcoord.NewIMetaTable(t) + meta.On("GetCollectionByName", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(&model.Collection{CollectionID: int64(1)}, nil) + meta.On("AlterCollection", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(nil) + meta.On("ListAliasesByID", mock.Anything).Return([]string{}) + + broker := newMockBroker() + broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { + return errors.New("err") + } + + core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker)) task := &alterCollectionTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ @@ -155,13 +192,14 @@ func Test_alterCollectionTask_Execute(t *testing.T) { mock.Anything, mock.Anything, ).Return(nil) + meta.On("ListAliasesByID", mock.Anything).Return([]string{}) broker := newMockBroker() broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { return nil } - core := newTestCore(withMeta(meta), withBroker(broker)) + core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker)) task := &alterCollectionTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ @@ -220,5 +258,17 @@ func Test_alterCollectionTask_Execute(t *testing.T) { Key: common.CollectionAutoCompactionKey, Value: "true", }) + + updatePropsIso := []*commonpb.KeyValuePair{ + { + Key: common.PartitionKeyIsolationKey, + Value: "true", + }, + } + updateCollectionProperties(coll, updatePropsIso) + assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{ + Key: common.PartitionKeyIsolationKey, + Value: "true", + }) }) } diff --git a/internal/util/exprutil/expr_checker.go b/internal/util/exprutil/expr_checker.go index 00866a9aa9..eddb4c740c 100644 --- a/internal/util/exprutil/expr_checker.go +++ b/internal/util/exprutil/expr_checker.go @@ -509,3 +509,95 @@ func maxGenericValue(left *planpb.GenericValue, right *planpb.GenericValue) *pla } return right } + +func ValidatePartitionKeyIsolation(expr *planpb.Expr) error { + foundPartitionKey, err := validatePartitionKeyIsolationFromExpr(expr) + if err != nil { + return err + } + if !foundPartitionKey { + return errors.New("partition key not found in expr when validating partition key isolation") + } + return nil +} + +func validatePartitionKeyIsolationFromExpr(expr *planpb.Expr) (bool, error) { + switch expr := expr.GetExpr().(type) { + case *planpb.Expr_BinaryExpr: + return validatePartitionKeyIsolationFromBinaryExpr(expr.BinaryExpr) + case *planpb.Expr_UnaryExpr: + return validatePartitionKeyIsolationFromUnaryExpr(expr.UnaryExpr) + case *planpb.Expr_TermExpr: + return validatePartitionKeyIsolationFromTermExpr(expr.TermExpr) + case *planpb.Expr_UnaryRangeExpr: + return validatePartitionKeyIsolationFromRangeExpr(expr.UnaryRangeExpr) + } + return false, nil +} + +func validatePartitionKeyIsolationFromBinaryExpr(expr *planpb.BinaryExpr) (bool, error) { + // return directly if has errors on either or both sides + leftRes, leftErr := validatePartitionKeyIsolationFromExpr(expr.Left) + if leftErr != nil { + return leftRes, leftErr + } + rightRes, rightErr := validatePartitionKeyIsolationFromExpr(expr.Right) + if rightErr != nil { + return rightRes, rightErr + } + + // the following deals with no error on either side + if expr.Op == planpb.BinaryExpr_LogicalAnd { + // if one of them is partition key + // e.g. partition_key_field == 1 && other_field > 10 + if leftRes || rightRes { + return true, nil + } + // if none of them is partition key + return false, nil + } + + if expr.Op == planpb.BinaryExpr_LogicalOr { + // if either side has partition key, but OR them + // e.g. partition_key_field == 1 || other_field > 10 + if leftRes || rightRes { + return true, errors.New("partition key isolation does not support OR") + } + // if none of them has partition key + return false, nil + } + return false, nil +} + +func validatePartitionKeyIsolationFromUnaryExpr(expr *planpb.UnaryExpr) (bool, error) { + res, err := validatePartitionKeyIsolationFromExpr(expr.GetChild()) + if err != nil { + return res, err + } + if expr.Op == planpb.UnaryExpr_Not { + if res { + return true, errors.New("partition key isolation does not support NOT") + } + return false, nil + } + return res, err +} + +func validatePartitionKeyIsolationFromTermExpr(expr *planpb.TermExpr) (bool, error) { + if expr.GetColumnInfo().GetIsPartitionKey() { + // e.g. partition_key_field in [1, 2, 3] + return true, errors.New("partition key isolation does not support IN") + } + return false, nil +} + +func validatePartitionKeyIsolationFromRangeExpr(expr *planpb.UnaryRangeExpr) (bool, error) { + if expr.GetColumnInfo().GetIsPartitionKey() { + if expr.GetOp() == planpb.OpType_Equal { + // e.g. partition_key_field == 1 + return true, nil + } + return true, errors.Newf("partition key isolation does not support %s", expr.GetOp().String()) + } + return false, nil +} diff --git a/internal/util/exprutil/expr_checker_test.go b/internal/util/exprutil/expr_checker_test.go index dc519331eb..f417de14a8 100644 --- a/internal/util/exprutil/expr_checker_test.go +++ b/internal/util/exprutil/expr_checker_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/parser/planparserv2" "github.com/milvus-io/milvus/internal/proto/planpb" @@ -277,3 +278,209 @@ func TestParseStrRanges(t *testing.T) { assert.Equal(t, range0.includeUpper, false) } } + +func TestValidatePartitionKeyIsolation(t *testing.T) { + prefix := "TestValidatePartitionKeyIsolation" + collectionName := prefix + funcutil.GenRandomStr() + + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["int64_field"] = schemapb.DataType_Int64 + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + fieldName2Type["fvec_field"] = schemapb.DataType_FloatVector + schema := testutil.ConstructCollectionSchemaByDataType(collectionName, fieldName2Type, + "int64_field", false, 8) + schema.Properties = append(schema.Properties, &commonpb.KeyValuePair{ + Key: common.PartitionKeyIsolationKey, + Value: "true", + }) + partitionKeyField := &schemapb.FieldSchema{ + Name: "key_field", + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, + } + schema.Fields = append(schema.Fields, partitionKeyField) + fieldID := common.StartOfUserFieldID + for _, field := range schema.Fields { + field.FieldID = int64(fieldID) + fieldID++ + } + schemaHelper, err := typeutil.CreateSchemaHelper(schema) + require.NoError(t, err) + + type testCase struct { + name string + expr string + expectedErrorString string + } + cases := []testCase{ + { + name: "partition key isolation equal", + expr: "key_field == 10", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with same field equal", + expr: "key_field == 10 && key_field == 10", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with same field equal diff", + expr: "key_field == 10 && key_field == 20", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with same field equal 3", + expr: "key_field == 10 && key_field == 11 && key_field == 12", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with varchar field equal", + expr: "key_field == 10 && varChar_field == 'a'", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with varchar field not equal", + expr: "key_field == 10 && varChar_field != 'a'", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with varchar field in", + expr: "key_field == 10 && varChar_field in ['a', 'b']", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with varchar field in Reversed", + expr: "varChar_field in ['a', 'b'] && key_field == 10", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with varchar field OR", + expr: "key_field == 10 && (varChar_field == 'a' || varChar_field == 'b')", + expectedErrorString: "", + }, + { + name: "partition key isolation equal AND with varchar field OR Reversed", + expr: "(varChar_field == 'a' || varChar_field == 'b') && key_field == 10", + expectedErrorString: "", + }, + { + name: "partition key isolation equal to arithmic operations", + expr: "key_field == (1+1)", + expectedErrorString: "", + }, + { + name: "partition key isolation empty", + expr: "", + expectedErrorString: "partition key not found in expr when validating partition key isolation", + }, + { + name: "partition key isolation not equal", + expr: "key_field != 10", + expectedErrorString: "partition key isolation does not support NotEqual", + }, + { + name: "partition key isolation term", + expr: "key_field in [10]", + expectedErrorString: "partition key isolation does not support IN", + }, + { + name: "partition key isolation term multiple", + expr: "key_field in [10, 20]", + expectedErrorString: "partition key isolation does not support IN", + }, + { + name: "partition key isolation NOT term", + expr: "key_field not in [10]", + expectedErrorString: "partition key isolation does not support IN", + }, + { + name: "partition key isolation less", + expr: "key_field < 10", + expectedErrorString: "partition key isolation does not support LessThan", + }, + { + name: "partition key isolation less or equal", + expr: "key_field <= 10", + expectedErrorString: "partition key isolation does not support LessEq", + }, + { + name: "partition key isolation greater", + expr: "key_field > 10", + expectedErrorString: "partition key isolation does not support GreaterThan", + }, + { + name: "partition key isolation equal greator or equal", + expr: "key_field >= 10", + expectedErrorString: "partition key isolation does not support GreaterEqual", + }, + { + name: "partition key isolation NOT equal", + expr: "not(key_field == 10)", + expectedErrorString: "partition key isolation does not support NOT", + }, + { + name: "partition key isolation equal AND with same field term", + expr: "key_field == 10 && key_field in [10]", + expectedErrorString: "partition key isolation does not support IN", + }, + { + name: "partition key isolation equal OR with same field equal", + expr: "key_field == 10 || key_field == 11", + expectedErrorString: "partition key isolation does not support OR", + }, + { + name: "partition key isolation equal OR with same field equal Reversed", + expr: "key_field == 11 || key_field == 10", + expectedErrorString: "partition key isolation does not support OR", + }, + { + name: "partition key isolation equal OR with other field equal", + expr: "key_field == 10 || varChar_field == 'a'", + expectedErrorString: "partition key isolation does not support OR", + }, + { + name: "partition key isolation equal OR with other field equal Reversed", + expr: "varChar_field == 'a' || key_field == 10", + expectedErrorString: "partition key isolation does not support OR", + }, + { + name: "partition key isolation equal OR with other field equal", + expr: "key_field == 10 || varChar_field == 'a'", + expectedErrorString: "partition key isolation does not support OR", + }, + { + name: "partition key isolation equal AND", + expr: "key_field == 10 && (key_field == 10 || key_field == 11)", + expectedErrorString: "partition key isolation does not support OR", + }, + { + name: "partition key isolation other field equal", + expr: "varChar_field == 'a'", + expectedErrorString: "partition key not found in expr when validating partition key isolation", + }, + { + name: "partition key isolation other field equal AND", + expr: "varChar_field == 'a' && int64_field == 1", + expectedErrorString: "partition key not found in expr when validating partition key isolation", + }, + { + name: "partition key isolation complex OR", + expr: "(key_field == 10 and int64_field == 11) or (key_field == 10 and varChar_field == 'a')", + expectedErrorString: "partition key isolation does not support OR", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + queryPlan, err := planparserv2.CreateRetrievePlan(schemaHelper, tc.expr) + assert.NoError(t, err) + planExpr, err := ParseExprFromPlan(queryPlan) + assert.NoError(t, err) + if tc.expectedErrorString != "" { + assert.ErrorContains(t, ValidatePartitionKeyIsolation(planExpr), tc.expectedErrorString) + } else { + assert.NoError(t, ValidatePartitionKeyIsolation(planExpr)) + } + }) + } +} diff --git a/pkg/common/common.go b/pkg/common/common.go index bd5b600b3a..006c77f1e1 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" + "github.com/cockroachdb/errors" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" ) @@ -161,8 +162,9 @@ const ( // common properties const ( - MmapEnabledKey = "mmap.enabled" - LazyLoadEnableKey = "lazyload.enabled" + MmapEnabledKey = "mmap.enabled" + LazyLoadEnableKey = "lazyload.enabled" + PartitionKeyIsolationKey = "partitionkey.isolation" ) const ( @@ -224,6 +226,31 @@ func IsCollectionLazyLoadEnabled(kvs ...*commonpb.KeyValuePair) bool { return false } +func IsPartitionKeyIsolationKvEnabled(kvs ...*commonpb.KeyValuePair) (bool, error) { + for _, kv := range kvs { + if kv.Key == PartitionKeyIsolationKey { + val, err := strconv.ParseBool(strings.ToLower(kv.Value)) + if err != nil { + return false, errors.Wrap(err, "failed to parse partition key isolation") + } + return val, nil + } + } + return false, nil +} + +func IsPartitionKeyIsolationPropEnabled(props map[string]string) (bool, error) { + val, ok := props[PartitionKeyIsolationKey] + if !ok { + return false, nil + } + iso, parseErr := strconv.ParseBool(val) + if parseErr != nil { + return false, errors.Wrap(parseErr, "failed to parse partition key isolation property") + } + return iso, nil +} + const ( // LatestVerision is the magic number for watch latest revision LatestRevision = int64(-1) diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index 2dc31e33fb..11ca8949f1 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -88,3 +88,64 @@ func TestDatabaseProperties(t *testing.T) { _, err = DatabaseLevelResourceGroups(props) assert.Error(t, err) } + +func TestCommonPartitionKeyIsolation(t *testing.T) { + getProto := func(val string) []*commonpb.KeyValuePair { + return []*commonpb.KeyValuePair{ + { + Key: PartitionKeyIsolationKey, + Value: val, + }, + } + } + + getMp := func(val string) map[string]string { + return map[string]string{ + PartitionKeyIsolationKey: val, + } + } + + t.Run("pb", func(t *testing.T) { + props := getProto("true") + res, err := IsPartitionKeyIsolationKvEnabled(props...) + assert.NoError(t, err) + assert.True(t, res) + + props = getProto("false") + res, err = IsPartitionKeyIsolationKvEnabled(props...) + assert.NoError(t, err) + assert.False(t, res) + + props = getProto("") + res, err = IsPartitionKeyIsolationKvEnabled(props...) + assert.ErrorContains(t, err, "failed to parse partition key isolation") + assert.False(t, res) + + props = getProto("invalid") + res, err = IsPartitionKeyIsolationKvEnabled(props...) + assert.ErrorContains(t, err, "failed to parse partition key isolation") + assert.False(t, res) + }) + + t.Run("map", func(t *testing.T) { + props := getMp("true") + res, err := IsPartitionKeyIsolationPropEnabled(props) + assert.NoError(t, err) + assert.True(t, res) + + props = getMp("false") + res, err = IsPartitionKeyIsolationPropEnabled(props) + assert.NoError(t, err) + assert.False(t, res) + + props = getMp("") + res, err = IsPartitionKeyIsolationPropEnabled(props) + assert.ErrorContains(t, err, "failed to parse partition key isolation property") + assert.False(t, res) + + props = getMp("invalid") + res, err = IsPartitionKeyIsolationPropEnabled(props) + assert.ErrorContains(t, err, "failed to parse partition key isolation property") + assert.False(t, res) + }) +}