diff --git a/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go b/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go
index 69ae6db46c..fc5b8c5bf9 100644
--- a/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go
+++ b/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go
@@ -3,6 +3,7 @@ package broadcaster
 import (
 	"context"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/cenkalti/backoff/v4"
@@ -20,7 +21,8 @@ func newAckCallbackScheduler(logger *log.MLogger) *ackCallbackScheduler {
 		notifier:           syncutil.NewAsyncTaskNotifier[struct{}](),
 		pending:            make(chan *broadcastTask, 16),
 		triggerChan:        make(chan struct{}, 1),
-		rkLocker:           newResourceKeyLocker(newBroadcasterMetrics()),
+		rkLockerMu:         sync.Mutex{},
+		rkLocker:           newResourceKeyLocker(),
 		tombstoneScheduler: newTombstoneScheduler(logger),
 	}
 	s.SetLogger(logger)
@@ -41,6 +43,13 @@ type ackCallbackScheduler struct {
 	// Meanwhile the timetick order of any vchannel of those two tasks are same with the order of broadcastID,
 	// so the smaller broadcastID task is always acked before the larger broadcastID task.
 	// so we can exeucte the tasks by the order of the broadcastID to promise the ack order is same with wal order.
+	rkLockerMu sync.Mutex // rkLockerMu serializes the batch lock operations on rkLocker.
+	// Without it the following ordering break is possible:
+	// 1. Tasks A, B, C compete for rkLocker and should be executed in the order A -> B -> C.
+	// 2. A is running, while B and C are waiting for the lock.
+	// 3. In triggerAckCallback, B fails to acquire the lock while C is about to call FastLock.
+	// 4. A finishes and releases the lock, C acquires it and executes its ack callback, so the order becomes A -> C -> B.
+	// Holding rkLockerMu around every batch lock operation prevents this reordering.
 	rkLocker *resourceKeyLocker // it is used to lock the resource-key of ack operation.
 	// it is not same instance with the resourceKeyLocker in the broadcastTaskManager.
 	// because it is just used to check if the resource-key is locked when acked.
@@ -116,6 +125,9 @@ func (s *ackCallbackScheduler) addBroadcastTask(task *broadcastTask) error {
 
 // triggerAckCallback triggers the ack callback.
 func (s *ackCallbackScheduler) triggerAckCallback() {
+	s.rkLockerMu.Lock()
+	defer s.rkLockerMu.Unlock()
+
 	pendingTasks := make([]*broadcastTask, 0, len(s.pendingAckedTasks))
 	for _, task := range s.pendingAckedTasks {
 		g, err := s.rkLocker.FastLock(task.Header().ResourceKeys.Collect()...)
@@ -134,7 +146,10 @@ func (s *ackCallbackScheduler) triggerAckCallback() {
 func (s *ackCallbackScheduler) doAckCallback(bt *broadcastTask, g *lockGuards) (err error) {
 	logger := s.Logger().With(zap.Uint64("broadcastID", bt.Header().BroadcastID))
 	defer func() {
+		s.rkLockerMu.Lock()
 		g.Unlock()
+		s.rkLockerMu.Unlock()
+
 		s.triggerChan <- struct{}{}
 		if err == nil {
 			logger.Info("execute ack callback done")
diff --git a/internal/streamingcoord/server/broadcaster/broadcast_manager.go b/internal/streamingcoord/server/broadcaster/broadcast_manager.go
index 6545930b8e..fc5745ae5b 100644
--- a/internal/streamingcoord/server/broadcaster/broadcast_manager.go
+++ b/internal/streamingcoord/server/broadcaster/broadcast_manager.go
@@ -35,7 +35,7 @@ func RecoverBroadcaster(ctx context.Context) (Broadcaster, error) {
 func newBroadcastTaskManager(protos []*streamingpb.BroadcastTask) *broadcastTaskManager {
 	logger := resource.Resource().Logger().With(log.FieldComponent("broadcaster"))
 	metrics := newBroadcasterMetrics()
-	rkLocker := newResourceKeyLocker(metrics)
+	rkLocker := newResourceKeyLocker()
 	ackScheduler := newAckCallbackScheduler(logger)
 
 	recoveryTasks := make([]*broadcastTask, 0, len(protos))
diff --git a/internal/streamingcoord/server/broadcaster/broadcast_task.go b/internal/streamingcoord/server/broadcaster/broadcast_task.go
index c1836adec3..44a374a587 100644
--- a/internal/streamingcoord/server/broadcaster/broadcast_task.go
+++ b/internal/streamingcoord/server/broadcaster/broadcast_task.go
@@ -29,9 +29,10 @@ func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadc
 		ackCallbackScheduler: ackCallbackScheduler,
 		done:                 make(chan struct{}),
 		allAcked:             make(chan struct{}),
+		allAckedClosed:       false,
 	}
 	if isAllDone(bt.task) {
-		close(bt.allAcked)
+		bt.closeAllAcked()
 	}
 	if proto.State == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_TOMBSTONE {
 		close(bt.done)
@@ -58,6 +59,7 @@ func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, m
 		ackCallbackScheduler: ackCallbackScheduler,
 		done:                 make(chan struct{}),
 		allAcked:             make(chan struct{}),
+		allAckedClosed:       false,
 	}
 	return bt
 }
@@ -83,6 +85,7 @@ type broadcastTask struct {
 	dirty    bool // a flag to indicate that the task has been modified and needs to be saved into the recovery info.
 	done     chan struct{}
 	allAcked chan struct{}
+	allAckedClosed bool
 	guards   *lockGuards
 	ackCallbackScheduler *ackCallbackScheduler
 	joinAckCallbackScheduled bool // a flag to indicate that the join ack callback is scheduled.
@@ -248,11 +251,20 @@ func (b *broadcastTask) ack(ctx context.Context, msgs ...message.ImmutableMessag
 		b.joinAckCallbackScheduled = true
 	}
 	if allDone {
-		close(b.allAcked)
+		b.closeAllAcked()
 	}
 	return nil
 }
 
+// closeAllAcked closes the allAcked channel.
+func (b *broadcastTask) closeAllAcked() {
+	if b.allAckedClosed {
+		return
+	}
+	close(b.allAcked)
+	b.allAckedClosed = true
+}
+
 // hasControlChannel checks if the control channel is broadcasted.
 // for the operation since 2.6.5, the control channel is always broadcasted.
 // so it's just a dummy function for compatibility.
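Note on the locking change above: the hazard described in the new rkLockerMu comment is that two batch FastLock attempts may interleave, letting a task with a larger broadcastID overtake a smaller one as soon as the earlier holder unlocks. Below is a minimal, self-contained Go sketch of that idea; the names (keyLocker, scheduler, tryAck, finish) are illustrative stand-ins rather than the actual broadcaster types, and the real scheduler retries pending tasks via triggerChan instead of the manual retry shown here.

package main

import (
	"fmt"
	"sync"
)

// keyLocker is a toy stand-in for resourceKeyLocker: tryLockAll fails fast instead of blocking.
type keyLocker struct {
	mu   sync.Mutex
	held map[string]bool
}

// tryLockAll takes every key or none: it checks all keys first, so a conflict leaves nothing half-locked.
func (l *keyLocker) tryLockAll(keys ...string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, k := range keys {
		if l.held[k] {
			return false
		}
	}
	for _, k := range keys {
		l.held[k] = true
	}
	return true
}

func (l *keyLocker) unlockAll(keys ...string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, k := range keys {
		delete(l.held, k)
	}
}

// scheduler serializes whole batch-lock attempts, mirroring rkLockerMu in the patch:
// while one attempt (or the matching unlock) is in flight, no other attempt can interleave,
// so pending tasks are always tried in broadcastID order.
type scheduler struct {
	batchMu sync.Mutex
	locker  *keyLocker
}

func (s *scheduler) tryAck(id int, keys ...string) bool {
	s.batchMu.Lock()
	defer s.batchMu.Unlock()
	if !s.locker.tryLockAll(keys...) {
		fmt.Printf("task %d stays pending\n", id)
		return false
	}
	fmt.Printf("task %d acquired %v\n", id, keys)
	return true
}

func (s *scheduler) finish(id int, keys ...string) {
	s.batchMu.Lock()
	s.locker.unlockAll(keys...)
	s.batchMu.Unlock()
	// in the real scheduler this is where triggerChan is signalled so pending tasks are retried in order
}

func main() {
	s := &scheduler{locker: &keyLocker{held: map[string]bool{}}}
	s.tryAck(1, "collection/a") // task 1 wins the resource key
	s.tryAck(2, "collection/a") // task 2 fails fast and stays pending
	s.finish(1, "collection/a") // task 1 done; pending tasks are retried before any newer one
	s.tryAck(2, "collection/a") // task 2 now succeeds, preserving the A -> B order
}

Holding batchMu across the whole batch attempt, and again around the unlock that re-arms the scheduler, mirrors how the patch wraps triggerAckCallback and the deferred g.Unlock() in doAckCallback.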
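The new closeAllAcked helper guards against closing allAcked twice, which would panic when a recovered task is already fully acked and a later ack observes allDone again. A tiny standalone sketch of the same guard, assuming callers are already serialized by the task's own locking (without that assumption, sync.Once would be the usual choice):

package main

import "fmt"

// task is a toy stand-in for broadcastTask, keeping only the pieces relevant to the guard.
type task struct {
	allAcked       chan struct{}
	allAckedClosed bool
}

// closeAllAcked closes allAcked exactly once; a second call becomes a no-op instead of a panic.
func (t *task) closeAllAcked() {
	if t.allAckedClosed {
		return
	}
	close(t.allAcked)
	t.allAckedClosed = true
}

func main() {
	t := &task{allAcked: make(chan struct{})}
	t.closeAllAcked() // first close, e.g. when a recovered task is already fully acked
	t.closeAllAcked() // second close, e.g. when ack later sees allDone; safe with the guard
	_, stillOpen := <-t.allAcked
	fmt.Println("allAcked closed:", !stillOpen)
}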
diff --git a/internal/streamingcoord/server/broadcaster/resource_key_locker.go b/internal/streamingcoord/server/broadcaster/resource_key_locker.go
index 5b40450621..31ab556117 100644
--- a/internal/streamingcoord/server/broadcaster/resource_key_locker.go
+++ b/internal/streamingcoord/server/broadcaster/resource_key_locker.go
@@ -15,7 +15,7 @@ import (
 var errFastLockFailed = errors.New("fast lock failed")
 
 // newResourceKeyLocker creates a new resource key locker.
-func newResourceKeyLocker(metrics *broadcasterMetrics) *resourceKeyLocker {
+func newResourceKeyLocker() *resourceKeyLocker {
 	return &resourceKeyLocker{
 		inner: lock.NewKeyLock[resourceLockKey](),
 	}
diff --git a/internal/streamingcoord/server/broadcaster/resource_key_locker_test.go b/internal/streamingcoord/server/broadcaster/resource_key_locker_test.go
index a1f2f8188d..0ec8de6f07 100644
--- a/internal/streamingcoord/server/broadcaster/resource_key_locker_test.go
+++ b/internal/streamingcoord/server/broadcaster/resource_key_locker_test.go
@@ -13,7 +13,7 @@ import (
 
 func TestResourceKeyLocker(t *testing.T) {
 	t.Run("concurrent lock/unlock", func(t *testing.T) {
-		locker := newResourceKeyLocker(newBroadcasterMetrics())
+		locker := newResourceKeyLocker()
 		const numGoroutines = 10
 		const numKeys = 5
 		const numIterations = 100
@@ -70,7 +70,7 @@ func TestResourceKeyLocker(t *testing.T) {
 	})
 
 	t.Run("deadlock prevention", func(t *testing.T) {
-		locker := newResourceKeyLocker(newBroadcasterMetrics())
+		locker := newResourceKeyLocker()
 		key1 := message.NewCollectionNameResourceKey("test_collection_1")
 		key2 := message.NewCollectionNameResourceKey("test_collection_2")
 
@@ -108,7 +108,7 @@ func TestResourceKeyLocker(t *testing.T) {
 	})
 
 	t.Run("fast lock", func(t *testing.T) {
-		locker := newResourceKeyLocker(newBroadcasterMetrics())
+		locker := newResourceKeyLocker()
 		key := message.NewCollectionNameResourceKey("test_collection")
 
 		// First fast lock should succeed
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index c2989e5539..7e7a7bcdfe 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -6378,10 +6378,10 @@ too few tombstones may lead to ABA issues in the state of milvus cluster.`,
 	p.WALBroadcasterTombstoneMaxCount = ParamItem{
 		Key:     "streaming.walBroadcaster.tombstone.maxCount",
 		Version: "2.6.0",
-		Doc: `The max count of tombstone, 256 by default.
+		Doc: `The max count of tombstone, 8192 by default.
 Tombstone is used to reject duplicate submissions of DDL messages,
 too few tombstones may lead to ABA issues in the state of milvus cluster.`,
-		DefaultValue: "256",
+		DefaultValue: "8192",
 		Export:       false,
 	}
 	p.WALBroadcasterTombstoneMaxCount.Init(base.mgr)
@@ -6389,10 +6389,10 @@ too few tombstones may lead to ABA issues in the state of milvus cluster.`,
 	p.WALBroadcasterTombstoneMaxLifetime = ParamItem{
 		Key:     "streaming.walBroadcaster.tombstone.maxLifetime",
 		Version: "2.6.0",
-		Doc: `The max lifetime of tombstone, 30m by default.
+		Doc: `The max lifetime of tombstone, 24h by default.
 Tombstone is used to reject duplicate submissions of DDL messages,
 too few tombstones may lead to ABA issues in the state of milvus cluster.`,
-		DefaultValue: "30m",
+		DefaultValue: "24h",
 		Export:       false,
 	}
 	p.WALBroadcasterTombstoneMaxLifetime.Init(base.mgr)
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 50de206b53..1fca1a5eda 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -669,8 +669,8 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, 30*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse())
 		assert.Equal(t, 4.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
 		assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALBroadcasterTombstoneCheckInternal.GetAsDurationByParse())
-		assert.Equal(t, 256, params.StreamingCfg.WALBroadcasterTombstoneMaxCount.GetAsInt())
-		assert.Equal(t, 30*time.Minute, params.StreamingCfg.WALBroadcasterTombstoneMaxLifetime.GetAsDurationByParse())
+		assert.Equal(t, 8192, params.StreamingCfg.WALBroadcasterTombstoneMaxCount.GetAsInt())
+		assert.Equal(t, 24*time.Hour, params.StreamingCfg.WALBroadcasterTombstoneMaxLifetime.GetAsDurationByParse())
 		assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
 		assert.Equal(t, 30*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())
 		assert.Equal(t, int64(64*1024*1024), params.StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize())
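For context on the raised tombstone defaults: as the Doc strings say, tombstones exist to reject duplicate submissions of DDL messages, so evicting them too aggressively (too few entries or too short a lifetime) reopens the ABA window. The toy sketch below illustrates how a count plus lifetime bound limits the window in which a duplicate broadcastID is still rejected; it is only an illustration under assumed names and an assumed eviction policy, not the broadcaster's actual tombstone scheduler.

package main

import (
	"fmt"
	"time"
)

// tombstoneSet is a toy stand-in for the broadcaster's tombstone bookkeeping:
// it remembers recently completed broadcast IDs so duplicate submissions can be rejected.
type tombstoneSet struct {
	maxCount    int
	maxLifetime time.Duration
	order       []uint64             // insertion order, oldest first
	createdAt   map[uint64]time.Time // broadcastID -> creation time
}

func newTombstoneSet(maxCount int, maxLifetime time.Duration) *tombstoneSet {
	return &tombstoneSet{maxCount: maxCount, maxLifetime: maxLifetime, createdAt: make(map[uint64]time.Time)}
}

// add records a finished broadcast and evicts tombstones that exceed the count or lifetime bound.
func (t *tombstoneSet) add(id uint64, now time.Time) {
	t.order = append(t.order, id)
	t.createdAt[id] = now
	for len(t.order) > 0 &&
		(len(t.order) > t.maxCount || now.Sub(t.createdAt[t.order[0]]) > t.maxLifetime) {
		delete(t.createdAt, t.order[0])
		t.order = t.order[1:]
	}
}

// isDuplicate reports whether a broadcast ID is still covered by a tombstone.
func (t *tombstoneSet) isDuplicate(id uint64) bool {
	_, ok := t.createdAt[id]
	return ok
}

func main() {
	// The patched defaults: up to 8192 tombstones kept for up to 24h.
	ts := newTombstoneSet(8192, 24*time.Hour)
	now := time.Now()

	ts.add(1001, now)
	fmt.Println(ts.isDuplicate(1001)) // true: a re-submission of broadcast 1001 is rejected
	fmt.Println(ts.isDuplicate(1002)) // false: an unseen ID is allowed to proceed

	// After the lifetime passes, the tombstone for 1001 is evicted and the same ID would be
	// accepted again; that is the ABA window a short lifetime leaves open.
	ts.add(2002, now.Add(25*time.Hour))
	fmt.Println(ts.isDuplicate(1001)) // false after eviction
}

Seen through this lens, bumping the defaults from 256 entries and 30 minutes to 8192 entries and 24 hours simply widens the window in which a duplicate DDL submission is still recognized and rejected.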