In Apache SeaTunnel version 2.3.9, the Kafka connector implementation carried a potential memory leak. When a user configured a streaming job to read from Kafka, memory could grow continuously until an OOM (Out Of Memory) error occurred, even with a read rate limit (`read_limit.rows_per_second`) set.
In real deployments, users observed the following:

- With `read_limit.rows_per_second=1` configured, memory usage soared from 200 MB to 5 GB within 5 minutes.

Through code review, the root cause was traced to the `createReader` method of the `KafkaSource` class, where `elementsQueue` was initialized as an unbounded queue:
```java
elementsQueue = new LinkedBlockingQueue<>();
```
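To see why this matters, here is a minimal standalone sketch (illustrative only, not SeaTunnel code): a fast producer and a consumer throttled to one element per second share an unbounded `LinkedBlockingQueue`, so the queue depth, and with it heap usage, climbs without bound.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class UnboundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // No capacity argument: the queue is effectively unbounded.
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            while (true) {
                queue.offer(new byte[1024]); // never blocks, always "succeeds"
            }
        });
        producer.setDaemon(true);
        producer.start();

        while (true) {
            queue.take();       // consumer drains one element...
            Thread.sleep(1000); // ...per second, simulating a slow downstream
            System.out.println("queue depth: " + queue.size()); // keeps climbing
        }
    }
}
```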
This implementation had two critical issues:
1. A `LinkedBlockingQueue` created without a capacity can theoretically grow indefinitely; when producer speed far exceeds consumer speed, memory grows continuously.
2. Even with `read_limit.rows_per_second=1` set, the limit did not actually apply to reading from Kafka, so records kept accumulating in the in-memory queue.

The community resolved this issue via PR #9041. The main improvements include:
- Replaced the unbounded `LinkedBlockingQueue` with a fixed-size `ArrayBlockingQueue`.
- Added a `queue.size` configuration parameter, allowing users to adjust the capacity as needed.
- Set `DEFAULT_QUEUE_SIZE=1000` as the default queue capacity.

Core implementation changes:
```java
public class KafkaSource {
    private static final String QUEUE_SIZE_KEY = "queue.size";
    private static final int DEFAULT_QUEUE_SIZE = 1000;

    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
            SourceReader.Context readerContext) {
        // Read the user-configured capacity, falling back to the default of 1000.
        int queueSize = kafkaSourceConfig.getInt(QUEUE_SIZE_KEY, DEFAULT_QUEUE_SIZE);
        // Bounded queue: once full, producers block instead of growing the heap.
        BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue =
                new ArrayBlockingQueue<>(queueSize);
        // ...
    }
}
```
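The key behavioral difference is a JDK guarantee: `ArrayBlockingQueue.put` blocks once the capacity is reached, so a producer that outruns the consumer is throttled instead of buffered. A standalone sketch of that backpressure effect (again illustrative, not SeaTunnel code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1000); // fixed capacity

        Thread producer = new Thread(() -> {
            try {
                while (true) {
                    queue.put(new byte[1024]); // blocks while 1000 elements are queued
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();

        while (true) {
            queue.take();       // slow consumer frees one slot per second
            Thread.sleep(1000);
            System.out.println("queue depth: " + queue.size()); // stays at or below 1000
        }
    }
}
```

Here the printed depth plateaus at the capacity instead of climbing, which is exactly the property the fix relies on to cap the connector's memory footprint.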
For users of the SeaTunnel Kafka connector, it is recommended to:
- Tune the `queue.size` value according to business needs and data characteristics (an example job config appears at the end of this section).
- Keep in mind that the `read_limit.rows_per_second` parameter applies to downstream processing, not to Kafka consumption itself.

This fix not only removed the memory overflow risk but also improved stability and configurability: with a bounded queue and a configurable capacity, users can better control resource usage and avoid OOM caused by data backlog. It also reflects the virtuous cycle by which open-source communities continuously improve product quality through user feedback.
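For reference, here is a hypothetical sketch of where these options would sit in a SeaTunnel job config. Only `queue.size` comes from the fix discussed above; the other fields are common SeaTunnel options shown for context, and exact names and placement should be verified against the documentation for your version.

```hocon
env {
  job.mode = "STREAMING"
  # Applies to downstream processing, not to Kafka fetching itself.
  read_limit.rows_per_second = 1000
}

source {
  Kafka {
    bootstrap.servers = "kafka-1:9092"
    topic = "events"
    consumer.group = "seatunnel_demo"
    format = "json"
    # Capacity of the bounded in-memory buffer (default: 1000).
    queue.size = 5000
  }
}

sink {
  Console {}
}
```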


