fix: Check field mmap property before apply collection level one (#43090)

Related to #43089

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-07-03 14:30:44 +08:00 committed by GitHub
parent 6e38e9d18f
commit 1fae5230fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 12 deletions

View File

@ -420,6 +420,7 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error {
task,
action,
collectionInfo.GetSchema(),
collectionInfo.GetProperties(),
loadMeta,
dmChannel,
indexInfo,

View File

@ -102,7 +102,7 @@ func GetTaskType(task Task) Type {
return 0
}
func mergeCollectonProps(schemaProps []*commonpb.KeyValuePair, collectionProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
func mergeCollectionProps(schemaProps []*commonpb.KeyValuePair, collectionProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
// Merge the collectionProps and schemaProps maps, giving priority to the values in schemaProps if there are duplicate keys.
props := make(map[string]string)
for _, p := range collectionProps {
@ -142,18 +142,8 @@ func packLoadSegmentRequest(
if task.Source() == utils.LeaderChecker {
loadScope = querypb.LoadScope_Delta
}
// field mmap enabled if collection-level mmap enabled or the field mmap enabled
collectionMmapEnabled, exist := common.IsMmapDataEnabled(collectionProperties...)
for _, field := range schema.GetFields() {
if exist {
field.TypeParams = append(field.TypeParams, &commonpb.KeyValuePair{
Key: common.MmapEnabledKey,
Value: strconv.FormatBool(collectionMmapEnabled),
})
}
}
schema.Properties = mergeCollectonProps(schema.Properties, collectionProperties)
schema = applyCollectionMmapSetting(schema, collectionProperties)
return &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(
@ -206,12 +196,14 @@ func packSubChannelRequest(
task *ChannelTask,
action Action,
schema *schemapb.CollectionSchema,
collectionProperties []*commonpb.KeyValuePair,
loadMeta *querypb.LoadMetaInfo,
channel *meta.DmChannel,
indexInfo []*indexpb.IndexInfo,
partitions []int64,
targetVersion int64,
) *querypb.WatchDmChannelsRequest {
schema = applyCollectionMmapSetting(schema, collectionProperties)
return &querypb.WatchDmChannelsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_WatchDmChannels),
@ -272,3 +264,23 @@ func packUnsubDmChannelRequest(task *ChannelTask, action Action) *querypb.UnsubD
ChannelName: task.Channel(),
}
}
func applyCollectionMmapSetting(schema *schemapb.CollectionSchema,
collectionProperties []*commonpb.KeyValuePair,
) *schemapb.CollectionSchema {
schema = typeutil.Clone(schema)
schema.Properties = mergeCollectionProps(schema.Properties, collectionProperties)
// field mmap enabled if collection-level mmap enabled or the field mmap enabled
collectionMmapEnabled, exist := common.IsMmapDataEnabled(collectionProperties...)
for _, field := range schema.GetFields() {
if exist &&
// field-level mmap setting has higher priority than collection-level mmap setting, skip if field-level mmap enabled
!common.FieldHasMmapKey(schema, field.GetFieldID()) {
field.TypeParams = append(field.TypeParams, &commonpb.KeyValuePair{
Key: common.MmapEnabledKey,
Value: strconv.FormatBool(collectionMmapEnabled),
})
}
}
return schema
}