milvus/internal/coordinator/snmanager/streaming_node_manager.go
Zhen Ye 78c479ce9b
fix: panic when streaming coord shutdown but query coord still work (#45696)
issue: #44984
pr: #45695

Signed-off-by: chyezh <chyezh@outlook.com>
2025-11-20 12:05:09 +08:00

200 lines
6.7 KiB
Go

package snmanager
import (
"context"
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/balance"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
var (
	// StaticStreamingNodeManager is the package-level StreamingNodeManager instance.
	// It is created by newStreamingNodeManager during package initialization.
	StaticStreamingNodeManager = newStreamingNodeManager()

	// ErrStreamingServiceNotReady is returned by CheckIfStreamingServiceReady when
	// the streaming-enabled notifier has not fired yet.
	ErrStreamingServiceNotReady = errors.New("streaming service is not ready, may be on-upgrading from old arch")
)
// newStreamingNodeManager builds a StreamingNodeManager with an empty assignment
// map and starts its execute loop in a new goroutine before returning it.
//
// TODO: can be removed after streaming service fully manages all growing data.
func newStreamingNodeManager() *StreamingNodeManager {
	manager := new(StreamingNodeManager)
	manager.notifier = syncutil.NewAsyncTaskNotifier[struct{}]()
	manager.cond = syncutil.NewContextCond(&sync.Mutex{})
	manager.latestAssignments = make(map[string]types.PChannelInfoAssigned)
	manager.nodeChangedNotifier = syncutil.NewVersionedNotifier()

	// The error returned by execute is intentionally not observed here.
	go manager.execute()
	return manager
}
// NewStreamingReadyNotifier creates a new streaming ready notifier backed by a
// fresh AsyncTaskNotifier.
func NewStreamingReadyNotifier() *StreamingReadyNotifier {
	readyNotifier := new(StreamingReadyNotifier)
	readyNotifier.inner = syncutil.NewAsyncTaskNotifier[struct{}]()
	return readyNotifier
}
// StreamingReadyNotifier is a notifier for streaming service readiness.
// RegisterStreamingEnabledListener hands its inner notifier to the balancer;
// Ready and IsReady then report on that notifier's context.
type StreamingReadyNotifier struct {
// inner is the underlying task notifier whose context backs Ready/IsReady
// and which Release finishes.
inner *syncutil.AsyncTaskNotifier[struct{}]
}
// Release releases the notifier by finishing the underlying task notifier.
// Callers that registered the notifier should call it once they no longer
// need readiness updates.
func (s *StreamingReadyNotifier) Release() {
	var done struct{}
	s.inner.Finish(done)
}
// Ready returns a channel that will be closed when the streaming service is ready.
// The channel is the Done channel of the underlying notifier's context.
func (s *StreamingReadyNotifier) Ready() <-chan struct{} {
	notifierCtx := s.inner.Context()
	return notifierCtx.Done()
}
// IsReady returns true if the streaming service is ready, i.e. the underlying
// notifier's context already reports a non-nil error.
func (s *StreamingReadyNotifier) IsReady() bool {
	if ctxErr := s.inner.Context().Err(); ctxErr != nil {
		return true
	}
	return false
}
// StreamingNodeManager manages the query nodes that are embedded into streaming nodes.
// StreamingNodeManager is exclusive with ResourceManager.
type StreamingNodeManager struct {
// notifier supplies the context used by the execute loop; Close cancels it
// and blocks until execute has finished.
notifier *syncutil.AsyncTaskNotifier[struct{}]
// cond's lock is held while latestAssignments is read or replaced, and cond
// is broadcast each time execute receives a new set of assignments.
cond *syncutil.ContextCond
latestAssignments map[string]types.PChannelInfoAssigned // The latest assignments received from the streaming coord balance module, keyed by channel name.
nodeChangedNotifier *syncutil.VersionedNotifier // Notified whenever execute installs a new set of assignments; see ListenNodeChanged.
previousNodeIDs typeutil.UniqueSet // Node ids from the last successful GetStreamingQueryNodeIDs call, returned as a fallback when the balancer query fails.
}
// GetBalancer returns the balancer of the streaming node manager.
// The balancer is obtained through balance.GetWithContext using a background
// context; if that call reports an error the method panics with it.
func (s *StreamingNodeManager) GetBalancer() balancer.Balancer {
	instance, err := balance.GetWithContext(context.Background())
	if err == nil {
		return instance
	}
	panic(err)
}
// AllocVirtualChannels allocates virtual channels for a collection.
// It obtains the balancer with ctx and forwards param to it unchanged; an
// error from obtaining the balancer is returned as-is with a nil slice.
func (s *StreamingNodeManager) AllocVirtualChannels(ctx context.Context, param balancer.AllocVChannelParam) ([]string, error) {
	// Named b rather than balancer so the imported package is not shadowed.
	b, getErr := balance.GetWithContext(ctx)
	if getErr != nil {
		return nil, getErr
	}
	return b.AllocVirtualChannels(ctx, param)
}
// GetLatestWALLocated returns the server id of the node that the wal of the vChannel is located.
// The vchannel is first mapped to its physical channel, which is then looked up
// in the balancer.
// Returns -1 and an error if the balancer cannot be obtained with ctx or the
// channel is not found.
func (s *StreamingNodeManager) GetLatestWALLocated(ctx context.Context, vchannel string) (int64, error) {
	physical := funcutil.ToPhysicalChannel(vchannel)
	b, getErr := balance.GetWithContext(ctx)
	if getErr != nil {
		return -1, getErr
	}
	if nodeID, found := b.GetLatestWALLocated(ctx, physical); found {
		return nodeID, nil
	}
	// The error reports the vchannel the caller asked for, not the derived pchannel.
	return -1, errors.Errorf("channel: %s not found", vchannel)
}
// CheckIfStreamingServiceReady checks if the streaming service is ready.
// It registers a temporary ready-notifier and inspects it immediately.
// Returns the registration error if registering fails,
// ErrStreamingServiceNotReady if the notifier has not fired, and nil otherwise.
func (s *StreamingNodeManager) CheckIfStreamingServiceReady(ctx context.Context) error {
	probe := NewStreamingReadyNotifier()
	regErr := s.RegisterStreamingEnabledListener(ctx, probe)
	if regErr != nil {
		return regErr
	}
	// The probe is only released once it has actually been registered.
	defer probe.Release()

	if probe.IsReady() {
		return nil
	}
	// The notifier is not canceled, so the streaming service is not ready.
	return ErrStreamingServiceNotReady
}
// RegisterStreamingEnabledListener registers the given notifier into the balancer.
// The balancer is obtained with ctx; if that fails the error is returned and
// nothing is registered.
func (s *StreamingNodeManager) RegisterStreamingEnabledListener(ctx context.Context, notifier *StreamingReadyNotifier) error {
	b, getErr := balance.GetWithContext(ctx)
	if getErr == nil {
		// The balancer receives the wrapped notifier, not the StreamingReadyNotifier itself.
		b.RegisterStreamingEnabledNotifier(notifier.inner)
		return nil
	}
	return getErr
}
// GetWALLocated returns the server id of the node that the wal of the vChannel is located.
// The lookup uses the locally cached assignments. The method does not return
// until an assignment for the channel's physical channel is present, because it
// waits with a background context.
func (s *StreamingNodeManager) GetWALLocated(vChannel string) int64 {
	physical := funcutil.ToPhysicalChannel(vChannel)

	s.cond.L.Lock()
	assigned, found := s.latestAssignments[physical]
	for !found {
		// Re-check after every wake-up; execute broadcasts cond whenever it
		// installs a new assignment map.
		s.cond.Wait(context.Background())
		assigned, found = s.latestAssignments[physical]
	}
	// Read the server id before releasing the lock.
	serverID := assigned.Node.ServerID
	s.cond.L.Unlock()
	return serverID
}
// GetStreamingQueryNodeIDs returns the server ids of the streaming query nodes.
//
// The ids are fetched from the balancer on every call. When that query fails
// (for example while the streaming coord is shutting down), the ids from the
// last successful call are returned instead; if no call has succeeded yet, an
// empty non-nil set is returned so callers never receive a nil set.
//
// It panics if the balancer itself cannot be obtained.
func (s *StreamingNodeManager) GetStreamingQueryNodeIDs() typeutil.UniqueSet {
	b, err := balance.GetWithContext(context.Background())
	if err != nil {
		panic(err)
	}
	streamingNodes, err := b.GetAllStreamingNodes(context.Background())
	if err != nil {
		// when the streaming coord is on shutdown, the balancer will return an error,
		// causing panic, so we need to return the previous node ids.
		s.cond.L.Lock()
		previous := s.previousNodeIDs
		s.cond.L.Unlock()
		if previous == nil {
			// No successful query has happened yet; hand out a usable empty set.
			return typeutil.NewUniqueSet()
		}
		return previous
	}

	streamingNodeIDs := typeutil.NewUniqueSet()
	for _, streamingNode := range streamingNodes {
		streamingNodeIDs.Insert(streamingNode.ServerID)
	}

	// previousNodeIDs is shared by every caller of the manager, so the cached
	// snapshot is only touched while holding the manager's lock. The lock is
	// never held across the balancer calls above.
	s.cond.L.Lock()
	s.previousNodeIDs = streamingNodeIDs
	s.cond.L.Unlock()
	return streamingNodeIDs
}
// ListenNodeChanged returns a listener for node changed event.
// The listener is created with syncutil.VersionedListenAtEarliest.
func (s *StreamingNodeManager) ListenNodeChanged() *syncutil.VersionedListener {
	listener := s.nodeChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
	return listener
}
// execute is the background loop started by newStreamingNodeManager.
//
// It obtains the balancer using the notifier's context and then repeatedly
// calls WatchChannelAssignments. Every callback invocation replaces
// latestAssignments under the cond lock, wakes goroutines waiting on cond and
// notifies node-change listeners.
//
// It returns when the balancer cannot be obtained (error wrapped) or when a
// watch call returns an error (error returned as-is). The notifier is always
// marked finished on return, which unblocks Close.
func (s *StreamingNodeManager) execute() (err error) {
	defer s.notifier.Finish(struct{}{})

	b, err := balance.GetWithContext(s.notifier.Context())
	if err != nil {
		return errors.Wrap(err, "failed to wait balancer ready")
	}

	// onAssignments swaps in the freshly reported channel-to-node relations.
	onAssignments := func(param balancer.WatchChannelAssignmentsCallbackParam) error {
		// LockAndBroadcast leaves cond.L held; it is released at the end of the callback.
		s.cond.LockAndBroadcast()
		s.latestAssignments = make(map[string]types.PChannelInfoAssigned)
		for _, assigned := range param.Relations {
			s.latestAssignments[assigned.Channel.Name] = assigned
		}
		s.nodeChangedNotifier.NotifyAll()
		log.Info("streaming node manager updated", zap.Any("assignments", s.latestAssignments))
		s.cond.L.Unlock()
		return nil
	}

	// A watch call that returns nil is simply restarted.
	for {
		watchErr := b.WatchChannelAssignments(s.notifier.Context(), onAssignments)
		if watchErr != nil {
			return watchErr
		}
	}
}
// Close stops the manager's background execute loop and waits for it to exit.
func (s *StreamingNodeManager) Close() {
	// Cancel the context execute runs under ...
	s.notifier.Cancel()
	// ... then block until execute has called Finish on the notifier.
	s.notifier.BlockUntilFinish()
}