From b562f8e64490fce03f808b889781bbd3933adde5 Mon Sep 17 00:00:00 2001 From: SimFG Date: Fri, 21 Feb 2025 10:45:53 +0800 Subject: [PATCH] fix: add filter to exclude L0 import jobs in compaction trigger (#40045) - issue: #39849 Signed-off-by: SimFG --- internal/datacoord/compaction_trigger_v2.go | 9 ++++++++- internal/datacoord/import_job.go | 7 +++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 4ebf3cbc76..c59bf27384 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -380,7 +380,11 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, collection *collectionInfo, view CompactionView) error { // add l0 import task for the collection if the collection is importing - importJobs := m.imeta.GetJobBy(ctx, WithCollectionID(collection.ID), WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed)) + importJobs := m.imeta.GetJobBy(ctx, + WithCollectionID(collection.ID), + WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed), + WithoutL0Job(), + ) if len(importJobs) > 0 { partitionID := view.GetGroupLabel().PartitionID var ( @@ -404,6 +408,9 @@ func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, } } } + if len(importPaths) == 0 { + return nil + } for i, job := range importJobs { newTasks, err := NewImportTasks([][]*datapb.ImportFileStats{ diff --git a/internal/datacoord/import_job.go b/internal/datacoord/import_job.go index 20d697345e..c94141617f 100644 --- a/internal/datacoord/import_job.go +++ b/internal/datacoord/import_job.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/internalpb" @@ -61,6 +62,12 @@ func WithoutJobStates(states ...internalpb.ImportJobState) ImportJobFilter { } } +func WithoutL0Job() ImportJobFilter { + return func(job ImportJob) bool { + return !importutilv2.IsL0Import(job.GetOptions()) + } +} + type UpdateJobAction func(job ImportJob) func UpdateJobState(state internalpb.ImportJobState) UpdateJobAction {