mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Make querycoord channel allocator respect context (#17552)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
0e29c37499
commit
cc3ecc4bd5
@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -61,7 +60,10 @@ func shuffleChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChan
|
||||
if !wait {
|
||||
return err
|
||||
}
|
||||
time.Sleep(shuffleWaitInterval)
|
||||
err = waitWithContext(ctx, shuffleWaitInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
@ -100,6 +102,10 @@ func shuffleChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChan
|
||||
log.Error("shuffleChannelsToQueryNode failed", zap.Int64s("online nodeIDs", onlineNodeIDs), zap.Int64s("exclude nodeIDs", excludeNodeIDs), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
time.Sleep(shuffleWaitInterval)
|
||||
|
||||
err := waitWithContext(ctx, shuffleWaitInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ package querycoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -95,6 +96,28 @@ func TestShuffleChannelsToQueryNode(t *testing.T) {
|
||||
|
||||
assert.Equal(t, nodeID, firstReq.NodeID)
|
||||
assert.Equal(t, nodeID, secondReq.NodeID)
|
||||
t.Run("shuffeChannelsToQueryNode no online node ctx done", func(t *testing.T) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
err = shuffleChannelsToQueryNode(ctx, reqs, cluster, meta, true, []int64{nodeID}, nil, -1)
|
||||
assert.Error(t, err)
|
||||
|
||||
assert.True(t, errors.Is(err, context.Canceled))
|
||||
})
|
||||
|
||||
t.Run("shuffeChannelsToQueryNode no online node ctx done", func(t *testing.T) {
|
||||
cluster.StopNode(nodeID)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
err = shuffleChannelsToQueryNode(ctx, reqs, cluster, meta, true, nil, nil, -1)
|
||||
assert.Error(t, err)
|
||||
|
||||
assert.True(t, errors.Is(err, context.Canceled))
|
||||
})
|
||||
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user