diff --git a/internal/streamingcoord/server/broadcaster/task.go b/internal/streamingcoord/server/broadcaster/task.go index d1067fb4a8..b5bc4349b0 100644 --- a/internal/streamingcoord/server/broadcaster/task.go +++ b/internal/streamingcoord/server/broadcaster/task.go @@ -121,6 +121,7 @@ func (h *pendingBroadcastTaskArray) Pop() interface{} { old := *h n := len(old) x := old[n-1] + old[n-1] = nil // release the memory of underlying array. *h = old[0 : n-1] return x } diff --git a/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go b/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go index fdde3d44cf..9f9eea472b 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go +++ b/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go @@ -221,7 +221,7 @@ func (impl *flusherComponents) buildDataSyncServiceWithRetry(ctx context.Context logger.Warn("fail to append flush message for segments that not created by streaming service into wal", zap.Error(err)) return err } - impl.logger.Info("append flush message for segments that not created by streaming service into wal", zap.Stringer("msgID", appendResult.MessageID), zap.Uint64("timeTick", appendResult.TimeTick)) + logger.Info("append flush message for segments that not created by streaming service into wal", zap.Stringer("msgID", appendResult.MessageID), zap.Uint64("timeTick", appendResult.TimeTick)) return nil }, retry.AttemptAlways()); err != nil { return nil, err diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack.go index 9ff5e044ab..765605c20f 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack.go @@ -86,6 +86,7 @@ func (h *ackers) Pop() interface{} { old := *h n := len(old) x := old[n-1] + old[n-1] = nil // release the memory of underlying array. *h = old[0 : n-1] return x } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/last_confirmed.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/last_confirmed.go index 83b25118c3..82c818e57b 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack/last_confirmed.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/last_confirmed.go @@ -79,6 +79,7 @@ func (h *uncommittedTxnInfoOrderByMessageID) Pop() interface{} { old := *h n := len(old) x := old[n-1] + old[n-1] = nil // release the memory of underlying array. *h = old[0 : n-1] return x } diff --git a/internal/streamingnode/server/wal/utility/message_heap.go b/internal/streamingnode/server/wal/utility/message_heap.go index adc81a1f57..a7e7a9ef2f 100644 --- a/internal/streamingnode/server/wal/utility/message_heap.go +++ b/internal/streamingnode/server/wal/utility/message_heap.go @@ -35,6 +35,7 @@ func (h *immutableMessageHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] + old[n-1] = nil // release the memory of underlying array. *h = old[0 : n-1] return x } diff --git a/pkg/util/typeutil/heap.go b/pkg/util/typeutil/heap.go index 94a9ab0840..e571b5c281 100644 --- a/pkg/util/typeutil/heap.go +++ b/pkg/util/typeutil/heap.go @@ -60,6 +60,8 @@ func (h *heapArray[E]) Pop() interface{} { old := *h n := len(old) x := old[n-1] + var zero E + old[n-1] = zero // release the memory of underlying array. *h = old[0 : n-1] return x } @@ -94,6 +96,8 @@ func (h *objectHeapArray[O, E]) Pop() interface{} { old := h.objects n := len(old) x := old[n-1] + var zero O + old[n-1] = zero // release the memory of underlying array. h.objects = old[0 : n-1] return x }