From 98067f5fc62d3d0dded60341b191c7015b990756 Mon Sep 17 00:00:00 2001 From: Xianhui Lin <35839735+JsDove@users.noreply.github.com> Date: Thu, 12 Jun 2025 16:56:36 +0800 Subject: [PATCH] fix: datacoord stop get stuck After upgrading from 2.5 to 2.6 (#42674) datacoord stop get stuck After upgrading from 2.5 to 2.6 issue:https://github.com/milvus-io/milvus/issues/42656 Signed-off-by: Xianhui.Lin --- internal/datacoord/compaction_policy_single.go | 2 +- internal/datacoord/compaction_trigger.go | 2 +- internal/datacoord/garbage_collector.go | 13 ++++++++++--- internal/datacoord/handler.go | 2 +- internal/datacoord/util.go | 11 ++++++++--- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/internal/datacoord/compaction_policy_single.go b/internal/datacoord/compaction_policy_single.go index a3fa6852e2..eb9bc85039 100644 --- a/internal/datacoord/compaction_policy_single.go +++ b/internal/datacoord/compaction_policy_single.go @@ -98,7 +98,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context, views := make([]CompactionView, 0) for _, group := range partSegments { if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { - group.segments = FilterInIndexedSegments(policy.handler, policy.meta, false, group.segments...) + group.segments = FilterInIndexedSegments(ctx, policy.handler, policy.meta, false, group.segments...) } collectionTTL, err := getCollectionTTL(collection.Properties) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 714d96f251..5f1482d878 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -356,7 +356,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error { } if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { - group.segments = FilterInIndexedSegments(t.handler, t.meta, signal.isForce, group.segments...) + group.segments = FilterInIndexedSegments(context.Background(), t.handler, t.meta, signal.isForce, group.segments...) } coll, err := t.getCollection(group.collectionID) diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 9784cfa74b..82d05ab923 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -442,7 +442,10 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { droppedCompactTo[to] = struct{}{} } } - indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...) + indexedSegments := FilterInIndexedSegments(ctx, gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...) + if ctx.Err() != nil { + return + } indexedSet := make(typeutil.UniqueSet) for _, segment := range indexedSegments { indexedSet.Insert(segment.GetID()) @@ -525,9 +528,13 @@ func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) { _, ok := collectionID2GcStatus[collectionID] if !ok { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + if ctx.Err() != nil { + // process canceled, stop. + return + } + timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - has, err := gc.option.broker.HasCollection(ctx, collectionID) + has, err := gc.option.broker.HasCollection(timeoutCtx, collectionID) if err == nil && !has { collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1) } else { diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 1a8b71580d..0975488f36 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -142,7 +142,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName()) validSegmentInfos := make(map[int64]*SegmentInfo) - indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...) + indexedSegments := FilterInIndexedSegments(context.Background(), h, h.s.meta, false, segments...) indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...) for _, s := range segments { diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 713821fe2a..e6f08ebf8e 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -72,11 +72,15 @@ func VerifyResponse(response interface{}, err error) error { } } -func FilterInIndexedSegments(handler Handler, mt *meta, skipNoIndexCollection bool, segments ...*SegmentInfo) []*SegmentInfo { +func FilterInIndexedSegments(ctx context.Context, handler Handler, mt *meta, skipNoIndexCollection bool, segments ...*SegmentInfo) []*SegmentInfo { if len(segments) == 0 { return nil } + if ctx.Err() != nil { + return nil + } + collectionSegments := lo.GroupBy(segments, func(segment *SegmentInfo) int64 { return segment.GetCollectionID() }) @@ -89,8 +93,9 @@ func FilterInIndexedSegments(handler Handler, mt *meta, skipNoIndexCollection bo continue } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - coll, err := handler.GetCollection(ctx, collection) + timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2) + + coll, err := handler.GetCollection(timeoutCtx, collection) cancel() if err != nil { log.Warn("failed to get collection schema", zap.Error(err))