mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
enhance: Remove unused load field check from proxy (#42816)
Related to #42489. Since the load list works as a hint after the cache layer was implemented, the related check logic can be removed to keep the code clean. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
fadc053d7a
commit
74ea57bac1
@ -13,7 +13,6 @@ import (
|
|||||||
planparserv2 "github.com/milvus-io/milvus/internal/parser/planparserv2/generated"
|
planparserv2 "github.com/milvus-io/milvus/internal/parser/planparserv2/generated"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/log"
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
|
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -170,9 +169,6 @@ func CreateSearchPlan(schema *typeutil.SchemaHelper, exprStr string, vectorField
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// plan ok with schema, check ann field
|
// plan ok with schema, check ann field
|
||||||
if !schema.IsFieldLoaded(vectorField.GetFieldID()) {
|
|
||||||
return nil, merr.WrapErrParameterInvalidMsg("ann field \"%s\" not loaded", vectorFieldName)
|
|
||||||
}
|
|
||||||
fieldID := vectorField.FieldID
|
fieldID := vectorField.FieldID
|
||||||
dataType := vectorField.DataType
|
dataType := vectorField.DataType
|
||||||
|
|
||||||
|
|||||||
@ -124,7 +124,7 @@ type schemaInfo struct {
|
|||||||
schemaHelper *typeutil.SchemaHelper
|
schemaHelper *typeutil.SchemaHelper
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSchemaInfoWithLoadFields(schema *schemapb.CollectionSchema, loadFields []int64) *schemaInfo {
|
func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
|
||||||
fieldMap := typeutil.NewConcurrentMap[string, int64]()
|
fieldMap := typeutil.NewConcurrentMap[string, int64]()
|
||||||
hasPartitionkey := false
|
hasPartitionkey := false
|
||||||
var pkField *schemapb.FieldSchema
|
var pkField *schemapb.FieldSchema
|
||||||
@ -139,7 +139,7 @@ func newSchemaInfoWithLoadFields(schema *schemapb.CollectionSchema, loadFields [
|
|||||||
}
|
}
|
||||||
// skip load fields logic for now
|
// skip load fields logic for now
|
||||||
// partial load shall be processed as hint after tiered storage feature
|
// partial load shall be processed as hint after tiered storage feature
|
||||||
schemaHelper, _ := typeutil.CreateSchemaHelperWithLoadFields(schema, nil)
|
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
|
||||||
return &schemaInfo{
|
return &schemaInfo{
|
||||||
CollectionSchema: schema,
|
CollectionSchema: schema,
|
||||||
fieldMap: fieldMap,
|
fieldMap: fieldMap,
|
||||||
@ -149,10 +149,6 @@ func newSchemaInfoWithLoadFields(schema *schemapb.CollectionSchema, loadFields [
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
|
|
||||||
return newSchemaInfoWithLoadFields(schema, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *schemaInfo) MapFieldID(name string) (int64, bool) {
|
func (s *schemaInfo) MapFieldID(name string) (int64, bool) {
|
||||||
return s.fieldMap.Get(name)
|
return s.fieldMap.Get(name)
|
||||||
}
|
}
|
||||||
@ -248,10 +244,6 @@ func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.Field
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *schemaInfo) IsFieldLoaded(fieldID int64) bool {
|
|
||||||
return s.schemaHelper.IsFieldLoaded(fieldID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *schemaInfo) CanRetrieveRawFieldData(field *schemapb.FieldSchema) bool {
|
func (s *schemaInfo) CanRetrieveRawFieldData(field *schemapb.FieldSchema) bool {
|
||||||
return s.schemaHelper.CanRetrieveRawFieldData(field)
|
return s.schemaHelper.CanRetrieveRawFieldData(field)
|
||||||
}
|
}
|
||||||
@ -433,11 +425,6 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
loadFields, err := m.getCollectionLoadFields(ctx, collection.CollectionID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// check partitionID, createdTimestamp and utcstamp has sam element numbers
|
// check partitionID, createdTimestamp and utcstamp has sam element numbers
|
||||||
if len(partitions.PartitionNames) != len(partitions.CreatedTimestamps) || len(partitions.PartitionNames) != len(partitions.CreatedUtcTimestamps) {
|
if len(partitions.PartitionNames) != len(partitions.CreatedTimestamps) || len(partitions.PartitionNames) != len(partitions.CreatedUtcTimestamps) {
|
||||||
return nil, merr.WrapErrParameterInvalidMsg("partition names and timestamps number is not aligned, response: %s", partitions.String())
|
return nil, merr.WrapErrParameterInvalidMsg("partition names and timestamps number is not aligned, response: %s", partitions.String())
|
||||||
@ -465,7 +452,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
schemaInfo := newSchemaInfoWithLoadFields(collection.Schema, loadFields)
|
schemaInfo := newSchemaInfo(collection.Schema)
|
||||||
|
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
@ -787,28 +774,6 @@ func (m *MetaCache) showPartitions(ctx context.Context, dbName string, collectio
|
|||||||
return partitions, nil
|
return partitions, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MetaCache) getCollectionLoadFields(ctx context.Context, collectionID UniqueID) ([]int64, error) {
|
|
||||||
req := &querypb.ShowCollectionsRequest{
|
|
||||||
Base: commonpbutil.NewMsgBase(
|
|
||||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
|
||||||
),
|
|
||||||
CollectionIDs: []int64{collectionID},
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := m.mixCoord.ShowLoadCollections(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, merr.ErrCollectionNotLoaded) {
|
|
||||||
return []int64{}, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// backward compatility, ignore HPL logic
|
|
||||||
if len(resp.GetLoadFields()) < 1 {
|
|
||||||
return []int64{}, nil
|
|
||||||
}
|
|
||||||
return resp.GetLoadFields()[0].GetData(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MetaCache) describeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) {
|
func (m *MetaCache) describeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) {
|
||||||
req := &rootcoordpb.DescribeDatabaseRequest{
|
req := &rootcoordpb.DescribeDatabaseRequest{
|
||||||
DbName: dbName,
|
DbName: dbName,
|
||||||
|
|||||||
@ -1083,7 +1083,6 @@ func TestMetaCacheGetCollectionWithUpdate(t *testing.T) {
|
|||||||
CreatedTimestamps: []uint64{11},
|
CreatedTimestamps: []uint64{11},
|
||||||
CreatedUtcTimestamps: []uint64{11},
|
CreatedUtcTimestamps: []uint64{11},
|
||||||
}, nil).Once()
|
}, nil).Once()
|
||||||
rootCoord.EXPECT().ShowLoadCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Once()
|
|
||||||
c, err := globalMetaCache.GetCollectionInfo(ctx, "foo", "bar", 1)
|
c, err := globalMetaCache.GetCollectionInfo(ctx, "foo", "bar", 1)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, c.collID, int64(1))
|
assert.Equal(t, c.collID, int64(1))
|
||||||
@ -1118,7 +1117,6 @@ func TestMetaCacheGetCollectionWithUpdate(t *testing.T) {
|
|||||||
CreatedTimestamps: []uint64{11},
|
CreatedTimestamps: []uint64{11},
|
||||||
CreatedUtcTimestamps: []uint64{11},
|
CreatedUtcTimestamps: []uint64{11},
|
||||||
}, nil).Once()
|
}, nil).Once()
|
||||||
rootCoord.EXPECT().ShowLoadCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Once()
|
|
||||||
c, err := globalMetaCache.GetCollectionInfo(ctx, "foo", "hoo", 0)
|
c, err := globalMetaCache.GetCollectionInfo(ctx, "foo", "hoo", 0)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, c.collID, int64(1))
|
assert.Equal(t, c.collID, int64(1))
|
||||||
|
|||||||
@ -1488,7 +1488,6 @@ func computeRecall(results *schemapb.SearchResultData, gts *schemapb.SearchResul
|
|||||||
// return value.
|
// return value.
|
||||||
func translateOutputFields(outputFields []string, schema *schemaInfo, removePkField bool) ([]string, []string, []string, bool, error) {
|
func translateOutputFields(outputFields []string, schema *schemaInfo, removePkField bool) ([]string, []string, []string, bool, error) {
|
||||||
var primaryFieldName string
|
var primaryFieldName string
|
||||||
var dynamicField *schemapb.FieldSchema
|
|
||||||
allFieldNameMap := make(map[string]*schemapb.FieldSchema)
|
allFieldNameMap := make(map[string]*schemapb.FieldSchema)
|
||||||
resultFieldNameMap := make(map[string]bool)
|
resultFieldNameMap := make(map[string]bool)
|
||||||
resultFieldNames := make([]string, 0)
|
resultFieldNames := make([]string, 0)
|
||||||
@ -1501,9 +1500,6 @@ func translateOutputFields(outputFields []string, schema *schemaInfo, removePkFi
|
|||||||
if field.IsPrimaryKey {
|
if field.IsPrimaryKey {
|
||||||
primaryFieldName = field.Name
|
primaryFieldName = field.Name
|
||||||
}
|
}
|
||||||
if field.IsDynamic {
|
|
||||||
dynamicField = field
|
|
||||||
}
|
|
||||||
allFieldNameMap[field.Name] = field
|
allFieldNameMap[field.Name] = field
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1517,8 +1513,7 @@ func translateOutputFields(outputFields []string, schema *schemaInfo, removePkFi
|
|||||||
if outputFieldName == "*" {
|
if outputFieldName == "*" {
|
||||||
userRequestedPkFieldExplicitly = true
|
userRequestedPkFieldExplicitly = true
|
||||||
for fieldName, field := range allFieldNameMap {
|
for fieldName, field := range allFieldNameMap {
|
||||||
// skip Cold field and fields that can't be output
|
if schema.CanRetrieveRawFieldData(field) {
|
||||||
if schema.IsFieldLoaded(field.GetFieldID()) && schema.CanRetrieveRawFieldData(field) {
|
|
||||||
resultFieldNameMap[fieldName] = true
|
resultFieldNameMap[fieldName] = true
|
||||||
userOutputFieldsMap[fieldName] = true
|
userOutputFieldsMap[fieldName] = true
|
||||||
}
|
}
|
||||||
@ -1529,49 +1524,40 @@ func translateOutputFields(outputFields []string, schema *schemaInfo, removePkFi
|
|||||||
if !schema.CanRetrieveRawFieldData(field) {
|
if !schema.CanRetrieveRawFieldData(field) {
|
||||||
return nil, nil, nil, false, fmt.Errorf("not allowed to retrieve raw data of field %s", outputFieldName)
|
return nil, nil, nil, false, fmt.Errorf("not allowed to retrieve raw data of field %s", outputFieldName)
|
||||||
}
|
}
|
||||||
if schema.IsFieldLoaded(field.GetFieldID()) {
|
resultFieldNameMap[outputFieldName] = true
|
||||||
resultFieldNameMap[outputFieldName] = true
|
userOutputFieldsMap[outputFieldName] = true
|
||||||
userOutputFieldsMap[outputFieldName] = true
|
|
||||||
} else {
|
|
||||||
return nil, nil, nil, false, fmt.Errorf("field %s is not loaded", outputFieldName)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if schema.EnableDynamicField {
|
if schema.EnableDynamicField {
|
||||||
if schema.IsFieldLoaded(dynamicField.GetFieldID()) {
|
dynamicNestedPath := outputFieldName
|
||||||
dynamicNestedPath := outputFieldName
|
err := planparserv2.ParseIdentifier(schema.schemaHelper, outputFieldName, func(expr *planpb.Expr) error {
|
||||||
err := planparserv2.ParseIdentifier(schema.schemaHelper, outputFieldName, func(expr *planpb.Expr) error {
|
columnInfo := expr.GetColumnExpr().GetInfo()
|
||||||
columnInfo := expr.GetColumnExpr().GetInfo()
|
// there must be no error here
|
||||||
// there must be no error here
|
dynamicField, _ := schema.schemaHelper.GetDynamicField()
|
||||||
dynamicField, _ := schema.schemaHelper.GetDynamicField()
|
// only $meta["xxx"] is allowed for now
|
||||||
// only $meta["xxx"] is allowed for now
|
if dynamicField.GetFieldID() != columnInfo.GetFieldId() {
|
||||||
if dynamicField.GetFieldID() != columnInfo.GetFieldId() {
|
return errors.New("not support getting subkeys of json field yet")
|
||||||
return errors.New("not support getting subkeys of json field yet")
|
|
||||||
}
|
|
||||||
nestedPaths := columnInfo.GetNestedPath()
|
|
||||||
// $meta["A"]["B"] not allowed for now
|
|
||||||
if len(nestedPaths) != 1 {
|
|
||||||
return errors.New("not support getting multiple level of dynamic field for now")
|
|
||||||
}
|
|
||||||
// $meta["dyn_field"], output field name could be:
|
|
||||||
// 1. "dyn_field", outputFieldName == nestedPath
|
|
||||||
// 2. `$meta["dyn_field"]` explicit form
|
|
||||||
if nestedPaths[0] != outputFieldName {
|
|
||||||
// use "dyn_field" as userDynamicFieldsMap when outputField = `$meta["dyn_field"]`
|
|
||||||
dynamicNestedPath = nestedPaths[0]
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Info("parse output field name failed", zap.String("field name", outputFieldName), zap.Error(err))
|
|
||||||
return nil, nil, nil, false, fmt.Errorf("parse output field name failed: %s", outputFieldName)
|
|
||||||
}
|
}
|
||||||
resultFieldNameMap[common.MetaFieldName] = true
|
nestedPaths := columnInfo.GetNestedPath()
|
||||||
userOutputFieldsMap[outputFieldName] = true
|
// $meta["A"]["B"] not allowed for now
|
||||||
userDynamicFieldsMap[dynamicNestedPath] = true
|
if len(nestedPaths) != 1 {
|
||||||
} else {
|
return errors.New("not support getting multiple level of dynamic field for now")
|
||||||
// TODO after cold field be able to fetched with chunk cache, this check shall be removed
|
}
|
||||||
return nil, nil, nil, false, fmt.Errorf("field %s cannot be returned since dynamic field not loaded", outputFieldName)
|
// $meta["dyn_field"], output field name could be:
|
||||||
|
// 1. "dyn_field", outputFieldName == nestedPath
|
||||||
|
// 2. `$meta["dyn_field"]` explicit form
|
||||||
|
if nestedPaths[0] != outputFieldName {
|
||||||
|
// use "dyn_field" as userDynamicFieldsMap when outputField = `$meta["dyn_field"]`
|
||||||
|
dynamicNestedPath = nestedPaths[0]
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Info("parse output field name failed", zap.String("field name", outputFieldName), zap.Error(err))
|
||||||
|
return nil, nil, nil, false, fmt.Errorf("parse output field name failed: %s", outputFieldName)
|
||||||
}
|
}
|
||||||
|
resultFieldNameMap[common.MetaFieldName] = true
|
||||||
|
userOutputFieldsMap[outputFieldName] = true
|
||||||
|
userDynamicFieldsMap[dynamicNestedPath] = true
|
||||||
} else {
|
} else {
|
||||||
return nil, nil, nil, false, fmt.Errorf("field %s not exist", outputFieldName)
|
return nil, nil, nil, false, fmt.Errorf("field %s not exist", outputFieldName)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -275,10 +275,10 @@ type SchemaHelper struct {
|
|||||||
partitionKeyOffset int
|
partitionKeyOffset int
|
||||||
clusteringKeyOffset int
|
clusteringKeyOffset int
|
||||||
dynamicFieldOffset int
|
dynamicFieldOffset int
|
||||||
loadFields Set[int64]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateSchemaHelperWithLoadFields(schema *schemapb.CollectionSchema, loadFields []int64) (*SchemaHelper, error) {
|
// CreateSchemaHelper returns a new SchemaHelper object
|
||||||
|
func CreateSchemaHelper(schema *schemapb.CollectionSchema) (*SchemaHelper, error) {
|
||||||
if schema == nil {
|
if schema == nil {
|
||||||
return nil, errors.New("schema is nil")
|
return nil, errors.New("schema is nil")
|
||||||
}
|
}
|
||||||
@ -290,7 +290,6 @@ func CreateSchemaHelperWithLoadFields(schema *schemapb.CollectionSchema, loadFie
|
|||||||
partitionKeyOffset: -1,
|
partitionKeyOffset: -1,
|
||||||
clusteringKeyOffset: -1,
|
clusteringKeyOffset: -1,
|
||||||
dynamicFieldOffset: -1,
|
dynamicFieldOffset: -1,
|
||||||
loadFields: NewSet(loadFields...),
|
|
||||||
}
|
}
|
||||||
for offset, field := range schema.Fields {
|
for offset, field := range schema.Fields {
|
||||||
if _, ok := schemaHelper.nameOffset[field.Name]; ok {
|
if _, ok := schemaHelper.nameOffset[field.Name]; ok {
|
||||||
@ -332,11 +331,6 @@ func CreateSchemaHelperWithLoadFields(schema *schemapb.CollectionSchema, loadFie
|
|||||||
return &schemaHelper, nil
|
return &schemaHelper, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateSchemaHelper returns a new SchemaHelper object
|
|
||||||
func CreateSchemaHelper(schema *schemapb.CollectionSchema) (*SchemaHelper, error) {
|
|
||||||
return CreateSchemaHelperWithLoadFields(schema, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPrimaryKeyField returns the schema of the primary key
|
// GetPrimaryKeyField returns the schema of the primary key
|
||||||
func (helper *SchemaHelper) GetPrimaryKeyField() (*schemapb.FieldSchema, error) {
|
func (helper *SchemaHelper) GetPrimaryKeyField() (*schemapb.FieldSchema, error) {
|
||||||
if helper.primaryKeyOffset == -1 {
|
if helper.primaryKeyOffset == -1 {
|
||||||
@ -387,21 +381,9 @@ func (helper *SchemaHelper) GetFieldFromNameDefaultJSON(fieldName string) (*sche
|
|||||||
return helper.getDefaultJSONField(fieldName)
|
return helper.getDefaultJSONField(fieldName)
|
||||||
}
|
}
|
||||||
fieldSchema := helper.schema.Fields[offset]
|
fieldSchema := helper.schema.Fields[offset]
|
||||||
if !helper.IsFieldLoaded(fieldSchema.GetFieldID()) {
|
|
||||||
return nil, errors.Newf("field %s is not loaded", fieldSchema)
|
|
||||||
}
|
|
||||||
return fieldSchema, nil
|
return fieldSchema, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFieldFromNameDefaultJSON returns whether is field loaded.
|
|
||||||
// If load fields is not provided, treated as loaded
|
|
||||||
func (helper *SchemaHelper) IsFieldLoaded(fieldID int64) bool {
|
|
||||||
if len(helper.loadFields) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return helper.loadFields.Contain(fieldID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (helper *SchemaHelper) IsFieldTextMatchEnabled(fieldId int64) bool {
|
func (helper *SchemaHelper) IsFieldTextMatchEnabled(fieldId int64) bool {
|
||||||
sche, err := helper.GetFieldFromID(fieldId)
|
sche, err := helper.GetFieldFromID(fieldId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -413,9 +395,6 @@ func (helper *SchemaHelper) IsFieldTextMatchEnabled(fieldId int64) bool {
|
|||||||
func (helper *SchemaHelper) getDefaultJSONField(fieldName string) (*schemapb.FieldSchema, error) {
|
func (helper *SchemaHelper) getDefaultJSONField(fieldName string) (*schemapb.FieldSchema, error) {
|
||||||
for _, f := range helper.schema.GetFields() {
|
for _, f := range helper.schema.GetFields() {
|
||||||
if f.DataType == schemapb.DataType_JSON && f.IsDynamic {
|
if f.DataType == schemapb.DataType_JSON && f.IsDynamic {
|
||||||
if !helper.IsFieldLoaded(f.GetFieldID()) {
|
|
||||||
return nil, errors.Newf("field %s is dynamic but dynamic field is not loaded", fieldName)
|
|
||||||
}
|
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user