feat: Auto add namespace field data if namespace is enabled (#44933)

issue: #44011

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
Author: Bingyi Sun, 2025-10-24 18:40:05 +08:00 (committed by GitHub)
parent b069eeecd2
commit 58277c8eb0
7 changed files with 1882 additions and 1780 deletions


@@ -55,15 +55,22 @@ func (policy *clusteringCompactionPolicy) Trigger(ctx context.Context) (map[Comp
events := make(map[CompactionTriggerType][]CompactionView, 0)
views := make([]CompactionView, 0)
partitionKeySortViews := make([]CompactionView, 0)
for _, collection := range collections {
collectionViews, _, err := policy.triggerOneCollection(ctx, collection.ID, false)
if err != nil {
// do not return this error: a single failing collection should not fail the whole trigger
log.Warn("fail to trigger collection clustering compaction", zap.Int64("collectionID", collection.ID), zap.Error(err))
}
views = append(views, collectionViews...)
isPartitionKeySorted := IsPartitionKeySortCompactionEnabled(collection.Properties)
if isPartitionKeySorted {
partitionKeySortViews = append(partitionKeySortViews, collectionViews...)
} else {
views = append(views, collectionViews...)
}
}
events[TriggerTypeClustering] = views
events[TriggerTypeClusteringPartitionKeySort] = partitionKeySortViews
return events, nil
}
@@ -120,12 +127,14 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
}
partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
isPartitionKeySorted := IsPartitionKeySortCompactionEnabled(collection.Properties)
return isSegmentHealthy(segment) &&
isFlushed(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
!segment.GetIsInvisible()
!segment.GetIsInvisible() &&
(!isPartitionKeySorted || segment.IsPartitionKeySorted)
}))
views := make([]CompactionView, 0)
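Read plainly, the amended filter above admits only healthy, flushed, visible, non-L0 segments that are neither compacting nor importing, and, when partition-key sort compaction is enabled for the collection, it additionally requires the segment to already be partition-key sorted. A minimal restatement of that predicate over plain flags (the helper below is illustrative only, not part of the codebase):

// clusteringCandidate restates the SegmentFilterFunc above over plain booleans.
func clusteringCandidate(healthy, flushed, compacting, importing, isL0, invisible,
    partitionKeySortEnabled, partitionKeySorted bool) bool {
    return healthy && flushed &&
        !compacting && !importing &&
        !isL0 && !invisible &&
        // when partition-key sort compaction is on, only segments already
        // sorted by partition key are eligible for clustering here
        (!partitionKeySortEnabled || partitionKeySorted)
}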


@@ -25,8 +25,10 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
// singleCompactionPolicy is to compact one segment with too many delta logs
@@ -52,6 +54,7 @@ func (policy *singleCompactionPolicy) Trigger(ctx context.Context) (map[Compacti
events := make(map[CompactionTriggerType][]CompactionView, 0)
views := make([]CompactionView, 0)
sortViews := make([]CompactionView, 0)
partitionKeySortViews := make([]CompactionView, 0)
for _, collection := range collections {
collectionViews, collectionSortViews, _, err := policy.triggerOneCollection(ctx, collection.ID, false)
if err != nil {
@@ -59,10 +62,15 @@ func (policy *singleCompactionPolicy) Trigger(ctx context.Context) (map[Compacti
log.Warn("fail to trigger single compaction", zap.Int64("collectionID", collection.ID), zap.Error(err))
}
views = append(views, collectionViews...)
sortViews = append(sortViews, collectionSortViews...)
if IsPartitionKeySortCompactionEnabled(collection.Properties) {
partitionKeySortViews = append(partitionKeySortViews, collectionSortViews...)
} else {
sortViews = append(sortViews, collectionSortViews...)
}
}
events[TriggerTypeSingle] = views
events[TriggerTypeSort] = sortViews
events[TriggerTypePartitionKeySort] = partitionKeySortViews
return events, nil
}
@@ -80,16 +88,6 @@ func (policy *singleCompactionPolicy) triggerSegmentSortCompaction(
log.Warn("fail to apply triggerSegmentSortCompaction, segment not healthy")
return nil
}
if !canTriggerSortCompaction(segment) {
log.Warn("fail to apply triggerSegmentSortCompaction",
zap.String("state", segment.GetState().String()),
zap.String("level", segment.GetLevel().String()),
zap.Bool("isSorted", segment.GetIsSorted()),
zap.Bool("isImporting", segment.GetIsImporting()),
zap.Bool("isCompacting", segment.isCompacting),
zap.Bool("isInvisible", segment.GetIsInvisible()))
return nil
}
collection, err := policy.handler.GetCollection(ctx, segment.GetCollectionID())
if err != nil {
@@ -101,6 +99,18 @@ func (policy *singleCompactionPolicy) triggerSegmentSortCompaction(
log.Warn("fail to apply triggerSegmentSortCompaction, collection not exist")
return nil
}
isPartitionIsolationEnabled := IsPartitionKeySortCompactionEnabled(collection.Properties)
if !canTriggerSortCompaction(segment, isPartitionIsolationEnabled) {
log.Warn("fail to apply triggerSegmentSortCompaction",
zap.String("state", segment.GetState().String()),
zap.String("level", segment.GetLevel().String()),
zap.Bool("isSorted", segment.GetIsSorted()),
zap.Bool("isImporting", segment.GetIsImporting()),
zap.Bool("isCompacting", segment.isCompacting),
zap.Bool("isInvisible", segment.GetIsInvisible()))
return nil
}
collectionTTL, err := getCollectionTTL(collection.Properties)
if err != nil {
log.Warn("failed to apply triggerSegmentSortCompaction, get collection ttl failed")
@@ -126,6 +136,11 @@ func (policy *singleCompactionPolicy) triggerSegmentSortCompaction(
return view
}
func IsPartitionKeySortCompactionEnabled(properties map[string]string) bool {
iso, _ := common.IsPartitionKeyIsolationPropEnabled(properties)
return Params.CommonCfg.EnableNamespace.GetAsBool() && iso
}
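IsPartitionKeySortCompactionEnabled gates the new paths on two switches at once: the global namespace config and the collection-level partition-key isolation property. A hedged test sketch, assumed to live in the datacoord test package with the usual testing/testify/paramtable imports, and assuming the isolation property key is the literal "partitionkey.isolation" checked by common.IsPartitionKeyIsolationPropEnabled:

func TestIsPartitionKeySortCompactionEnabled_Sketch(t *testing.T) {
    paramtable.Init()

    // Namespace switch off: always false, even if the collection enables isolation.
    paramtable.Get().Save(Params.CommonCfg.EnableNamespace.Key, "false")
    assert.False(t, IsPartitionKeySortCompactionEnabled(map[string]string{"partitionkey.isolation": "true"}))

    // Namespace switch on, but no isolation property on the collection: still false.
    paramtable.Get().Save(Params.CommonCfg.EnableNamespace.Key, "true")
    defer paramtable.Get().Reset(Params.CommonCfg.EnableNamespace.Key)
    assert.False(t, IsPartitionKeySortCompactionEnabled(nil))

    // Both switches on: partition-key sort compaction applies.
    assert.True(t, IsPartitionKeySortCompactionEnabled(map[string]string{"partitionkey.isolation": "true"}))
}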
func (policy *singleCompactionPolicy) triggerSortCompaction(
ctx context.Context,
triggerID int64,
@@ -139,9 +154,20 @@ func (policy *singleCompactionPolicy) triggerSortCompaction(
}
views := make([]CompactionView, 0)
collection, err := policy.handler.GetCollection(ctx, collectionID)
if err != nil {
log.Warn("fail to apply triggerSegmentSortCompaction, unable to get collection from handler",
zap.Error(err))
return nil, err
}
if collection == nil {
log.Warn("fail to apply triggerSegmentSortCompaction, collection not exist")
return nil, merr.WrapErrCollectionNotFound(collectionID)
}
isPartitionIsolationEnabled := IsPartitionKeySortCompactionEnabled(collection.Properties)
triggerableSegments := policy.meta.SelectSegments(ctx, WithCollection(collectionID),
SegmentFilterFunc(func(seg *SegmentInfo) bool {
return canTriggerSortCompaction(seg)
return canTriggerSortCompaction(seg, isPartitionIsolationEnabled)
}))
if len(triggerableSegments) == 0 {
log.RatedInfo(20, "no triggerable segments")


@@ -814,10 +814,10 @@ func getExpandedSize(size int64) int64 {
return int64(float64(size) * Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat())
}
func canTriggerSortCompaction(segment *SegmentInfo) bool {
func canTriggerSortCompaction(segment *SegmentInfo, isPartitionIsolationEnabled bool) bool {
return segment.GetState() == commonpb.SegmentState_Flushed &&
segment.GetLevel() != datapb.SegmentLevel_L0 &&
!segment.GetIsSorted() &&
(!segment.GetIsSorted() || (isPartitionIsolationEnabled && !segment.GetIsPartitionKeySorted())) &&
!segment.GetIsImporting() &&
!segment.isCompacting
}
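The extra parameter changes when a segment is re-triggered: a segment that is already sorted by primary key, and would previously be skipped, becomes triggerable again if partition-key isolation is enabled and it has not yet been partition-key sorted. A hedged example, assumed to live in the datacoord test package (imports fmt plus the generated commonpb/datapb packages):

func Example_canTriggerSortCompaction() {
    seg := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
        State:                commonpb.SegmentState_Flushed,
        Level:                datapb.SegmentLevel_L1,
        IsSorted:             true,
        IsPartitionKeySorted: false,
    }}
    fmt.Println(canTriggerSortCompaction(seg, false)) // already PK-sorted, isolation off
    fmt.Println(canTriggerSortCompaction(seg, true))  // isolation on: still needs a partition-key sort pass
    // Output:
    // false
    // true
}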


@@ -43,6 +43,8 @@ const (
TriggerTypeClustering
TriggerTypeSingle
TriggerTypeSort
TriggerTypePartitionKeySort
TriggerTypeClusteringPartitionKeySort
)
func (t CompactionTriggerType) String() string {
@@ -61,6 +63,10 @@ func (t CompactionTriggerType) String() string {
return "Single"
case TriggerTypeSort:
return "Sort"
case TriggerTypePartitionKeySort:
return "PartitionKeySort"
case TriggerTypeClusteringPartitionKeySort:
return "ClusteringPartitionKeySort"
default:
return ""
}
@@ -289,7 +295,17 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) {
log.Warn("segment no need to do sort compaction", zap.Int64("segmentID", segID))
continue
}
m.notify(ctx, TriggerTypeSort, []CompactionView{view})
segment := m.meta.GetSegment(ctx, segID)
if segment == nil {
log.Warn("segment not found", zap.Int64("segmentID", segID))
continue
}
collection := m.meta.GetCollection(segment.GetCollectionID())
if !IsPartitionKeySortCompactionEnabled(collection.Properties) {
m.notify(ctx, TriggerTypeSort, []CompactionView{view})
} else {
m.notify(ctx, TriggerTypePartitionKeySort, []CompactionView{view})
}
}
}
}
@@ -358,6 +374,10 @@ func (m *CompactionTriggerManager) notify(ctx context.Context, eventType Compact
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_MixCompaction)
case TriggerTypeSort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_SortCompaction)
case TriggerTypePartitionKeySort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_PartitionKeySortCompaction)
case TriggerTypeClusteringPartitionKeySort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_ClusteringPartitionKeySortCompaction)
}
}
}
@@ -527,13 +547,17 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
log.Warn("pre-allocate result segments failed", zap.String("view", view.String()), zap.Error(err))
return
}
typ := datapb.CompactionType_ClusteringCompaction
if IsPartitionKeySortCompactionEnabled(collection.Properties) {
typ = datapb.CompactionType_MixCompaction
}
task := &datapb.CompactionTask{
PlanID: taskID,
TriggerID: view.(*ClusteringSegmentsView).triggerID,
State: datapb.CompactionTaskState_pipelining,
StartTime: time.Now().Unix(),
CollectionTtl: view.(*ClusteringSegmentsView).collectionTTL.Nanoseconds(),
Type: datapb.CompactionType_ClusteringCompaction,
Type: typ,
CollectionID: view.GetGroupLabel().CollectionID,
PartitionID: view.GetGroupLabel().PartitionID,
Channel: view.GetGroupLabel().Channel,


@@ -250,6 +250,24 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl
compactionParams,
[]int64{pk.GetFieldID()},
)
case datapb.CompactionType_PartitionKeySortCompaction:
if req.GetPreAllocatedSegmentIDs() == nil || req.GetPreAllocatedSegmentIDs().GetBegin() == 0 {
return merr.Status(merr.WrapErrParameterInvalidMsg("invalid pre-allocated segmentID range")), nil
}
pk, err := typeutil.GetPrimaryFieldSchema(req.GetSchema())
if err != nil {
return merr.Status(err), err
}
partitionkey, err := typeutil.GetPartitionKeyFieldSchema(req.GetSchema())
if err != nil {
return merr.Status(err), err
}
task = compactor.NewSortCompactionTask(
taskCtx,
binlogIO,
req,
compactionParams,
[]int64{partitionkey.GetFieldID(), pk.GetFieldID()},
)
case datapb.CompactionType_ClusteringPartitionKeySortCompaction:
//TODO
default:
log.Warn("Unknown compaction type", zap.String("type", req.GetType().String()))
return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil


@@ -417,6 +417,7 @@ message SegmentInfo {
// A segment generated by datacoord of old arch, will be false.
// After the growing segment is full managed by streamingnode, the true value can never be seen at coordinator.
bool is_created_by_streaming = 30;
bool is_partition_key_sorted = 31;
}
message SegmentStartPosition {
@@ -628,6 +629,8 @@ enum CompactionType {
Level0DeleteCompaction = 7;
ClusteringCompaction = 8;
SortCompaction = 9;
PartitionKeySortCompaction = 10;
ClusteringPartitionKeySortCompaction = 11;
}
message CompactionStateRequest {

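For reference, the new proto field and enum values surface in Go through the generated datapb package. A small, hypothetical consumer might look like the sketch below; describeCompaction is illustrative only and not part of this commit:

func describeCompaction(task *datapb.CompactionTask, seg *datapb.SegmentInfo) string {
    switch task.GetType() {
    case datapb.CompactionType_PartitionKeySortCompaction:
        // Sorts a segment by (partition key, primary key); the resulting segment
        // is expected to carry is_partition_key_sorted = true.
        if seg.GetIsPartitionKeySorted() {
            return "already partition-key sorted"
        }
        return "needs partition-key sort"
    case datapb.CompactionType_ClusteringPartitionKeySortCompaction:
        // Clustering counterpart; still a TODO on the datanode side in this commit.
        return "clustering partition-key sort"
    default:
        return task.GetType().String()
    }
}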
File diff suppressed because it is too large.