fix: panic when double close channel of ack broadcast (#45661)

issue: #45635

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-11-19 14:25:05 +08:00 committed by GitHub
parent a3b8bcb198
commit c8073eb90b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 41 additions and 14 deletions

View File

@ -3,6 +3,7 @@ package broadcaster
import (
"context"
"sort"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
@ -20,7 +21,8 @@ func newAckCallbackScheduler(logger *log.MLogger) *ackCallbackScheduler {
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
pending: make(chan *broadcastTask, 16),
triggerChan: make(chan struct{}, 1),
rkLocker: newResourceKeyLocker(newBroadcasterMetrics()),
rkLockerMu: sync.Mutex{},
rkLocker: newResourceKeyLocker(),
tombstoneScheduler: newTombstoneScheduler(logger),
}
s.SetLogger(logger)
@ -41,6 +43,13 @@ type ackCallbackScheduler struct {
// Meanwhile the timetick order of any vchannel of those two tasks are same with the order of broadcastID,
// so the smaller broadcastID task is always acked before the larger broadcastID task.
// so we can exeucte the tasks by the order of the broadcastID to promise the ack order is same with wal order.
rkLockerMu sync.Mutex // because batch lock operation will be executed on rkLocker,
// so we may encounter following cases:
// 1. task A, B, C are competing with rkLocker, and we want the operation is executed in order of A -> B -> C.
// 2. A is on running, and B, C are waiting for the lock.
// 3. When triggerAckCallback, B is failed to acquire the lock, C is pending to call FastLock.
// 4. Then A is done, the lock is released, C acquires the lock and executes the ack callback, the order is broken as A -> C -> B.
// To avoid the order broken, we need to use a mutex to protect the batch lock operation.
rkLocker *resourceKeyLocker // it is used to lock the resource-key of ack operation.
// it is not same instance with the resourceKeyLocker in the broadcastTaskManager.
// because it is just used to check if the resource-key is locked when acked.
@ -116,6 +125,9 @@ func (s *ackCallbackScheduler) addBroadcastTask(task *broadcastTask) error {
// triggerAckCallback triggers the ack callback.
func (s *ackCallbackScheduler) triggerAckCallback() {
s.rkLockerMu.Lock()
defer s.rkLockerMu.Unlock()
pendingTasks := make([]*broadcastTask, 0, len(s.pendingAckedTasks))
for _, task := range s.pendingAckedTasks {
g, err := s.rkLocker.FastLock(task.Header().ResourceKeys.Collect()...)
@ -134,7 +146,10 @@ func (s *ackCallbackScheduler) triggerAckCallback() {
func (s *ackCallbackScheduler) doAckCallback(bt *broadcastTask, g *lockGuards) (err error) {
logger := s.Logger().With(zap.Uint64("broadcastID", bt.Header().BroadcastID))
defer func() {
s.rkLockerMu.Lock()
g.Unlock()
s.rkLockerMu.Unlock()
s.triggerChan <- struct{}{}
if err == nil {
logger.Info("execute ack callback done")

View File

@ -35,7 +35,7 @@ func RecoverBroadcaster(ctx context.Context) (Broadcaster, error) {
func newBroadcastTaskManager(protos []*streamingpb.BroadcastTask) *broadcastTaskManager {
logger := resource.Resource().Logger().With(log.FieldComponent("broadcaster"))
metrics := newBroadcasterMetrics()
rkLocker := newResourceKeyLocker(metrics)
rkLocker := newResourceKeyLocker()
ackScheduler := newAckCallbackScheduler(logger)
recoveryTasks := make([]*broadcastTask, 0, len(protos))

View File

@ -29,9 +29,10 @@ func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadc
ackCallbackScheduler: ackCallbackScheduler,
done: make(chan struct{}),
allAcked: make(chan struct{}),
allAckedClosed: false,
}
if isAllDone(bt.task) {
close(bt.allAcked)
bt.closeAllAcked()
}
if proto.State == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_TOMBSTONE {
close(bt.done)
@ -58,6 +59,7 @@ func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, m
ackCallbackScheduler: ackCallbackScheduler,
done: make(chan struct{}),
allAcked: make(chan struct{}),
allAckedClosed: false,
}
return bt
}
@ -83,6 +85,7 @@ type broadcastTask struct {
dirty bool // a flag to indicate that the task has been modified and needs to be saved into the recovery info.
done chan struct{}
allAcked chan struct{}
allAckedClosed bool
guards *lockGuards
ackCallbackScheduler *ackCallbackScheduler
joinAckCallbackScheduled bool // a flag to indicate that the join ack callback is scheduled.
@ -248,11 +251,20 @@ func (b *broadcastTask) ack(ctx context.Context, msgs ...message.ImmutableMessag
b.joinAckCallbackScheduled = true
}
if allDone {
close(b.allAcked)
b.closeAllAcked()
}
return nil
}
// closeAllAcked closes the allAcked channel.
func (b *broadcastTask) closeAllAcked() {
if b.allAckedClosed {
return
}
close(b.allAcked)
b.allAckedClosed = true
}
// hasControlChannel checks if the control channel is broadcasted.
// for the operation since 2.6.5, the control channel is always broadcasted.
// so it's just a dummy function for compatibility.

View File

@ -15,7 +15,7 @@ import (
var errFastLockFailed = errors.New("fast lock failed")
// newResourceKeyLocker creates a new resource key locker.
func newResourceKeyLocker(metrics *broadcasterMetrics) *resourceKeyLocker {
func newResourceKeyLocker() *resourceKeyLocker {
return &resourceKeyLocker{
inner: lock.NewKeyLock[resourceLockKey](),
}

View File

@ -13,7 +13,7 @@ import (
func TestResourceKeyLocker(t *testing.T) {
t.Run("concurrent lock/unlock", func(t *testing.T) {
locker := newResourceKeyLocker(newBroadcasterMetrics())
locker := newResourceKeyLocker()
const numGoroutines = 10
const numKeys = 5
const numIterations = 100
@ -70,7 +70,7 @@ func TestResourceKeyLocker(t *testing.T) {
})
t.Run("deadlock prevention", func(t *testing.T) {
locker := newResourceKeyLocker(newBroadcasterMetrics())
locker := newResourceKeyLocker()
key1 := message.NewCollectionNameResourceKey("test_collection_1")
key2 := message.NewCollectionNameResourceKey("test_collection_2")
@ -108,7 +108,7 @@ func TestResourceKeyLocker(t *testing.T) {
})
t.Run("fast lock", func(t *testing.T) {
locker := newResourceKeyLocker(newBroadcasterMetrics())
locker := newResourceKeyLocker()
key := message.NewCollectionNameResourceKey("test_collection")
// First fast lock should succeed

View File

@ -6378,10 +6378,10 @@ too few tombstones may lead to ABA issues in the state of milvus cluster.`,
p.WALBroadcasterTombstoneMaxCount = ParamItem{
Key: "streaming.walBroadcaster.tombstone.maxCount",
Version: "2.6.0",
Doc: `The max count of tombstone, 256 by default.
Doc: `The max count of tombstone, 8192 by default.
Tombstone is used to reject duplicate submissions of DDL messages,
too few tombstones may lead to ABA issues in the state of milvus cluster.`,
DefaultValue: "256",
DefaultValue: "8192",
Export: false,
}
p.WALBroadcasterTombstoneMaxCount.Init(base.mgr)
@ -6389,10 +6389,10 @@ too few tombstones may lead to ABA issues in the state of milvus cluster.`,
p.WALBroadcasterTombstoneMaxLifetime = ParamItem{
Key: "streaming.walBroadcaster.tombstone.maxLifetime",
Version: "2.6.0",
Doc: `The max lifetime of tombstone, 30m by default.
Doc: `The max lifetime of tombstone, 24h by default.
Tombstone is used to reject duplicate submissions of DDL messages,
too few tombstones may lead to ABA issues in the state of milvus cluster.`,
DefaultValue: "30m",
DefaultValue: "24h",
Export: false,
}
p.WALBroadcasterTombstoneMaxLifetime.Init(base.mgr)

View File

@ -669,8 +669,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 30*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse())
assert.Equal(t, 4.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALBroadcasterTombstoneCheckInternal.GetAsDurationByParse())
assert.Equal(t, 256, params.StreamingCfg.WALBroadcasterTombstoneMaxCount.GetAsInt())
assert.Equal(t, 30*time.Minute, params.StreamingCfg.WALBroadcasterTombstoneMaxLifetime.GetAsDurationByParse())
assert.Equal(t, 8192, params.StreamingCfg.WALBroadcasterTombstoneMaxCount.GetAsInt())
assert.Equal(t, 24*time.Hour, params.StreamingCfg.WALBroadcasterTombstoneMaxLifetime.GetAsDurationByParse())
assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
assert.Equal(t, 30*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())
assert.Equal(t, int64(64*1024*1024), params.StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize())