milvus/internal/proxy/task_policies.go
congqixia 6c34386ff2
enhance: extract shard client logic into dedicated package (#45018)
Related to #44761

Refactor proxy shard client management by creating a new
internal/proxy/shardclient package. This improves code organization and
modularity by:

- Moving load balancing logic (LookAsideBalancer, RoundRobinBalancer) to
shardclient package
- Extracting shard client manager and related interfaces into separate
package
- Relocating shard leader management and client lifecycle code
- Adding package documentation (README.md, OWNERS)
- Updating proxy code to use the new shardclient package interfaces

This change makes the shard client functionality more maintainable and
better encapsulated, reducing coupling in the proxy layer.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2025-10-22 10:22:04 +08:00

71 lines
2.0 KiB
Go

package proxy
/*
import (
"context"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus/internal/proxy/shardclient"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
// Legacy signature of pickShardPolicy, kept for reference only:
// type pickShardPolicy func(ctx context.Context, mgr shardClientMgr, query func(UniqueID, types.QueryNode) error, leaders []nodeInfo) error

// queryFunc is the callback a shard policy invokes for one node. RoundRobinPolicy
// calls it with the context, the target node ID, the client obtained from the
// shard client manager, and the channel name as the single variadic argument.
type queryFunc func(context.Context, UniqueID, types.QueryNodeClient, ...string) error

// pickShardPolicy is the function type of a shard selection policy. Its
// parameters are, in order: the context, the shard client manager, the per-node
// query callback, and a map whose keys RoundRobinPolicy treats as channel names
// and whose values are the candidate nodes for that channel.
type pickShardPolicy func(context.Context, shardclient.ShardClientMgr, queryFunc, map[string][]nodeInfo) error

// errInvalidShardLeaders is the sentinel error carrying the message
// "Invalid shard leader".
var errInvalidShardLeaders = errors.New("Invalid shard leader")
// RoundRobinPolicy runs query against every DML channel in dml2leaders,
// using one goroutine per channel.
//
// For each channel the candidate leaders are tried in slice order: a client is
// fetched from mgr and query is invoked with the node ID, that client and the
// channel name. The first leader that succeeds ends the attempt for its
// channel. A failure to get the client or to run the query is logged, folded
// into the channel's error with merr.Combine, and the next leader is tried.
//
// It returns nil only when every channel was served by some leader. A channel
// that has no candidate leaders fails with errInvalidShardLeaders instead of
// being reported as a success. Otherwise the result is whatever error the
// errgroup reports from Wait.
func RoundRobinPolicy(
	ctx context.Context,
	mgr shardclient.ShardClientMgr,
	query queryFunc,
	dml2leaders map[string][]nodeInfo,
) error {
	queryChannel := func(ctx context.Context, channel string) error {
		var combineErr error
		for _, target := range dml2leaders[channel] {
			qn, err := mgr.GetClient(ctx, target)
			if err != nil {
				log.Ctx(ctx).Warn("query channel failed, node not available", zap.String("channel", channel), zap.Int64("nodeID", target.nodeID), zap.Error(err))
				combineErr = merr.Combine(combineErr, err)
				continue
			}
			if err = query(ctx, target.nodeID, qn, channel); err != nil {
				log.Ctx(ctx).Warn("query channel failed", zap.String("channel", channel), zap.Int64("nodeID", target.nodeID), zap.Error(err))
				combineErr = merr.Combine(combineErr, err)
				continue
			}
			return nil
		}

		// Reaching this point means no leader served the channel. If no failure
		// was recorded either, there was no leader to try at all; report that
		// explicitly so the channel is not treated as successfully queried.
		if combineErr == nil {
			combineErr = errInvalidShardLeaders
		}
		log.Ctx(ctx).Error("failed to do query on all shard leader",
			zap.String("channel", channel), zap.Error(combineErr))
		return combineErr
	}

	// ctx is shadowed by the errgroup-derived context, which is the one handed
	// to every per-channel attempt below.
	wg, ctx := errgroup.WithContext(ctx)
	for channel := range dml2leaders {
		channel := channel // per-iteration copy for the closure (required before Go 1.22)
		wg.Go(func() error {
			return queryChannel(ctx, channel)
		})
	}
	return wg.Wait()
}
*/