KafkaProducer: AsyncStream + onTerminate->shutDown
Motivation:

Given that we don't support backpressure for our `KafkaProducer`, we can
replace the `NIOAsyncSequence` with an `AsyncStream` for the
acknowledgements.

Furthermore, we want to shut down the producer once our stream has
terminated.

Modifications:

* `KafkaProducer`: replace `NIOAsyncSequence` with `AsyncStream`
* `KafkaProducerTests.testNoMemoryLeakAfterShutdown`: make sure to terminate
  the stream, otherwise we have a memory leak
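To make the shape of the change concrete, here is a minimal, self-contained sketch of the pattern this commit adopts: the `AsyncStream` continuation is captured at creation time, the delivery callback yields into it, and `onTermination` triggers a graceful shutdown. `ToyProducer`, `makeToyAcknowledgements`, and the `String` element type are illustrative placeholders, not code from this repository.

    // Hypothetical stand-in for the real KafkaProducer actor.
    actor ToyProducer {
        func shutdownGracefully() {
            print("shutting down")
        }
    }

    func makeToyAcknowledgements(
        producer: ToyProducer
    ) -> (stream: AsyncStream<String>, yield: @Sendable (String) -> Void) {
        var streamContinuation: AsyncStream<String>.Continuation?
        let stream = AsyncStream<String> { continuation in
            // The build closure runs synchronously, so the continuation is
            // available as soon as the initializer returns.
            streamContinuation = continuation
        }

        // When the consumer stops iterating (or drops the stream entirely),
        // shut the producer down, mirroring onTermination -> shutdownGracefully().
        streamContinuation?.onTermination = { _ in
            Task { await producer.shutdownGracefully() }
        }

        // Capture the continuation by value so the closure stays @Sendable.
        let yield: @Sendable (String) -> Void = { [streamContinuation] element in
            // Plain yield: AsyncStream buffers without back pressure.
            streamContinuation?.yield(element)
        }
        return (stream, yield)
    }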
felixschlegel committed Jun 15, 2023
1 parent 3a6d545 commit 84a67d5
Showing 2 changed files with 18 additions and 31 deletions.
45 changes: 15 additions & 30 deletions Sources/SwiftKafka/KafkaProducer.swift
@@ -16,27 +16,15 @@ import Crdkafka
import Logging
import NIOCore

-/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
-struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
-    func didYield(bufferDepth: Int) -> Bool { true }
-    func didConsume(bufferDepth: Int) -> Bool { true }
-}
-
-/// `NIOAsyncSequenceProducerDelegate` that does nothing.
-struct NoDelegate: NIOAsyncSequenceProducerDelegate {
-    func produceMore() {}
-    func didTerminate() {}
-}
-
/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
public struct KafkaMessageAcknowledgements: AsyncSequence {
    public typealias Element = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
-    typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, NoDelegate>
+    typealias WrappedSequence = AsyncStream<Element>
    let wrappedSequence: WrappedSequence

    /// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
    public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol {
-        let wrappedIterator: NIOAsyncSequenceProducer<Element, NoBackPressure, NoDelegate>.AsyncIterator
+        var wrappedIterator: AsyncStream<Element>.AsyncIterator

        public mutating func next() async -> Element? {
            await self.wrappedIterator.next()
@@ -98,7 +86,6 @@ public actor KafkaProducer
        self.state = .started
    }

-
    /// Initialize a new ``KafkaProducer``.
    ///
    /// This factory method creates a producer without message acknowledgements.
@@ -148,30 +135,22 @@ public actor KafkaProducer
        topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
        logger: Logger
    ) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) {
-        // (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
-        // This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
-        // The source MUST be held by the caller and used to signal new elements or finish.
-        // The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller.
-        // This is due to the fact that deiniting the sequence is used as part of a trigger to
-        // terminate the underlying source.
-        let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
-            elementType: Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>.self,
-            backPressureStrategy: NoBackPressure(),
-            delegate: NoDelegate()
-        )
-        let source = sourceAndSequence.source
+        var streamContinuation: AsyncStream<Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>>.Continuation?
+        let stream = AsyncStream { continuation in
+            streamContinuation = continuation
+        }

        let client = try RDKafka.createClient(
            type: .producer,
            configDictionary: config.dictionary,
-            callback: { [logger, source] messageResult in
+            callback: { [logger, streamContinuation] messageResult in
                guard let messageResult else {
                    logger.error("Could not resolve acknowledged message")
                    return
                }

-                // Ignore YieldResult as we don't support back pressure in KafkaProducer
-                _ = source.yield(messageResult)
+                streamContinuation?.yield(messageResult)
            },
            logger: logger
        )
@@ -182,7 +161,13 @@
            logger: logger
        )

-        let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: sourceAndSequence.sequence)
+        streamContinuation?.onTermination = { [producer] _ in
+            Task {
+                await producer.shutdownGracefully()
+            }
+        }
+
+        let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: stream)
        return (producer, acknowlegementsSequence)
    }

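For orientation, this is roughly how the reworked factory is consumed. It is only a sketch: `config` and `logger` are assumed to already exist with whatever types the project expects, the code runs inside an async throwing context, and only `makeProducerWithAcknowledgements` and the acknowledgement sequence come from this diff.

    // Assumes `config` and `logger` already exist in the surrounding scope.
    let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(
        config: config,
        logger: logger
    )
    _ = producer // kept alive here; messages would be sent through it elsewhere

    let consumeTask = Task {
        for await result in acknowledgements {
            switch result {
            case .success(let message):
                logger.info("Delivered: \(message)")
            case .failure(let error):
                logger.error("Delivery failed: \(error)")
            }
        }
    }

    // Once the consuming task is cancelled (or `acknowledgements` is dropped),
    // the AsyncStream terminates and onTermination shuts the producer down.
    consumeTask.cancel()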
4 changes: 3 additions & 1 deletion Tests/SwiftKafkaTests/KafkaProducerTests.swift
@@ -218,14 +218,16 @@ final class KafkaProducerTests: XCTestCase {

    func testNoMemoryLeakAfterShutdown() async throws {
        var producer: KafkaProducer?
-        var acks: KafkaMessageAcknowledgements
+        var acks: KafkaMessageAcknowledgements?
        (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
        _ = acks

        weak var producerCopy = producer

        await producer?.shutdownGracefully()
        producer = nil
+        // Make sure to terminate the AsyncSequence
+        acks = nil

        // Wait for rd_kafka_flush to complete
        try await Task.sleep(nanoseconds: 10 * 1_000_000_000)
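The new `acks = nil` line matters because dropping the last reference to an `AsyncStream` terminates it, which fires the continuation's `onTermination` handler and, after this commit, the producer shutdown. A tiny illustration of that behaviour, independent of this repository:

    // Deallocating an AsyncStream that was never consumed still triggers
    // onTermination, which is what the strengthened test relies on.
    var stream: AsyncStream<Int>? = AsyncStream { continuation in
        continuation.onTermination = { reason in
            print("terminated: \(reason)")
        }
        continuation.yield(1)
    }
    _ = stream   // the stream is intentionally never iterated
    stream = nil // no iterator was ever created; onTermination still runs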