mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: make the watch dm channel request better compatibility (#30952)
issue: #30938 Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
dd957cf9e3
commit
ee8d6f236c
@ -397,7 +397,9 @@ func (s *Server) startQueryCoord() error {
|
||||
return err
|
||||
}
|
||||
for _, node := range sessions {
|
||||
s.nodeMgr.Add(session.NewNodeInfo(node.ServerID, node.Address))
|
||||
n := session.NewNodeInfo(node.ServerID, node.Address)
|
||||
n.SetVersion(node.Version)
|
||||
s.nodeMgr.Add(n)
|
||||
s.taskScheduler.AddExecutor(node.ServerID)
|
||||
|
||||
if node.Stopping {
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/blang/semver/v4"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
@ -108,6 +109,7 @@ type NodeInfo struct {
|
||||
addr string
|
||||
state State
|
||||
lastHeartbeat *atomic.Int64
|
||||
version semver.Version
|
||||
}
|
||||
|
||||
func (n *NodeInfo) ID() int64 {
|
||||
@ -158,6 +160,14 @@ func (n *NodeInfo) UpdateStats(opts ...StatsOption) {
|
||||
n.mu.Unlock()
|
||||
}
|
||||
|
||||
func (n *NodeInfo) SetVersion(v semver.Version) {
|
||||
n.version = v
|
||||
}
|
||||
|
||||
func (n *NodeInfo) Version() semver.Version {
|
||||
return n.version
|
||||
}
|
||||
|
||||
func NewNodeInfo(id int64, addr string) *NodeInfo {
|
||||
return &NodeInfo{
|
||||
stats: newStats(),
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/blang/semver/v4"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
@ -43,6 +44,13 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
// segmentsVersion is used for the flushed segments should not be included in the watch dm channel request
|
||||
var segmentsVersion = semver.Version{
|
||||
Major: 2,
|
||||
Minor: 3,
|
||||
Patch: 4,
|
||||
}
|
||||
|
||||
type Executor struct {
|
||||
doneCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
@ -329,7 +337,7 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error {
|
||||
dmChannel,
|
||||
indexInfo,
|
||||
)
|
||||
err = fillSubChannelRequest(ctx, req, ex.broker)
|
||||
err = fillSubChannelRequest(ctx, req, ex.broker, ex.shouldIncludeFlushedSegmentInfo(action.Node()))
|
||||
if err != nil {
|
||||
log.Warn("failed to subscribe channel, failed to fill the request with segments",
|
||||
zap.Error(err))
|
||||
@ -356,6 +364,14 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ex *Executor) shouldIncludeFlushedSegmentInfo(nodeID int64) bool {
|
||||
node := ex.nodeMgr.Get(nodeID)
|
||||
if node == nil {
|
||||
return false
|
||||
}
|
||||
return node.Version().LT(segmentsVersion)
|
||||
}
|
||||
|
||||
func (ex *Executor) unsubscribeChannel(task *ChannelTask, step int) error {
|
||||
defer ex.removeTask(task, step)
|
||||
startTs := time.Now()
|
||||
|
||||
@ -195,9 +195,13 @@ func fillSubChannelRequest(
|
||||
ctx context.Context,
|
||||
req *querypb.WatchDmChannelsRequest,
|
||||
broker meta.Broker,
|
||||
includeFlushed bool,
|
||||
) error {
|
||||
segmentIDs := typeutil.NewUniqueSet()
|
||||
for _, vchannel := range req.GetInfos() {
|
||||
if includeFlushed {
|
||||
segmentIDs.Insert(vchannel.GetFlushedSegmentIds()...)
|
||||
}
|
||||
segmentIDs.Insert(vchannel.GetUnflushedSegmentIds()...)
|
||||
segmentIDs.Insert(vchannel.GetLevelZeroSegmentIds()...)
|
||||
}
|
||||
|
||||
@ -19,13 +19,11 @@ package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
@ -66,17 +64,11 @@ func (w *remoteWorker) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
|
||||
zap.Int64("workerID", req.GetDstNodeID()),
|
||||
)
|
||||
status, err := w.client.LoadSegments(ctx, req)
|
||||
if err != nil {
|
||||
if err = merr.CheckRPCCall(status, err); err != nil {
|
||||
log.Warn("failed to call LoadSegments via grpc worker",
|
||||
zap.Error(err),
|
||||
)
|
||||
return err
|
||||
} else if status.GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
log.Warn("failed to call LoadSegments, worker return error",
|
||||
zap.String("errorCode", status.GetErrorCode().String()),
|
||||
zap.String("reason", status.GetReason()),
|
||||
)
|
||||
return fmt.Errorf(status.Reason)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -86,17 +78,11 @@ func (w *remoteWorker) ReleaseSegments(ctx context.Context, req *querypb.Release
|
||||
zap.Int64("workerID", req.GetNodeID()),
|
||||
)
|
||||
status, err := w.client.ReleaseSegments(ctx, req)
|
||||
if err != nil {
|
||||
if err = merr.CheckRPCCall(status, err); err != nil {
|
||||
log.Warn("failed to call ReleaseSegments via grpc worker",
|
||||
zap.Error(err),
|
||||
)
|
||||
return err
|
||||
} else if status.GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
log.Warn("failed to call ReleaseSegments, worker return error",
|
||||
zap.String("errorCode", status.GetErrorCode().String()),
|
||||
zap.String("reason", status.GetReason()),
|
||||
)
|
||||
return fmt.Errorf(status.Reason)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user