fix: add node id check to avoid double flush at most time (#41236)

issue: #41028

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-04-20 22:44:38 +08:00 committed by GitHub
parent ef4923e66b
commit c4a41cc32b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 111 additions and 1 deletions

View File

@ -17,6 +17,7 @@ import (
var StaticStreamingNodeManager = newStreamingNodeManager()
// TODO: can be removed after streaming service fully manage all growing data.
func newStreamingNodeManager() *StreamingNodeManager {
snm := &StreamingNodeManager{
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
@ -42,6 +43,21 @@ type StreamingNodeManager struct {
nodeChangedNotifier *syncutil.VersionedNotifier // used to notify that node in streaming node manager has been changed.
}
// GetLatestWALLocated returns the server id of the node that the wal of the vChannel is located.
// Return -1 and error if the vchannel is not found or context is canceled.
func (s *StreamingNodeManager) GetLatestWALLocated(ctx context.Context, vchannel string) (int64, error) {
pchannel := funcutil.ToPhysicalChannel(vchannel)
balancer, err := s.balancer.GetWithContext(ctx)
if err != nil {
return -1, err
}
serverID, ok := balancer.GetLatestWALLocated(ctx, pchannel)
if !ok {
return -1, errors.Errorf("channel: %s not found", vchannel)
}
return serverID, nil
}
// GetWALLocated returns the server id of the node that the wal of the vChannel is located.
func (s *StreamingNodeManager) GetWALLocated(vChannel string) int64 {
pchannel := funcutil.ToPhysicalChannel(vChannel)

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
@ -537,7 +538,20 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
// for compatibility issue , if len(channelName) not exist, skip the check
// Also avoid to handle segment not found error if not the owner of shard
if len(channelName) != 0 {
if !s.channelManager.Match(nodeID, channelName) {
// TODO: Current checker implementation is node id based, it cannot strictly promise when ABA channel assignment happens.
// Meanwhile the match operation is not protected with the global segment meta with the same lock,
// So the new recovery can happen with the old match operation concurrently, so it not safe enough to avoid double flush.
// Moreover, the Match operation may be called if the flusher is ready to work, but the channel manager on coord don't see the assignment success.
// So the match operation may be rejected and wait for retry.
// TODO: We need to make an idempotent operation to avoid the double flush strictly.
if streamingutil.IsStreamingServiceEnabled() {
targetID, err := snmanager.StaticStreamingNodeManager.GetLatestWALLocated(ctx, channelName)
if err != nil || targetID != nodeID {
err := merr.WrapErrChannelNotFound(channelName, fmt.Sprintf("for node %d", nodeID))
log.Warn("failed to get latest wall allocated", zap.Error(err))
return merr.Status(err), nil
}
} else if !s.channelManager.Match(nodeID, channelName) {
err := merr.WrapErrChannelNotFound(channelName, fmt.Sprintf("for node %d", nodeID))
log.Warn("node is not matched with channel", zap.String("channel", channelName), zap.Error(err))
return merr.Status(err), nil

View File

@ -56,6 +56,63 @@ func (_c *MockBalancer_Close_Call) RunAndReturn(run func()) *MockBalancer_Close_
return _c
}
// GetLatestWALLocated provides a mock function with given fields: ctx, pchannel
func (_m *MockBalancer) GetLatestWALLocated(ctx context.Context, pchannel string) (int64, bool) {
ret := _m.Called(ctx, pchannel)
if len(ret) == 0 {
panic("no return value specified for GetLatestWALLocated")
}
var r0 int64
var r1 bool
if rf, ok := ret.Get(0).(func(context.Context, string) (int64, bool)); ok {
return rf(ctx, pchannel)
}
if rf, ok := ret.Get(0).(func(context.Context, string) int64); ok {
r0 = rf(ctx, pchannel)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(context.Context, string) bool); ok {
r1 = rf(ctx, pchannel)
} else {
r1 = ret.Get(1).(bool)
}
return r0, r1
}
// MockBalancer_GetLatestWALLocated_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestWALLocated'
type MockBalancer_GetLatestWALLocated_Call struct {
*mock.Call
}
// GetLatestWALLocated is a helper method to define mock.On call
// - ctx context.Context
// - pchannel string
func (_e *MockBalancer_Expecter) GetLatestWALLocated(ctx interface{}, pchannel interface{}) *MockBalancer_GetLatestWALLocated_Call {
return &MockBalancer_GetLatestWALLocated_Call{Call: _e.mock.On("GetLatestWALLocated", ctx, pchannel)}
}
func (_c *MockBalancer_GetLatestWALLocated_Call) Run(run func(ctx context.Context, pchannel string)) *MockBalancer_GetLatestWALLocated_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *MockBalancer_GetLatestWALLocated_Call) Return(_a0 int64, _a1 bool) *MockBalancer_GetLatestWALLocated_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockBalancer_GetLatestWALLocated_Call) RunAndReturn(run func(context.Context, string) (int64, bool)) *MockBalancer_GetLatestWALLocated_Call {
_c.Call.Return(run)
return _c
}
// MarkAsUnavailable provides a mock function with given fields: ctx, pChannels
func (_m *MockBalancer) MarkAsUnavailable(ctx context.Context, pChannels []types.PChannelInfo) error {
ret := _m.Called(ctx, pChannels)

View File

@ -19,6 +19,9 @@ var (
// Balancer is a local component, it should promise all channel can be assigned, and reach the final consistency.
// Balancer should be thread safe.
type Balancer interface {
// GetLatestWALLocated returns the server id of the node that the wal of the vChannel is located.
GetLatestWALLocated(ctx context.Context, pchannel string) (int64, bool)
// WatchChannelAssignments watches the balance result.
WatchChannelAssignments(ctx context.Context, cb func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error) error

View File

@ -65,6 +65,11 @@ type balancerImpl struct {
backgroundTaskNotifier *syncutil.AsyncTaskNotifier[struct{}] // backgroundTaskNotifier is used to conmunicate with the background task.
}
// GetLatestWALLocated returns the server id of the node that the wal of the vChannel is located.
func (b *balancerImpl) GetLatestWALLocated(ctx context.Context, pchannel string) (int64, bool) {
return b.channelMetaManager.GetLatestWALLocated(ctx, pchannel)
}
// WatchChannelAssignments watches the balance result.
func (b *balancerImpl) WatchChannelAssignments(ctx context.Context, cb func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error) error {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {

View File

@ -196,6 +196,21 @@ func (cm *ChannelManager) updatePChannelMeta(ctx context.Context, pChannelMetas
return nil
}
// GetLatestWALLocated returns the server id of the node that the wal of the vChannel is located.
func (cm *ChannelManager) GetLatestWALLocated(ctx context.Context, pchannel string) (int64, bool) {
cm.cond.L.Lock()
defer cm.cond.L.Unlock()
pChannelMeta, ok := cm.channels[types.ChannelID{Name: pchannel}]
if !ok {
return 0, false
}
if pChannelMeta.IsAssigned() {
return pChannelMeta.CurrentServerID(), true
}
return 0, false
}
func (cm *ChannelManager) WatchAssignmentResult(ctx context.Context, cb func(version typeutil.VersionInt64Pair, assignments []types.PChannelInfoAssigned) error) error {
// push the first balance result to watcher callback function if balance result is ready.
version, err := cm.applyAssignments(cb)