From 84a67d537de97270231eb88a2d4a48649c6c4f45 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Thu, 15 Jun 2023 17:30:32 +0100 Subject: [PATCH] KafkaProducer: AsyncStream + onTerminate->shutDown Motivation: Given that we don't support backpressure for our `KafkaProducer`, we can replace the `NIOAsyncSequence` with a `AsyncStream` for the acknowledgements. Furthermore we want to shut down the producer once our stream has terminated. Modifications: * `KafkaProducer`: replace `NIOAsyncSequence` with `AsyncStream` * `KafkaProducerTests.testNoMemoryLeakAfterShutdown` make sure to kill stream otherwise we have a memory leak --- Sources/SwiftKafka/KafkaProducer.swift | 45 +++++++------------ .../SwiftKafkaTests/KafkaProducerTests.swift | 4 +- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index a85a2e39..0efc6c95 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -16,27 +16,15 @@ import Crdkafka import Logging import NIOCore -/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true. -struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy { - func didYield(bufferDepth: Int) -> Bool { true } - func didConsume(bufferDepth: Int) -> Bool { true } -} - -/// `NIOAsyncSequenceProducerDelegate` that does nothing. -struct NoDelegate: NIOAsyncSequenceProducerDelegate { - func produceMore() {} - func didTerminate() {} -} - /// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``). public struct KafkaMessageAcknowledgements: AsyncSequence { public typealias Element = Result - typealias WrappedSequence = NIOAsyncSequenceProducer + typealias WrappedSequence = AsyncStream let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``). public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol { - let wrappedIterator: NIOAsyncSequenceProducer.AsyncIterator + var wrappedIterator: AsyncStream.AsyncIterator public mutating func next() async -> Element? { await self.wrappedIterator.next() @@ -98,7 +86,6 @@ public actor KafkaProducer { self.state = .started } - /// Initialize a new ``KafkaProducer``. /// /// This factory method creates a producer without message acknowledgements. @@ -148,30 +135,22 @@ public actor KafkaProducer { topicConfig: KafkaTopicConfig = KafkaTopicConfig(), logger: Logger ) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) { - // (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt) - // This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer. - // The source MUST be held by the caller and used to signal new elements or finish. - // The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller. - // This is due to the fact that deiniting the sequence is used as part of a trigger to - // terminate the underlying source. - let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( - elementType: Result.self, - backPressureStrategy: NoBackPressure(), - delegate: NoDelegate() - ) - let source = sourceAndSequence.source + var streamContinuation: AsyncStream>.Continuation? + let stream = AsyncStream { continuation in + streamContinuation = continuation + } let client = try RDKafka.createClient( type: .producer, configDictionary: config.dictionary, - callback: { [logger, source] messageResult in + callback: { [logger, streamContinuation] messageResult in guard let messageResult else { logger.error("Could not resolve acknowledged message") return } // Ignore YieldResult as we don't support back pressure in KafkaProducer - _ = source.yield(messageResult) + streamContinuation?.yield(messageResult) }, logger: logger ) @@ -182,7 +161,13 @@ public actor KafkaProducer { logger: logger ) - let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: sourceAndSequence.sequence) + streamContinuation?.onTermination = { [producer] _ in + Task { + await producer.shutdownGracefully() + } + } + + let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: stream) return (producer, acknowlegementsSequence) } diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index 6a3e37a1..507fc107 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -218,7 +218,7 @@ final class KafkaProducerTests: XCTestCase { func testNoMemoryLeakAfterShutdown() async throws { var producer: KafkaProducer? - var acks: KafkaMessageAcknowledgements + var acks: KafkaMessageAcknowledgements? (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) _ = acks @@ -226,6 +226,8 @@ final class KafkaProducerTests: XCTestCase { await producer?.shutdownGracefully() producer = nil + // Make sure to terminate the AsyncSequence + acks = nil // Wait for rd_kafka_flush to complete try await Task.sleep(nanoseconds: 10 * 1_000_000_000)