From cc3ecc4bd5f8f722c125675bcbd665eacc934d63 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 17 Jun 2022 16:02:12 +0800 Subject: [PATCH] Make querycoord channel allocator respect context (#17552) Signed-off-by: Congqi Xia --- internal/querycoord/channel_allocator.go | 12 +++++++--- internal/querycoord/channel_allocator_test.go | 23 +++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/internal/querycoord/channel_allocator.go b/internal/querycoord/channel_allocator.go index f5e91d58d0..48dc18947a 100644 --- a/internal/querycoord/channel_allocator.go +++ b/internal/querycoord/channel_allocator.go @@ -20,7 +20,6 @@ import ( "context" "errors" "sort" - "time" "go.uber.org/zap" @@ -61,7 +60,10 @@ func shuffleChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChan if !wait { return err } - time.Sleep(shuffleWaitInterval) + err = waitWithContext(ctx, shuffleWaitInterval) + if err != nil { + return err + } continue } @@ -100,6 +102,10 @@ func shuffleChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChan log.Error("shuffleChannelsToQueryNode failed", zap.Int64s("online nodeIDs", onlineNodeIDs), zap.Int64s("exclude nodeIDs", excludeNodeIDs), zap.Error(err)) return err } - time.Sleep(shuffleWaitInterval) + + err := waitWithContext(ctx, shuffleWaitInterval) + if err != nil { + return err + } } } diff --git a/internal/querycoord/channel_allocator_test.go b/internal/querycoord/channel_allocator_test.go index 675525e8e0..1904947701 100644 --- a/internal/querycoord/channel_allocator_test.go +++ b/internal/querycoord/channel_allocator_test.go @@ -18,6 +18,7 @@ package querycoord import ( "context" + "errors" "math/rand" "sync/atomic" "testing" @@ -95,6 +96,28 @@ func TestShuffleChannelsToQueryNode(t *testing.T) { assert.Equal(t, nodeID, firstReq.NodeID) assert.Equal(t, nodeID, secondReq.NodeID) + t.Run("shuffeChannelsToQueryNode no online node ctx done", func(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = shuffleChannelsToQueryNode(ctx, reqs, cluster, meta, true, []int64{nodeID}, nil, -1) + assert.Error(t, err) + + assert.True(t, errors.Is(err, context.Canceled)) + }) + + t.Run("shuffeChannelsToQueryNode no online node ctx done", func(t *testing.T) { + cluster.StopNode(nodeID) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = shuffleChannelsToQueryNode(ctx, reqs, cluster, meta, true, nil, nil, -1) + assert.Error(t, err) + + assert.True(t, errors.Is(err, context.Canceled)) + }) err = removeAllSession() assert.Nil(t, err)