mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: datacoord stop get stuck After upgrading from 2.5 to 2.6 (#42674)
datacoord stop get stuck After upgrading from 2.5 to 2.6 issue:https://github.com/milvus-io/milvus/issues/42656 Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com>
This commit is contained in:
parent
911a8df17c
commit
98067f5fc6
@ -98,7 +98,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
|
||||
views := make([]CompactionView, 0)
|
||||
for _, group := range partSegments {
|
||||
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
|
||||
group.segments = FilterInIndexedSegments(policy.handler, policy.meta, false, group.segments...)
|
||||
group.segments = FilterInIndexedSegments(ctx, policy.handler, policy.meta, false, group.segments...)
|
||||
}
|
||||
|
||||
collectionTTL, err := getCollectionTTL(collection.Properties)
|
||||
|
||||
@ -356,7 +356,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
|
||||
}
|
||||
|
||||
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
|
||||
group.segments = FilterInIndexedSegments(t.handler, t.meta, signal.isForce, group.segments...)
|
||||
group.segments = FilterInIndexedSegments(context.Background(), t.handler, t.meta, signal.isForce, group.segments...)
|
||||
}
|
||||
|
||||
coll, err := t.getCollection(group.collectionID)
|
||||
|
||||
@ -442,7 +442,10 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
|
||||
droppedCompactTo[to] = struct{}{}
|
||||
}
|
||||
}
|
||||
indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...)
|
||||
indexedSegments := FilterInIndexedSegments(ctx, gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...)
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
indexedSet := make(typeutil.UniqueSet)
|
||||
for _, segment := range indexedSegments {
|
||||
indexedSet.Insert(segment.GetID())
|
||||
@ -525,9 +528,13 @@ func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
|
||||
|
||||
_, ok := collectionID2GcStatus[collectionID]
|
||||
if !ok {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
if ctx.Err() != nil {
|
||||
// process canceled, stop.
|
||||
return
|
||||
}
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
has, err := gc.option.broker.HasCollection(ctx, collectionID)
|
||||
has, err := gc.option.broker.HasCollection(timeoutCtx, collectionID)
|
||||
if err == nil && !has {
|
||||
collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1)
|
||||
} else {
|
||||
|
||||
@ -142,7 +142,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
|
||||
segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())
|
||||
|
||||
validSegmentInfos := make(map[int64]*SegmentInfo)
|
||||
indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...)
|
||||
indexedSegments := FilterInIndexedSegments(context.Background(), h, h.s.meta, false, segments...)
|
||||
indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)
|
||||
|
||||
for _, s := range segments {
|
||||
|
||||
@ -72,11 +72,15 @@ func VerifyResponse(response interface{}, err error) error {
|
||||
}
|
||||
}
|
||||
|
||||
func FilterInIndexedSegments(handler Handler, mt *meta, skipNoIndexCollection bool, segments ...*SegmentInfo) []*SegmentInfo {
|
||||
func FilterInIndexedSegments(ctx context.Context, handler Handler, mt *meta, skipNoIndexCollection bool, segments ...*SegmentInfo) []*SegmentInfo {
|
||||
if len(segments) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
collectionSegments := lo.GroupBy(segments, func(segment *SegmentInfo) int64 {
|
||||
return segment.GetCollectionID()
|
||||
})
|
||||
@ -89,8 +93,9 @@ func FilterInIndexedSegments(handler Handler, mt *meta, skipNoIndexCollection bo
|
||||
continue
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
|
||||
coll, err := handler.GetCollection(ctx, collection)
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2)
|
||||
|
||||
coll, err := handler.GetCollection(timeoutCtx, collection)
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Warn("failed to get collection schema", zap.Error(err))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user