diff --git a/internal/proxy/task_policies.go b/internal/proxy/task_policies.go index 0c4bece237..f420f1c145 100644 --- a/internal/proxy/task_policies.go +++ b/internal/proxy/task_policies.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "github.com/milvus-io/milvus/internal/log" @@ -31,6 +32,19 @@ func updateShardsWithRoundRobin(shardsLeaders map[string][]nodeInfo) { } } +// mergeErrSet merges all errors in ErrSet +func mergeErrSet(errSet map[string]error) error { + var builder strings.Builder + for channel, err := range errSet { + if err == nil { + continue + } + + builder.WriteString(fmt.Sprintf("Channel: %s returns err: %s", channel, err.Error())) + } + return errors.New(builder.String()) +} + // group dml shard leader with same nodeID func groupShardleadersWithSameQueryNode( ctx context.Context, @@ -43,8 +57,8 @@ func groupShardleadersWithSameQueryNode( log.Ctx(ctx).Warn("no shard leaders were available", zap.String("channel", dml), zap.String("leaders", fmt.Sprintf("%v", shard2leaders[dml]))) - if e, ok := errSet[dml]; ok { - return nil, nil, e // return last error recorded + if _, ok := errSet[dml]; ok { + return nil, nil, mergeErrSet(errSet) // return merged last error recorded } return nil, nil, fmt.Errorf("no available shard leader") } diff --git a/internal/proxy/task_policies_test.go b/internal/proxy/task_policies_test.go index 66c5bf0497..9bf120ac95 100644 --- a/internal/proxy/task_policies_test.go +++ b/internal/proxy/task_policies_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strings" "sync" "testing" @@ -109,7 +110,7 @@ func TestGroupShardLeadersWithSameQueryNode(t *testing.T) { nexts["c0"] = 3 _, _, err = groupShardleadersWithSameQueryNode(ctx, shard2leaders, nexts, errSet, mgr) - assert.Equal(t, err, errSet["c0"]) + assert.True(t, strings.Contains(err.Error(), errSet["c0"].Error())) nexts["c0"] = 2 nexts["c1"] = 3 @@ -155,7 +156,7 @@ func TestMergeRoundRobinPolicy(t *testing.T) { querier.failset[2] = mockerr querier.failset[3] = mockerr err = mergeRoundRobinPolicy(ctx, mgr, querier.query, shard2leaders) - assert.Equal(t, err, mockerr) + assert.True(t, strings.Contains(err.Error(), mockerr.Error())) } func mockQueryNodeCreator(ctx context.Context, address string) (types.QueryNode, error) { diff --git a/internal/proxy/task_query_test.go b/internal/proxy/task_query_test.go index fff72ae487..707c8bd657 100644 --- a/internal/proxy/task_query_test.go +++ b/internal/proxy/task_query_test.go @@ -3,6 +3,7 @@ package proxy import ( "context" "fmt" + "strings" "testing" "time" @@ -183,7 +184,8 @@ func TestQueryTask_all(t *testing.T) { ErrorCode: commonpb.ErrorCode_NotShardLeader, }, } - assert.Equal(t, task.Execute(ctx), errInvalidShardLeaders) + err = task.Execute(ctx) + assert.True(t, strings.Contains(err.Error(), errInvalidShardLeaders.Error())) qn.withQueryResult = &internalpb.RetrieveResults{ Status: &commonpb.Status{ diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index c7c6c81b61..5eb94c9f4a 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strconv" + "strings" "testing" "time" @@ -1771,7 +1772,8 @@ func TestSearchTask_ErrExecute(t *testing.T) { ErrorCode: commonpb.ErrorCode_NotShardLeader, }, } - assert.Equal(t, task.Execute(ctx), errInvalidShardLeaders) + err = task.Execute(ctx) + assert.True(t, strings.Contains(err.Error(), errInvalidShardLeaders.Error())) qn.withSearchResult = &internalpb.SearchResults{ Status: &commonpb.Status{