fix: Fix CDC OOM due to high buffer size (#44607)

Fix CDC OOM by:
1. Freeing the msg buffer manually.
2. Limiting the max msg buffer size.
3. Reducing the scanner msg handler buffer size.

issue: https://github.com/milvus-io/milvus/issues/44123

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Author: yihao.dai, 2025-10-09 19:31:58 +08:00 (committed by GitHub)
Commit: 4ae44f1aa9, parent: 174e1e4071
4 changed files with 158 additions and 29 deletions
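
The heart of the change is bounding the pending replication buffer by total bytes as well as by message count, so a slow target cluster applies backpressure instead of letting messages pile up in memory. A condensed, self-contained sketch of that pattern follows; the boundedQueue type, item interface, and Size method are illustrative stand-ins, not the actual milvus types (the real implementation is in the msgQueue diff below):

package cdcsketch

import (
    "context"
    "sync"
)

// item stands in for message.ImmutableMessage; Size mirrors EstimateSize().
type item interface {
    Size() int
}

// boundedQueue blocks producers once either the message count or the total
// buffered bytes reaches its limit, so a slow consumer can no longer grow
// the buffer without bound.
type boundedQueue struct {
    mu       sync.Mutex
    notFull  *sync.Cond
    buf      []item
    bufBytes int
    capacity int // max number of buffered messages
    maxSize  int // max total bytes of buffered messages
}

func newBoundedQueue(capacity, maxSize int) *boundedQueue {
    q := &boundedQueue{capacity: capacity, maxSize: maxSize}
    q.notFull = sync.NewCond(&q.mu)
    return q
}

// enqueue waits until both limits allow another message or the context is done.
// Like the real Enqueue, it only notices cancellation after being woken, so a
// production version also needs a close/broadcast path for shutdown.
func (q *boundedQueue) enqueue(ctx context.Context, it item) error {
    q.mu.Lock()
    defer q.mu.Unlock()
    for len(q.buf) >= q.capacity || q.bufBytes >= q.maxSize {
        if ctx.Err() != nil {
            return ctx.Err()
        }
        q.notFull.Wait() // woken by release()
    }
    q.buf = append(q.buf, it)
    q.bufBytes += it.Size()
    return nil
}

// release drops the first n messages, nils out their slots so the GC can
// reclaim them, and wakes any blocked producers.
func (q *boundedQueue) release(n int) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if n > len(q.buf) {
        n = len(q.buf)
    }
    for i := 0; i < n; i++ {
        q.bufBytes -= q.buf[i].Size()
        q.buf[i] = nil
    }
    q.buf = q.buf[n:]
    q.notFull.Broadcast()
}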


@@ -37,8 +37,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
-const scannerHandlerChanSize = 64
-
 // Replicator is the client that replicates the message to the channel in the target cluster.
 type Replicator interface {
 	// StartReplicate starts the replicate for the channel.
@@ -112,7 +110,7 @@ func (r *channelReplicator) replicateLoop() error {
 	if err != nil {
 		return err
 	}
-	ch := make(adaptor.ChanMessageHandler, scannerHandlerChanSize)
+	ch := make(adaptor.ChanMessageHandler)
 	scanner := streaming.WAL().Read(r.ctx, streaming.ReadOption{
 		PChannel:      r.replicateInfo.GetSourceChannelName(),
 		DeliverPolicy: options.DeliverPolicyStartFrom(cp.MessageID),
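
The change above drops the 64-slot buffer on the scanner handler channel, so the WAL scanner now hands each message to the replicate loop over an unbuffered channel and simply blocks until the consumer is ready. That is plain Go channel behavior rather than anything CDC-specific; a minimal illustration (names are made up for the example):

package main

import "fmt"

func main() {
    // Unbuffered: the producer can never run ahead of the consumer, so at
    // most one in-flight message is held here instead of up to 64 buffered ones.
    ch := make(chan string)

    go func() {
        for _, m := range []string{"msg-1", "msg-2", "msg-3"} {
            ch <- m // blocks until the receiver below is ready
        }
        close(ch)
    }()

    for m := range ch {
        fmt.Println("replicated:", m)
    }
}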


@@ -63,17 +63,27 @@ type msgQueue struct {
 	notFull  *sync.Cond
 	buf      []message.ImmutableMessage
+	bufBytes int
 	readIdx  int
 	cap      int
+	maxSize  int
 	closed   bool
 }
 
+type MsgQueueOptions struct {
+	Capacity int
+	MaxSize  int
+}
+
 // NewMsgQueue creates a queue with a fixed capacity (>0).
-func NewMsgQueue(capacity int) *msgQueue {
-	if capacity <= 0 {
+func NewMsgQueue(options MsgQueueOptions) *msgQueue {
+	if options.Capacity <= 0 {
 		panic("capacity must be > 0")
 	}
-	q := &msgQueue{cap: capacity}
+	if options.MaxSize <= 0 {
+		panic("max size must be > 0")
+	}
+	q := &msgQueue{cap: options.Capacity, maxSize: options.MaxSize}
 	q.notEmpty = sync.NewCond(&q.mu)
 	q.notFull = sync.NewCond(&q.mu)
 	return q
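
For callers, the constructor now takes an options struct instead of a bare capacity. A hedged usage sketch, written as if it lived in the same package as msgQueue; the helper name is hypothetical, and the 128 / 128 MiB values mirror the defaults wired up later in this commit:

// enqueueAll is a hypothetical helper showing the options-based constructor.
func enqueueAll(ctx context.Context, msgs []message.ImmutableMessage) error {
    q := NewMsgQueue(MsgQueueOptions{
        Capacity: 128,               // at most 128 pending messages
        MaxSize:  128 * 1024 * 1024, // and at most 128 MiB of pending bytes
    })
    for _, m := range msgs {
        // Enqueue blocks once either limit is reached, until
        // CleanupConfirmedMessages frees space or ctx is cancelled.
        if err := q.Enqueue(ctx, m); err != nil {
            return err
        }
    }
    return nil
}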
@@ -92,7 +102,7 @@ func (q *msgQueue) Enqueue(ctx context.Context, msg message.ImmutableMessage) error {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
-	for len(q.buf) >= q.cap {
+	for len(q.buf) >= q.cap || q.bufBytes >= q.maxSize {
 		if ctx.Err() != nil {
 			return context.Canceled
 		}
@@ -110,6 +120,7 @@ func (q *msgQueue) Enqueue(ctx context.Context, msg message.ImmutableMessage) error {
 	// }
 
 	q.buf = append(q.buf, msg)
+	q.bufBytes += msg.EstimateSize()
 
 	// New data is available for readers.
 	q.notEmpty.Signal()
@@ -176,6 +187,11 @@ func (q *msgQueue) CleanupConfirmedMessages(lastConfirmedTimeTick uint64) []message.ImmutableMessage {
 	copy(cleanedMessages, q.buf[:cut])
 
 	// Drop the prefix [0:cut)
+	// Release memory of [0:cut) by zeroing references before reslicing
+	for i := 0; i < cut; i++ {
+		q.bufBytes -= q.buf[i].EstimateSize()
+		q.buf[i] = nil
+	}
 	q.buf = q.buf[cut:]
 	// Adjust read cursor relative to the new slice
 	q.readIdx -= cut
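
The loop added to CleanupConfirmedMessages is the "free msg buffer manually" part of the fix: reslicing with q.buf = q.buf[cut:] on its own keeps the dropped messages reachable through the slice's backing array, so the garbage collector cannot reclaim them; nil-ing the slots first (and subtracting their estimated sizes) releases those references. A standalone illustration of the general Go pattern, not milvus code:

package main

import "fmt"

func main() {
    // A slice of large buffers standing in for buffered messages.
    buf := make([][]byte, 4)
    for i := range buf {
        buf[i] = make([]byte, 1<<20) // 1 MiB each
    }

    cut := 2

    // Reslicing alone (buf = buf[cut:]) hides buf[0] and buf[1] from view,
    // but the backing array still references them, so they stay live.

    // Zero the dropped slots first, then reslice: the 2 MiB of dropped
    // buffers become garbage-collectable immediately.
    for i := 0; i < cut; i++ {
        buf[i] = nil
    }
    buf = buf[cut:]

    fmt.Println("remaining messages:", len(buf))
}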


@@ -29,7 +29,7 @@ import (
 
 func TestMsgQueue_BasicOperations(t *testing.T) {
 	// Test basic queue operations
-	queue := NewMsgQueue(3)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 3, MaxSize: 1000})
 	assert.Equal(t, 3, queue.Cap())
 	assert.Equal(t, 0, queue.Len())
@@ -37,6 +37,7 @@ func TestMsgQueue_BasicOperations(t *testing.T) {
 	ctx := context.Background()
 	msg1 := mock_message.NewMockImmutableMessage(t)
 	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(100).Maybe()
 
 	err := queue.Enqueue(ctx, msg1)
 	assert.NoError(t, err)
@@ -51,14 +52,16 @@ func TestMsgQueue_BasicOperations(t *testing.T) {
 
 func TestMsgQueue_EnqueueBlocking(t *testing.T) {
 	// Test enqueue blocking when queue is full
-	queue := NewMsgQueue(2)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 2, MaxSize: 1000})
 	ctx := context.Background()
 
 	// Fill the queue
 	msg1 := mock_message.NewMockImmutableMessage(t)
 	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(100).Maybe()
 	msg2 := mock_message.NewMockImmutableMessage(t)
 	msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+	msg2.EXPECT().EstimateSize().Return(100).Maybe()
 
 	err := queue.Enqueue(ctx, msg1)
 	assert.NoError(t, err)
@@ -69,6 +72,7 @@ func TestMsgQueue_EnqueueBlocking(t *testing.T) {
 	// Try to enqueue when full - should block
 	msg3 := mock_message.NewMockImmutableMessage(t)
 	msg3.EXPECT().TimeTick().Return(uint64(300)).Maybe()
+	msg3.EXPECT().EstimateSize().Return(100).Maybe()
 
 	// Use a context with timeout to test blocking
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@@ -82,7 +86,7 @@ func TestMsgQueue_EnqueueBlocking(t *testing.T) {
 
 func TestMsgQueue_DequeueBlocking(t *testing.T) {
 	// Test dequeue blocking when queue is empty
-	queue := NewMsgQueue(2)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 2, MaxSize: 1000})
 	ctx := context.Background()
 
 	// Try to dequeue from empty queue - should block
@@ -97,14 +101,16 @@ func TestMsgQueue_DequeueBlocking(t *testing.T) {
 
 func TestMsgQueue_SeekToHead(t *testing.T) {
 	// Test seek to head functionality
-	queue := NewMsgQueue(3)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 3, MaxSize: 1000})
 	ctx := context.Background()
 
 	// Add messages
 	msg1 := mock_message.NewMockImmutableMessage(t)
 	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(100).Maybe()
 	msg2 := mock_message.NewMockImmutableMessage(t)
 	msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+	msg2.EXPECT().EstimateSize().Return(100).Maybe()
 
 	err := queue.Enqueue(ctx, msg1)
 	assert.NoError(t, err)
@@ -127,16 +133,19 @@ func TestMsgQueue_SeekToHead(t *testing.T) {
 
 func TestMsgQueue_CleanupConfirmedMessages(t *testing.T) {
 	// Test cleanup functionality
-	queue := NewMsgQueue(5)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 5, MaxSize: 1000})
 	ctx := context.Background()
 
 	// Add messages with different timeticks
 	msg1 := mock_message.NewMockImmutableMessage(t)
 	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(100).Maybe()
 	msg2 := mock_message.NewMockImmutableMessage(t)
 	msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+	msg2.EXPECT().EstimateSize().Return(100).Maybe()
 	msg3 := mock_message.NewMockImmutableMessage(t)
 	msg3.EXPECT().TimeTick().Return(uint64(300)).Maybe()
+	msg3.EXPECT().EstimateSize().Return(100).Maybe()
 
 	err := queue.Enqueue(ctx, msg1)
 	assert.NoError(t, err)
@@ -162,16 +171,19 @@ func TestMsgQueue_CleanupConfirmedMessages(t *testing.T) {
 
 func TestMsgQueue_CleanupWithReadCursor(t *testing.T) {
 	// Test cleanup when read cursor is advanced
-	queue := NewMsgQueue(5)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 5, MaxSize: 1000})
 	ctx := context.Background()
 
 	// Add messages
 	msg1 := mock_message.NewMockImmutableMessage(t)
 	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(100).Maybe()
 	msg2 := mock_message.NewMockImmutableMessage(t)
 	msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+	msg2.EXPECT().EstimateSize().Return(100).Maybe()
 	msg3 := mock_message.NewMockImmutableMessage(t)
 	msg3.EXPECT().TimeTick().Return(uint64(300)).Maybe()
+	msg3.EXPECT().EstimateSize().Return(100).Maybe()
 
 	err := queue.Enqueue(ctx, msg1)
 	assert.NoError(t, err)
@@ -184,30 +196,30 @@ func TestMsgQueue_CleanupWithReadCursor(t *testing.T) {
 	dequeuedMsg, err := queue.ReadNext(ctx)
 	assert.NoError(t, err)
 	assert.Equal(t, msg1, dequeuedMsg)
-	assert.Equal(t, 1, queue.readIdx)
 
 	// Cleanup messages with timetick <= 150
 	cleanedMessages := queue.CleanupConfirmedMessages(150)
 	assert.Equal(t, 2, queue.Len()) // msg1 removed, msg2 and msg3 remain
-	assert.Equal(t, 0, queue.readIdx) // read cursor adjusted
 	assert.Equal(t, 1, len(cleanedMessages))
 	assert.Equal(t, msg1, cleanedMessages[0])
 }
 
 func TestMsgQueue_ContextCancellation(t *testing.T) {
 	// Test context cancellation
-	queue := NewMsgQueue(1)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: 1000})
 	ctx, cancel := context.WithCancel(context.Background())
 
 	// Fill the queue
 	msg1 := mock_message.NewMockImmutableMessage(t)
 	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(100).Maybe()
 	err := queue.Enqueue(ctx, msg1)
 	assert.NoError(t, err)
 
 	// Try to enqueue when full
 	msg2 := mock_message.NewMockImmutableMessage(t)
 	msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+	msg2.EXPECT().EstimateSize().Return(100).Maybe()
 
 	// Cancel context before enqueue
 	cancel()
@@ -219,22 +231,30 @@ func TestMsgQueue_ContextCancellation(t *testing.T) {
 
 func TestMsgQueue_NewMsgQueueValidation(t *testing.T) {
 	// Test constructor validation
 	assert.Panics(t, func() {
-		NewMsgQueue(0)
+		NewMsgQueue(MsgQueueOptions{Capacity: 0, MaxSize: 1000})
 	})
 
 	assert.Panics(t, func() {
-		NewMsgQueue(-1)
+		NewMsgQueue(MsgQueueOptions{Capacity: -1, MaxSize: 1000})
 	})
 
+	assert.Panics(t, func() {
+		NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: 0})
+	})
+
+	assert.Panics(t, func() {
+		NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: -1})
+	})
+
 	// Valid capacity
-	queue := NewMsgQueue(1)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: 1000})
 	assert.NotNil(t, queue)
 	assert.Equal(t, 1, queue.Cap())
 }
 
 func TestMsgQueue_ConcurrentOperations(t *testing.T) {
 	// Test concurrent enqueue and dequeue operations
-	queue := NewMsgQueue(10)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 10, MaxSize: 10000})
 	ctx := context.Background()
 	numMessages := 100
@@ -246,6 +266,7 @@ func TestMsgQueue_ConcurrentOperations(t *testing.T) {
 	for i := 0; i < numMessages; i++ {
 		msg := mock_message.NewMockImmutableMessage(t)
 		msg.EXPECT().TimeTick().Return(uint64(i)).Maybe()
+		msg.EXPECT().EstimateSize().Return(100).Maybe()
 		err := queue.Enqueue(ctx, msg)
 		assert.NoError(t, err)
 	}
@@ -270,7 +291,7 @@ func TestMsgQueue_ConcurrentOperations(t *testing.T) {
 
 func TestMsgQueue_EdgeCases(t *testing.T) {
 	// Test edge cases
-	queue := NewMsgQueue(1)
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 1, MaxSize: 1000})
 
 	// Test with nil message (if allowed by interface)
 	// This depends on the actual message.ImmutableMessage interface implementation
@@ -283,5 +304,90 @@ func TestMsgQueue_EdgeCases(t *testing.T) {
 	// Test seek to head on empty queue
 	queue.SeekToHead()
-	assert.Equal(t, 0, queue.readIdx)
+	// Note: readIdx is not accessible from outside the package
 }
+
+func TestMsgQueue_SizeBasedCapacity(t *testing.T) {
+	// Test size-based capacity control
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 10, MaxSize: 100})
+	ctx := context.Background()
+
+	// Add messages that exceed size limit but not count limit
+	msg1 := mock_message.NewMockImmutableMessage(t)
+	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(100).Maybe()
+	msg2 := mock_message.NewMockImmutableMessage(t)
+	msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+	msg2.EXPECT().EstimateSize().Return(50).Maybe()
+
+	err := queue.Enqueue(ctx, msg1)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, queue.Len())
+
+	// Try to enqueue second message - should block due to size limit (100 >= 100)
+	ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+	defer cancel()
+	err = queue.Enqueue(ctxWithTimeout, msg2)
+	assert.Error(t, err)
+	assert.Equal(t, context.Canceled, err)
+}
+
+func TestMsgQueue_SizeBasedCapacityWithCleanup(t *testing.T) {
+	// Test size-based capacity with cleanup
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 10, MaxSize: 300})
+	ctx := context.Background()
+
+	// Add messages
+	msg1 := mock_message.NewMockImmutableMessage(t)
+	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(200).Maybe()
+	msg2 := mock_message.NewMockImmutableMessage(t)
+	msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+	msg2.EXPECT().EstimateSize().Return(200).Maybe()
+
+	err := queue.Enqueue(ctx, msg1)
+	assert.NoError(t, err)
+
+	// Cleanup first message to free space
+	cleanedMessages := queue.CleanupConfirmedMessages(100)
+	assert.Equal(t, 1, len(cleanedMessages))
+	assert.Equal(t, msg1, cleanedMessages[0])
+
+	// Now should be able to enqueue second message (200 < 300)
+	err = queue.Enqueue(ctx, msg2)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, queue.Len())
+}
+
+func TestMsgQueue_MixedCapacityLimits(t *testing.T) {
+	// Test both count and size limits
+	queue := NewMsgQueue(MsgQueueOptions{Capacity: 2, MaxSize: 1000})
+	ctx := context.Background()
+
+	// Add messages that hit count limit first
+	msg1 := mock_message.NewMockImmutableMessage(t)
+	msg1.EXPECT().TimeTick().Return(uint64(100)).Maybe()
+	msg1.EXPECT().EstimateSize().Return(100).Maybe()
+	msg2 := mock_message.NewMockImmutableMessage(t)
+	msg2.EXPECT().TimeTick().Return(uint64(200)).Maybe()
+	msg2.EXPECT().EstimateSize().Return(100).Maybe()
+
+	err := queue.Enqueue(ctx, msg1)
+	assert.NoError(t, err)
+	err = queue.Enqueue(ctx, msg2)
+	assert.NoError(t, err)
+	assert.Equal(t, 2, queue.Len())
+
+	// Try to enqueue third message - should block due to count limit
+	msg3 := mock_message.NewMockImmutableMessage(t)
+	msg3.EXPECT().TimeTick().Return(uint64(300)).Maybe()
+	msg3.EXPECT().EstimateSize().Return(100).Maybe()
+
+	ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+	defer cancel()
+	err = queue.Enqueue(ctxWithTimeout, msg3)
+	assert.Error(t, err)
+	assert.Equal(t, context.Canceled, err)
+}


@@ -35,7 +35,11 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/replicateutil"
 )
 
-const pendingMessageQueueLength = 128
+const (
+	// TODO: sheep, make these parameters configurable
+	pendingMessageQueueLength  = 128
+	pendingMessageQueueMaxSize = 128 * 1024 * 1024
+)
 
 // replicateStreamClient is the implementation of ReplicateStreamClient.
 type replicateStreamClient struct {
@@ -56,10 +60,15 @@ func NewReplicateStreamClient(ctx context.Context, replicateInfo *streamingpb.Re
 	ctx1, cancel := context.WithCancel(ctx)
 	ctx1 = contextutil.WithClusterID(ctx1, replicateInfo.GetTargetCluster().GetClusterId())
 
+	options := MsgQueueOptions{
+		Capacity: pendingMessageQueueLength,
+		MaxSize:  pendingMessageQueueMaxSize,
+	}
+	pendingMessages := NewMsgQueue(options)
 	rs := &replicateStreamClient{
 		clusterID:       paramtable.Get().CommonCfg.ClusterPrefix.GetValue(),
 		replicateInfo:   replicateInfo,
-		pendingMessages: NewMsgQueue(pendingMessageQueueLength),
+		pendingMessages: pendingMessages,
 		metrics:         NewReplicateMetrics(replicateInfo),
 		ctx:             ctx1,
 		cancel:          cancel,