From 4ae44f1aa9f6754089f1bd5c6e640b4c6bcfa623 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Thu, 9 Oct 2025 19:31:58 +0800
Subject: [PATCH] fix: Fix CDC OOM due to high buffer size (#44607)

Fix CDC OOM by:
1. Freeing the message buffer manually.
2. Limiting the max message buffer size.
3. Reducing the scanner message handler buffer size.

issue: https://github.com/milvus-io/milvus/issues/44123
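
For illustration, the core idea behind fixes 1 and 2 is a queue that
blocks producers on either a message-count cap or a byte cap, and that
zeroes out the dropped slice prefix so the GC can actually reclaim
confirmed messages. A minimal, self-contained sketch (illustration
only: `sizer`, `boundedQueue`, and `fakeMsg` are hypothetical stand-ins;
the real msgQueue below stores message.ImmutableMessage and blocks on
sync.Cond):

    package main

    import "fmt"

    type sizer interface{ EstimateSize() int }

    type boundedQueue struct {
        buf      []sizer
        bufBytes int // running total of EstimateSize() over buf
        cap      int // max number of buffered messages
        maxSize  int // max total bytes of buffered messages
    }

    // full reports whether an enqueue should block: either the
    // count cap or the byte cap has been reached.
    func (q *boundedQueue) full() bool {
        return len(q.buf) >= q.cap || q.bufBytes >= q.maxSize
    }

    func (q *boundedQueue) enqueue(m sizer) {
        q.buf = append(q.buf, m)
        q.bufBytes += m.EstimateSize()
    }

    // cleanup drops the first n messages, subtracting their sizes and
    // nil-ing the slice prefix; without the nil-ing, the reused
    // backing array would keep the dropped messages reachable.
    func (q *boundedQueue) cleanup(n int) {
        for i := 0; i < n; i++ {
            q.bufBytes -= q.buf[i].EstimateSize()
            q.buf[i] = nil
        }
        q.buf = q.buf[n:]
    }

    type fakeMsg int

    func (m fakeMsg) EstimateSize() int { return int(m) }

    func main() {
        q := &boundedQueue{cap: 128, maxSize: 256}
        q.enqueue(fakeMsg(200))
        fmt.Println(q.full()) // false: 1 < 128 and 200 < 256
        q.enqueue(fakeMsg(100))
        fmt.Println(q.full()) // true: 300 >= 256, writers would block
        q.cleanup(1)
        fmt.Println(q.full()) // false: confirmed bytes released
    }

In the actual patch, Enqueue waits in a loop while either limit is hit
and bails out when the context is done, and CleanupConfirmedMessages
performs the size accounting and reference zeroing.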
Signed-off-by: bigsheeper
---
 .../replicatemanager/channel_replicator.go    |   4 +-
 .../replication/replicatestream/msg_queue.go  |  32 +++-
 .../replicatestream/msg_queue_test.go         | 138 ++++++++++++++++--
 .../replicate_stream_client_impl.go           |  13 +-
 4 files changed, 158 insertions(+), 29 deletions(-)

diff --git a/internal/cdc/replication/replicatemanager/channel_replicator.go b/internal/cdc/replication/replicatemanager/channel_replicator.go
index 4a0387d799..36c06e1027 100644
--- a/internal/cdc/replication/replicatemanager/channel_replicator.go
+++ b/internal/cdc/replication/replicatemanager/channel_replicator.go
@@ -37,8 +37,6 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
-const scannerHandlerChanSize = 64
-
 // Replicator is the client that replicates the message to the channel in the target cluster.
 type Replicator interface {
     // StartReplicate starts the replicate for the channel.
@@ -112,7 +110,7 @@ func (r *channelReplicator) replicateLoop() error {
         if err != nil {
             return err
         }
-        ch := make(adaptor.ChanMessageHandler, scannerHandlerChanSize)
+        ch := make(adaptor.ChanMessageHandler)
         scanner := streaming.WAL().Read(r.ctx, streaming.ReadOption{
             PChannel:      r.replicateInfo.GetSourceChannelName(),
             DeliverPolicy: options.DeliverPolicyStartFrom(cp.MessageID),
diff --git a/internal/cdc/replication/replicatestream/msg_queue.go b/internal/cdc/replication/replicatestream/msg_queue.go
index 786ba47412..9d99c77193 100644
--- a/internal/cdc/replication/replicatestream/msg_queue.go
+++ b/internal/cdc/replication/replicatestream/msg_queue.go
@@ -62,18 +62,28 @@
     notEmpty *sync.Cond
     notFull  *sync.Cond
 
-    buf     []message.ImmutableMessage
-    readIdx int
-    cap     int
-    closed  bool
+    buf      []message.ImmutableMessage
+    bufBytes int
+    readIdx  int
+    cap      int
+    maxSize  int
+    closed   bool
+}
+
+type MsgQueueOptions struct {
+    Capacity int
+    MaxSize  int
 }
 
 // NewMsgQueue creates a queue with a fixed capacity (>0).
-func NewMsgQueue(capacity int) *msgQueue {
-    if capacity <= 0 {
+func NewMsgQueue(options MsgQueueOptions) *msgQueue {
+    if options.Capacity <= 0 {
         panic("capacity must be > 0")
     }
-    q := &msgQueue{cap: capacity}
+    if options.MaxSize <= 0 {
+        panic("max size must be > 0")
+    }
+    q := &msgQueue{cap: options.Capacity, maxSize: options.MaxSize}
     q.notEmpty = sync.NewCond(&q.mu)
     q.notFull = sync.NewCond(&q.mu)
     return q
@@ -92,7 +102,7 @@ func (q *msgQueue) Enqueue(ctx context.Context, msg message.ImmutableMessage) er
     q.mu.Lock()
     defer q.mu.Unlock()
 
-    for len(q.buf) >= q.cap {
+    for len(q.buf) >= q.cap || q.bufBytes >= q.maxSize {
         if ctx.Err() != nil {
             return context.Canceled
         }
@@ -110,6 +120,7 @@ func (q *msgQueue) Enqueue(ctx context.Context, msg message.ImmutableMessage) er
     // }
 
     q.buf = append(q.buf, msg)
+    q.bufBytes += msg.EstimateSize()
 
     // New data is available for readers.
     q.notEmpty.Signal()
@@ -176,6 +187,11 @@ func (q *msgQueue) CleanupConfirmedMessages(lastConfirmedTimeTick uint64) []mess
     copy(cleanedMessages, q.buf[:cut])
 
     // Drop the prefix [0:cut)
+    // Release memory of [0:cut) by zeroing references before reslicing
+    for i := 0; i < cut; i++ {
+        q.bufBytes -= q.buf[i].EstimateSize()
+        q.buf[i] = nil
+    }
     q.buf = q.buf[cut:]
     // Adjust read cursor relative to the new slice
     q.readIdx -= cut
diff --git a/internal/cdc/replication/replicatestream/msg_queue_test.go b/internal/cdc/replication/replicatestream/msg_queue_test.go
index 8c10251a97..d180e816f0 100644
--- a/internal/cdc/replication/replicatestream/msg_queue_test.go
+++ b/internal/cdc/replication/replicatestream/msg_queue_test.go
@@ -29,7 +29,7 @@ import (
 
 func TestMsgQueue_BasicOperations(t *testing.T) {
     // Test basic queue operations
-    queue := NewMsgQueue(3)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 3, MaxSize: 1000})
 
     assert.Equal(t, 3, queue.Cap())
     assert.Equal(t, 0, queue.Len())
@@ -37,6 +37,7 @@ func TestMsgQueue_BasicOperations(t *testing.T) {
     ctx := context.Background()
     msg1 := mock_message.NewMockImmutableMessage(t)
     msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(100).Maybe()
 
     err := queue.Enqueue(ctx, msg1)
     assert.NoError(t, err)
@@ -51,14 +52,16 @@ func TestMsgQueue_EnqueueBlocking(t *testing.T) {
     // Test enqueue blocking when queue is full
-    queue := NewMsgQueue(2)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 2, MaxSize: 1000})
     ctx := context.Background()
 
     // Fill the queue
     msg1 := mock_message.NewMockImmutableMessage(t)
     msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(100).Maybe()
     msg2 := mock_message.NewMockImmutableMessage(t)
     msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+    msg2.EXPECT().EstimateSize().Return(100).Maybe()
 
     err := queue.Enqueue(ctx, msg1)
     assert.NoError(t, err)
@@ -69,6 +72,7 @@
     // Try to enqueue when full - should block
     msg3 := mock_message.NewMockImmutableMessage(t)
     msg3.EXPECT().TimeTick().Return(uint64(300)).Maybe()
+    msg3.EXPECT().EstimateSize().Return(100).Maybe()
 
     // Use a context with timeout to test blocking
     ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@@ -82,7 +86,7 @@ func TestMsgQueue_DequeueBlocking(t *testing.T) {
     // Test dequeue blocking when queue is empty
-    queue := NewMsgQueue(2)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 2, MaxSize: 1000})
     ctx := context.Background()
 
     // Try to dequeue from empty queue - should block
@@ -97,14 +101,16 @@ func TestMsgQueue_SeekToHead(t *testing.T) {
     // Test seek to head functionality
-    queue := NewMsgQueue(3)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 3, MaxSize: 1000})
     ctx := context.Background()
 
     // Add messages
     msg1 := mock_message.NewMockImmutableMessage(t)
     msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(100).Maybe()
     msg2 := mock_message.NewMockImmutableMessage(t)
     msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+    msg2.EXPECT().EstimateSize().Return(100).Maybe()
 
     err := queue.Enqueue(ctx, msg1)
     assert.NoError(t, err)
@@ -127,16 +133,19 @@ func TestMsgQueue_CleanupConfirmedMessages(t *testing.T) {
     // Test cleanup functionality
-    queue := NewMsgQueue(5)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 5, MaxSize: 1000})
     ctx := context.Background()
 
     // Add messages with different timeticks
     msg1 := mock_message.NewMockImmutableMessage(t)
     msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(100).Maybe()
     msg2 := mock_message.NewMockImmutableMessage(t)
     msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+    msg2.EXPECT().EstimateSize().Return(100).Maybe()
     msg3 := mock_message.NewMockImmutableMessage(t)
     msg3.EXPECT().TimeTick().Return(uint64(300)).Maybe()
+    msg3.EXPECT().EstimateSize().Return(100).Maybe()
 
     err := queue.Enqueue(ctx, msg1)
     assert.NoError(t, err)
@@ -162,16 +171,19 @@ func TestMsgQueue_CleanupWithReadCursor(t *testing.T) {
     // Test cleanup when read cursor is advanced
-    queue := NewMsgQueue(5)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 5, MaxSize: 1000})
     ctx := context.Background()
 
     // Add messages
     msg1 := mock_message.NewMockImmutableMessage(t)
     msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(100).Maybe()
     msg2 := mock_message.NewMockImmutableMessage(t)
     msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+    msg2.EXPECT().EstimateSize().Return(100).Maybe()
     msg3 := mock_message.NewMockImmutableMessage(t)
     msg3.EXPECT().TimeTick().Return(uint64(300)).Maybe()
+    msg3.EXPECT().EstimateSize().Return(100).Maybe()
 
     err := queue.Enqueue(ctx, msg1)
     assert.NoError(t, err)
@@ -184,30 +196,30 @@ func TestMsgQueue_CleanupWithReadCursor(t *testing.T) {
     dequeuedMsg, err := queue.ReadNext(ctx)
     assert.NoError(t, err)
     assert.Equal(t, msg1, dequeuedMsg)
-    assert.Equal(t, 1, queue.readIdx)
 
     // Cleanup messages with timetick <= 150
     cleanedMessages := queue.CleanupConfirmedMessages(150)
-    assert.Equal(t, 2, queue.Len())   // msg1 removed, msg2 and msg3 remain
-    assert.Equal(t, 0, queue.readIdx) // read cursor adjusted
+    assert.Equal(t, 2, queue.Len()) // msg1 removed, msg2 and msg3 remain
     assert.Equal(t, 1, len(cleanedMessages))
     assert.Equal(t, msg1, cleanedMessages[0])
 }
 
 func TestMsgQueue_ContextCancellation(t *testing.T) {
     // Test context cancellation
-    queue := NewMsgQueue(1)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: 1000})
     ctx, cancel := context.WithCancel(context.Background())
 
     // Fill the queue
     msg1 := mock_message.NewMockImmutableMessage(t)
     msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(100).Maybe()
     err := queue.Enqueue(ctx, msg1)
     assert.NoError(t, err)
 
     // Try to enqueue when full
     msg2 := mock_message.NewMockImmutableMessage(t)
     msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+    msg2.EXPECT().EstimateSize().Return(100).Maybe()
 
     // Cancel context before enqueue
     cancel()
@@ -219,22 +231,30 @@ func TestMsgQueue_NewMsgQueueValidation(t *testing.T) {
     // Test constructor validation
     assert.Panics(t, func() {
-        NewMsgQueue(0)
+        NewMsgQueue(MsgQueueOptions{Capacity: 0, MaxSize: 1000})
     })
 
     assert.Panics(t, func() {
-        NewMsgQueue(-1)
+        NewMsgQueue(MsgQueueOptions{Capacity: -1, MaxSize: 1000})
+    })
+
+    assert.Panics(t, func() {
+        NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: 0})
+    })
+
+    assert.Panics(t, func() {
+        NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: -1})
     })
 
     // Valid capacity
-    queue := NewMsgQueue(1)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: 1000})
     assert.NotNil(t, queue)
     assert.Equal(t, 1, queue.Cap())
 }
 
 func TestMsgQueue_ConcurrentOperations(t *testing.T) {
     // Test concurrent enqueue and dequeue operations
-    queue := NewMsgQueue(10)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 10, MaxSize: 10000})
     ctx := context.Background()
 
     numMessages := 100
 
@@ -246,6 +266,7 @@
     for i := 0; i < numMessages; i++ {
         msg := mock_message.NewMockImmutableMessage(t)
         msg.EXPECT().TimeTick().Return(uint64(i)).Maybe()
+        msg.EXPECT().EstimateSize().Return(100).Maybe()
         err := queue.Enqueue(ctx, msg)
         assert.NoError(t, err)
     }
@@ -270,7 +291,7 @@ func TestMsgQueue_EdgeCases(t *testing.T) {
     // Test edge cases
-    queue := NewMsgQueue(1)
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: 1000})
 
     // Test with nil message (if allowed by interface)
     // This depends on the actual message.ImmutableMessage interface implementation
@@ -283,5 +304,90 @@ func TestMsgQueue_EdgeCases(t *testing.T) {
 
     // Test seek to head on empty queue
     queue.SeekToHead()
-    assert.Equal(t, 0, queue.readIdx)
+    // Note: readIdx is not accessible from outside the package
+}
+
+func TestMsgQueue_SizeBasedCapacity(t *testing.T) {
+    // Test size-based capacity control
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 10, MaxSize: 100})
+    ctx := context.Background()
+
+    // Add messages that exceed size limit but not count limit
+    msg1 := mock_message.NewMockImmutableMessage(t)
+    msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(100).Maybe()
+    msg2 := mock_message.NewMockImmutableMessage(t)
+    msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+    msg2.EXPECT().EstimateSize().Return(50).Maybe()
+
+    err := queue.Enqueue(ctx, msg1)
+    assert.NoError(t, err)
+    assert.Equal(t, 1, queue.Len())
+
+    // Try to enqueue second message - should block due to size limit (100 >= 100)
+    ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+    defer cancel()
+
+    err = queue.Enqueue(ctxWithTimeout, msg2)
+    assert.Error(t, err)
+    assert.Equal(t, context.Canceled, err)
+}
+
+func TestMsgQueue_SizeBasedCapacityWithCleanup(t *testing.T) {
+    // Test size-based capacity with cleanup
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 10, MaxSize: 300})
+    ctx := context.Background()
+
+    // Add messages
+    msg1 := mock_message.NewMockImmutableMessage(t)
+    msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(200).Maybe()
+    msg2 := mock_message.NewMockImmutableMessage(t)
+    msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+    msg2.EXPECT().EstimateSize().Return(200).Maybe()
+
+    err := queue.Enqueue(ctx, msg1)
+    assert.NoError(t, err)
+
+    // Cleanup first message to free space
+    cleanedMessages := queue.CleanupConfirmedMessages(100)
+    assert.Equal(t, 1, len(cleanedMessages))
+    assert.Equal(t, msg1, cleanedMessages[0])
+
+    // Now should be able to enqueue second message (200 < 300)
+    err = queue.Enqueue(ctx, msg2)
+    assert.NoError(t, err)
+    assert.Equal(t, 1, queue.Len())
+}
+
+func TestMsgQueue_MixedCapacityLimits(t *testing.T) {
+    // Test both count and size limits
+    queue := NewMsgQueue(MsgQueueOptions{Capacity: 2, MaxSize: 1000})
+    ctx := context.Background()
+
+    // Add messages that hit count limit first
+    msg1 := mock_message.NewMockImmutableMessage(t)
+    msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+    msg1.EXPECT().EstimateSize().Return(100).Maybe()
+    msg2 := mock_message.NewMockImmutableMessage(t)
+    msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+    msg2.EXPECT().EstimateSize().Return(100).Maybe()
+
+    err := queue.Enqueue(ctx, msg1)
+    assert.NoError(t, err)
+    err = queue.Enqueue(ctx, msg2)
+    assert.NoError(t, err)
+    assert.Equal(t, 2, queue.Len())
+
+    // Try to enqueue third message - should block due to count limit
+    msg3 := mock_message.NewMockImmutableMessage(t)
+    msg3.EXPECT().TimeTick().Return(uint64(300)).Maybe()
+    msg3.EXPECT().EstimateSize().Return(100).Maybe()
+
+    ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+    defer cancel()
+
+    err = queue.Enqueue(ctxWithTimeout, msg3)
+    assert.Error(t, err)
+    assert.Equal(t, context.Canceled, err)
 }
diff --git a/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go b/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go
index e639155a3f..c97f7e8981 100644
--- a/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go
+++ b/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go
@@ -35,7 +35,11 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/util/replicateutil"
 )
 
-const pendingMessageQueueLength = 128
+const (
+    // TODO: sheep, make these parameters configurable
+    pendingMessageQueueLength  = 128
+    pendingMessageQueueMaxSize = 128 * 1024 * 1024
+)
 
 // replicateStreamClient is the implementation of ReplicateStreamClient.
 type replicateStreamClient struct {
@@ -56,10 +60,15 @@ func NewReplicateStreamClient(ctx context.Context, replicateInfo *streamingpb.Re
     ctx1, cancel := context.WithCancel(ctx)
     ctx1 = contextutil.WithClusterID(ctx1, replicateInfo.GetTargetCluster().GetClusterId())
 
+    options := MsgQueueOptions{
+        Capacity: pendingMessageQueueLength,
+        MaxSize:  pendingMessageQueueMaxSize,
+    }
+    pendingMessages := NewMsgQueue(options)
     rs := &replicateStreamClient{
         clusterID:       paramtable.Get().CommonCfg.ClusterPrefix.GetValue(),
         replicateInfo:   replicateInfo,
-        pendingMessages: NewMsgQueue(pendingMessageQueueLength),
+        pendingMessages: pendingMessages,
         metrics:         NewReplicateMetrics(replicateInfo),
         ctx:             ctx1,
         cancel:          cancel,
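
Note on fix 3 (illustrative sketch, not part of the patch): dropping the
64-slot buffer on the scanner's handler channel makes it unbuffered, so
the WAL scanner hands each message to the replicator synchronously and
messages no longer pile up inside the channel. A minimal sketch of the
difference, using plain int channels as stand-ins for
adaptor.ChanMessageHandler:

    package main

    import "fmt"

    func main() {
        // Old behavior: a buffered channel lets the producer run far
        // ahead of the consumer, pinning every queued message in memory.
        buffered := make(chan int, 64)
        for i := 0; i < 64; i++ {
            buffered <- i // does not block until the buffer fills
        }
        fmt.Println(len(buffered)) // 64 messages held before anything is consumed

        // New behavior: an unbuffered channel hands each message off
        // synchronously, so the producer is throttled to the consumer's pace.
        unbuffered := make(chan int)
        go func() { unbuffered <- 0 }() // send blocks until the receive below
        fmt.Println(<-unbuffered)       // at most one message is ever in flight
    }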