diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go
index 593e19bbca..3e8408a25f 100644
--- a/internal/proxy/meta_cache.go
+++ b/internal/proxy/meta_cache.go
@@ -218,8 +218,9 @@ func (s *schemaInfo) GetLoadFieldIDs(loadFields []string, skipDynamicField bool)
 func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.FieldSchema) error {
 	// ignore error if not found
 	partitionKeyField, _ := s.schemaHelper.GetPartitionKeyField()
+	clusteringKeyField, _ := s.schemaHelper.GetClusteringKeyField()
 
-	var hasPrimaryKey, hasPartitionKey, hasVector bool
+	var hasPrimaryKey, hasPartitionKey, hasClusteringKey, hasVector bool
 	for _, field := range fields {
 		if field.GetFieldID() == s.pkField.GetFieldID() {
 			hasPrimaryKey = true
@@ -230,6 +231,9 @@ func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.Field
 		if field.IsPartitionKey {
 			hasPartitionKey = true
 		}
+		if field.IsClusteringKey {
+			hasClusteringKey = true
+		}
 	}
 
 	if !hasPrimaryKey {
@@ -241,6 +245,9 @@ func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.Field
 	if partitionKeyField != nil && !hasPartitionKey {
 		return merr.WrapErrParameterInvalidMsg("load field list %v does not contain partition key field %s", names, partitionKeyField.GetName())
 	}
+	if clusteringKeyField != nil && !hasClusteringKey {
+		return merr.WrapErrParameterInvalidMsg("load field list %v does not contain clustering key field %s", names, clusteringKeyField.GetName())
+	}
 	return nil
 }
 
diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go
index 9ea940e920..8e4267b234 100644
--- a/internal/proxy/meta_cache_test.go
+++ b/internal/proxy/meta_cache_test.go
@@ -1196,6 +1196,12 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
 		DataType:  schemapb.DataType_JSON,
 		IsDynamic: true,
 	}
+	clusteringKeyField := &schemapb.FieldSchema{
+		FieldID:         common.StartOfUserFieldID + 5,
+		Name:            "clustering_key",
+		DataType:        schemapb.DataType_Int32,
+		IsClusteringKey: true,
+	}
 
 	testCases := []testCase{
 		{
@@ -1229,11 +1235,12 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
 					partitionKeyField,
 					vectorField,
 					dynamicField,
+					clusteringKeyField,
 				},
 			},
 			loadFields:       nil,
 			skipDynamicField: false,
-			expectResult:     []int64{common.StartOfUserFieldID, common.StartOfUserFieldID + 2, common.StartOfUserFieldID + 3, common.StartOfUserFieldID + 4},
+			expectResult:     []int64{common.StartOfUserFieldID, common.StartOfUserFieldID + 2, common.StartOfUserFieldID + 3, common.StartOfUserFieldID + 4, common.StartOfUserFieldID + 5},
 			expectErr:        false,
 		},
 		{
@@ -1248,11 +1255,12 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
 					partitionKeyField,
 					vectorField,
 					dynamicField,
+					clusteringKeyField,
 				},
 			},
-			loadFields:       []string{"pk", "part_key", "vector"},
+			loadFields:       []string{"pk", "part_key", "vector", "clustering_key"},
 			skipDynamicField: false,
-			expectResult:     []int64{common.StartOfUserFieldID, common.StartOfUserFieldID + 2, common.StartOfUserFieldID + 3, common.StartOfUserFieldID + 4},
+			expectResult:     []int64{common.StartOfUserFieldID, common.StartOfUserFieldID + 2, common.StartOfUserFieldID + 3, common.StartOfUserFieldID + 4, common.StartOfUserFieldID + 5},
 			expectErr:        false,
 		},
 		{
@@ -1328,6 +1336,23 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
 			skipDynamicField: true,
 			expectErr:        true,
 		},
+		{
+			tag: "clustering_key_not_loaded",
+			schema: &schemapb.CollectionSchema{
+				EnableDynamicField: true,
+				Fields: []*schemapb.FieldSchema{
+					rowIDField,
+					timestampField,
+					pkField,
+					scalarField,
+					partitionKeyField,
+					vectorField,
+					clusteringKeyField,
+				},
+			},
+			loadFields: []string{"pk", "part_key", "vector"},
+			expectErr:  true,
+		},
 	}
 
 	for _, tc := range testCases {
diff --git a/internal/proxy/util.go b/internal/proxy/util.go
index d369247331..fe90239b25 100644
--- a/internal/proxy/util.go
+++ b/internal/proxy/util.go
@@ -634,9 +634,6 @@ func validateMultipleVectorFields(schema *schemapb.CollectionSchema) error {
 }
 
 func validateLoadFieldsList(schema *schemapb.CollectionSchema) error {
-	// ignore error if not found
-	// partitionKeyField, _ := s.schemaHelper.GetPartitionKeyField()
-
 	var vectorCnt int
 	for _, field := range schema.Fields {
 		shouldLoad, err := common.ShouldFieldBeLoaded(field.GetTypeParams())
@@ -658,6 +655,10 @@ func validateLoadFieldsList(schema *schemapb.CollectionSchema) error {
 		if field.IsPartitionKey {
 			return merr.WrapErrParameterInvalidMsg("Partition Key field %s cannot skip loading", field.GetName())
 		}
+
+		if field.IsClusteringKey {
+			return merr.WrapErrParameterInvalidMsg("Clustering Key field %s cannot skip loading", field.GetName())
+		}
 	}
 
 	if vectorCnt == 0 {
diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go
index fafb8775bb..bff4af35c4 100644
--- a/internal/proxy/util_test.go
+++ b/internal/proxy/util_test.go
@@ -2521,6 +2521,12 @@ func TestValidateLoadFieldsList(t *testing.T) {
 		DataType:  schemapb.DataType_JSON,
 		IsDynamic: true,
 	}
+	clusteringKeyField := &schemapb.FieldSchema{
+		FieldID:         common.StartOfUserFieldID + 5,
+		Name:            "clustering_key",
+		DataType:        schemapb.DataType_Int32,
+		IsClusteringKey: true,
+	}
 
 	addSkipLoadAttr := func(f *schemapb.FieldSchema, flag bool) *schemapb.FieldSchema {
 		result := typeutil.Clone(f)
@@ -2544,6 +2550,7 @@ func TestValidateLoadFieldsList(t *testing.T) {
 					partitionKeyField,
 					vectorField,
 					dynamicField,
+					clusteringKeyField,
 				},
 			},
 			expectErr: false,
@@ -2596,6 +2603,23 @@ func TestValidateLoadFieldsList(t *testing.T) {
 			},
 			expectErr: true,
 		},
+		{
+			tag: "clustering_key_not_loaded",
+			schema: &schemapb.CollectionSchema{
+				EnableDynamicField: true,
+				Fields: []*schemapb.FieldSchema{
+					rowIDField,
+					timestampField,
+					pkField,
+					scalarField,
+					partitionKeyField,
+					vectorField,
+					dynamicField,
+					addSkipLoadAttr(clusteringKeyField, true),
+				},
+			},
+			expectErr: true,
+		},
 	}
 
 	for _, tc := range testCases {
diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go
index 1518b901a1..597e08c4a9 100644
--- a/pkg/util/typeutil/schema.go
+++ b/pkg/util/typeutil/schema.go
@@ -251,20 +251,30 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e
 
 // SchemaHelper provides methods to get the schema of fields
 type SchemaHelper struct {
-	schema             *schemapb.CollectionSchema
-	nameOffset         map[string]int
-	idOffset           map[int64]int
-	primaryKeyOffset   int
-	partitionKeyOffset int
-	dynamicFieldOffset int
-	loadFields         Set[int64]
+	schema              *schemapb.CollectionSchema
+	nameOffset          map[string]int
+	idOffset            map[int64]int
+	primaryKeyOffset    int
+	partitionKeyOffset  int
+	clusteringKeyOffset int
+	dynamicFieldOffset  int
+	loadFields          Set[int64]
 }
 
 func CreateSchemaHelperWithLoadFields(schema *schemapb.CollectionSchema, loadFields []int64) (*SchemaHelper, error) {
 	if schema == nil {
 		return nil, errors.New("schema is nil")
 	}
-	schemaHelper := SchemaHelper{schema: schema, nameOffset: make(map[string]int), idOffset: make(map[int64]int), primaryKeyOffset: -1, partitionKeyOffset: -1, dynamicFieldOffset: -1, loadFields: NewSet(loadFields...)}
+	schemaHelper := SchemaHelper{
+		schema:              schema,
+		nameOffset:          make(map[string]int),
+		idOffset:            make(map[int64]int),
+		primaryKeyOffset:    -1,
+		partitionKeyOffset:  -1,
+		clusteringKeyOffset: -1,
+		dynamicFieldOffset:  -1,
+		loadFields:          NewSet(loadFields...),
+	}
 	for offset, field := range schema.Fields {
 		if _, ok := schemaHelper.nameOffset[field.Name]; ok {
 			return nil, fmt.Errorf("duplicated fieldName: %s", field.Name)
@@ -288,6 +298,13 @@ func CreateSchemaHelperWithLoadFields(schema *schemapb.CollectionSchema, loadFie
 			schemaHelper.partitionKeyOffset = offset
 		}
 
+		if field.IsClusteringKey {
+			if schemaHelper.clusteringKeyOffset != -1 {
+				return nil, errors.New("clustering key is not unique")
+			}
+			schemaHelper.clusteringKeyOffset = offset
+		}
+
 		if field.IsDynamic {
 			if schemaHelper.dynamicFieldOffset != -1 {
 				return nil, errors.New("dynamic field is not unique")
@@ -319,6 +336,15 @@ func (helper *SchemaHelper) GetPartitionKeyField() (*schemapb.FieldSchema, error
 	return helper.schema.Fields[helper.partitionKeyOffset], nil
 }
 
+// GetClusteringKeyField returns the schema of the clustering key.
+// If not found, an error shall be returned.
+func (helper *SchemaHelper) GetClusteringKeyField() (*schemapb.FieldSchema, error) {
+	if helper.clusteringKeyOffset == -1 {
+		return nil, fmt.Errorf("failed to get clustering key field: no clustering key in schema")
+	}
+	return helper.schema.Fields[helper.clusteringKeyOffset], nil
+}
+
 // GetDynamicField returns the field schema of dynamic field if exists.
 // if there is no dynamic field defined in schema, error will be returned.
 func (helper *SchemaHelper) GetDynamicField() (*schemapb.FieldSchema, error) {
diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go
index 789f9a955d..ff19f3313a 100644
--- a/pkg/util/typeutil/schema_test.go
+++ b/pkg/util/typeutil/schema_test.go
@@ -474,6 +474,124 @@ func TestSchemaHelper_GetDynamicField(t *testing.T) {
 	})
 }
 
+func TestSchemaHelper_GetClusteringKeyField(t *testing.T) {
+	t.Run("with_clustering_key", func(t *testing.T) {
+		sch := &schemapb.CollectionSchema{
+			Name:        "testColl",
+			Description: "",
+			AutoID:      false,
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID:      100,
+					Name:         "field_int64",
+					IsPrimaryKey: true,
+					DataType:     schemapb.DataType_Int64,
+				},
+				{
+					FieldID:  101,
+					Name:     "field_float_vector",
+					DataType: schemapb.DataType_FloatVector,
+					TypeParams: []*commonpb.KeyValuePair{
+						{
+							Key:   common.DimKey,
+							Value: "128",
+						},
+					},
+				},
+				{
+					FieldID:         102,
+					Name:            "group",
+					DataType:        schemapb.DataType_Int64,
+					IsClusteringKey: true,
+				},
+			},
+		}
+
+		helper, err := CreateSchemaHelper(sch)
+		require.NoError(t, err)
+
+		f, err := helper.GetClusteringKeyField()
+		assert.NoError(t, err)
+		assert.NotNil(t, f)
+		assert.EqualValues(t, 102, f.FieldID)
+	})
+
+	t.Run("without_clustering_key_schema", func(t *testing.T) {
+		sch := &schemapb.CollectionSchema{
+			Name:        "testColl",
+			Description: "",
+			AutoID:      false,
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID:      100,
+					Name:         "field_int64",
+					IsPrimaryKey: true,
+					DataType:     schemapb.DataType_Int64,
+				},
+				{
+					FieldID:  101,
+					Name:     "field_float_vector",
+					DataType: schemapb.DataType_FloatVector,
+					TypeParams: []*commonpb.KeyValuePair{
+						{
+							Key:   common.DimKey,
+							Value: "128",
+						},
+					},
+				},
+			},
+		}
+
+		helper, err := CreateSchemaHelper(sch)
+		require.NoError(t, err)
+
+		_, err = helper.GetClusteringKeyField()
+		assert.Error(t, err)
+	})
+
+	t.Run("multiple_clustering_keys", func(t *testing.T) {
+		sch := &schemapb.CollectionSchema{
+			Name:        "testColl",
+			Description: "",
+			AutoID:      false,
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID:      100,
+					Name:         "field_int64",
+					IsPrimaryKey: true,
+					DataType:     schemapb.DataType_Int64,
+				},
+				{
+					FieldID:  101,
+					Name:     "field_float_vector",
+					DataType: schemapb.DataType_FloatVector,
+					TypeParams: []*commonpb.KeyValuePair{
+						{
+							Key:   common.DimKey,
+							Value: "128",
+						},
+					},
+				},
+				{
+					FieldID:         102,
+					Name:            "group",
+					DataType:        schemapb.DataType_Int64,
+					IsClusteringKey: true,
+				},
+				{
+					FieldID:         103,
+					Name:            "batch",
+					DataType:        schemapb.DataType_VarChar,
+					IsClusteringKey: true,
+				},
+			},
+		}
+
+		_, err := CreateSchemaHelper(sch)
+		assert.Error(t, err)
+	})
+}
+
 func TestSchema_invalid(t *testing.T) {
 	t.Run("Duplicate field name", func(t *testing.T) {
 		schema := &schemapb.CollectionSchema{