KafkaProducer: Expose run() method #63

Merged
8 changes: 4 additions & 4 deletions Package.swift
@@ -29,10 +29,10 @@ let rdkafkaExclude = [
 let package = Package(
     name: "swift-kafka-gsoc",
     platforms: [
-        .macOS(.v10_15),
-        .iOS(.v13),
-        .watchOS(.v6),
-        .tvOS(.v13),
+        .macOS(.v13),
+        .iOS(.v16),
+        .watchOS(.v9),
+        .tvOS(.v16),
     ],
     products: [
         .library(
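A note on the platform bump (an inference, not stated in the diff): the new `run(pollInterval:)` loop added to `KafkaProducer.swift` below sleeps with the `Duration`-based `Task.sleep(for:)`, which is only available from macOS 13 / iOS 16 / watchOS 9 / tvOS 16 onwards. A minimal illustration of the call that needs the newer platforms:

```swift
// The Duration-based sleep overload used by the new poll loop requires
// macOS 13 / iOS 16 / watchOS 9 / tvOS 16 or newer.
func waitBetweenPolls(_ interval: Duration = .milliseconds(100)) async throws {
    try await Task.sleep(for: interval)
}
```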
35 changes: 23 additions & 12 deletions README.md
@@ -11,24 +11,35 @@ The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can late
 ```swift
 let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"])

-let producer = try await KafkaProducer(
+let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(
     config: config,
     logger: .kafkaTest // Your logger here
 )

-let messageID = try await producer.sendAsync(
-    KafkaProducerMessage(
-        topic: "topic-name",
-        value: "Hello, World!"
-    )
-)
+await withThrowingTaskGroup(of: Void.self) { group in

-for await acknowledgement in producer.acknowledgements {
-    // Check if acknowledgement belongs to the sent message
-}
+    // Run Task
+    group.addTask {
+        try await producer.run()
+    }

-// Required
-await producer.shutdownGracefully()
+    // Task receiving acknowledgements
+    group.addTask {
+        let messageID = try await producer.sendAsync(
+            KafkaProducerMessage(
+                topic: "topic-name",
+                value: "Hello, World!"
+            )
+        )
+
+        for await acknowledgement in acknowledgements {
+            // Check if acknowledgement belongs to the sent message
+        }
+
+        // Required
+        await producer.shutdownGracefully()
+    }
+}
 ```
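Not shown in this README excerpt: the PR also adds a `makeProducer` factory that creates a producer without an acknowledgement sequence (see the `KafkaProducer.swift` diff below). A usage sketch along the same lines as the example above; the configuration values and logger are placeholders:

```swift
// Sketch only: a producer without message acknowledgements. With no
// delivery-report callback installed, acknowledgements are discarded.
func produceWithoutAcknowledgements() async throws {
    let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"])
    let producer = try await KafkaProducer.makeProducer(
        config: config,
        logger: .kafkaTest // Your logger here
    )

    try await withThrowingTaskGroup(of: Void.self) { group in
        // Drives the poll loop until the producer is shut down.
        group.addTask {
            try await producer.run()
        }

        group.addTask {
            _ = try await producer.sendAsync(
                KafkaProducerMessage(topic: "topic-name", value: "Hello, World!")
            )
            // Required: flushes outstanding messages and ends the run() loop.
            await producer.shutdownGracefully()
        }

        try await group.waitForAll()
    }
}
```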

### Consumer API
14 changes: 14 additions & 0 deletions Sources/SwiftKafka/KafkaClient.swift
@@ -42,6 +42,20 @@ final class KafkaClient {
rd_kafka_destroy(kafkaHandle)
}

/// Polls the Kafka client for events.
///
/// Events will cause application-provided callbacks to be called.
///
/// - Parameter timeout: Specifies the maximum amount of time
/// (in milliseconds) that the call will block waiting for events.
/// For non-blocking calls, provide 0 as `timeout`.
/// To wait indefinitely for an event, provide -1.
/// - Returns: The number of events served.
@discardableResult
func poll(timeout: Int32) -> Int32 {
return rd_kafka_poll(self.kafkaHandle, timeout)
}

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
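For orientation, this `poll(timeout:)` wrapper is what the producer's new `run()` loop drives (see the `KafkaProducer.swift` diff below). A minimal sketch of such a driver, assuming access to a configured `KafkaClient` (the type is internal to the package):

```swift
// Sketch: serve librdkafka events until the surrounding task is cancelled.
// poll(timeout: 0) returns immediately; any pending callbacks (e.g. delivery
// reports) are invoked during the call.
func pollUntilCancelled(client: KafkaClient) async throws {
    while !Task.isCancelled {
        client.poll(timeout: 0)
        try await Task.sleep(for: .milliseconds(100))
    }
}
```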
7 changes: 7 additions & 0 deletions Sources/SwiftKafka/KafkaConsumer.swift
@@ -121,6 +121,13 @@ public final class KafkaConsumer {
highWatermark: highWatermark
)

// (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
// This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
// The source MUST be held by the caller and used to signal new elements or finish.
// The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller.
// This is due to the fact that deiniting the sequence is used as part of a trigger to
// terminate the underlying source.
// TODO: make self delegate to avoid weak reference here
let messagesSequenceDelegate = ConsumerMessagesAsyncSequenceDelegate { [weak self] in
self?.produceMore()
} didTerminateClosure: { [weak self] in
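To illustrate the `makeSequence` contract quoted in the comment above, here is a stand-alone sketch of the pattern, modelled on the code this PR removes from `KafkaProducer.swift`; the element type is arbitrary, and the trivial strategy and delegate mirror the deleted `NoBackPressure`/`NoDelegate` helpers:

```swift
import NIOCore

// Trivial back-pressure strategy and delegate, mirroring the helpers
// removed from KafkaProducer.swift in this PR.
struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
    func didYield(bufferDepth: Int) -> Bool { true }
    func didConsume(bufferDepth: Int) -> Bool { true }
}

struct NoDelegate: NIOAsyncSequenceProducerDelegate {
    func produceMore() {}
    func didTerminate() {}
}

func makeExampleSequence() -> (
    source: NIOAsyncSequenceProducer<Int, NoBackPressure, NoDelegate>.Source,
    sequence: NIOAsyncSequenceProducer<Int, NoBackPressure, NoDelegate>
) {
    let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
        elementType: Int.self,
        backPressureStrategy: NoBackPressure(),
        delegate: NoDelegate()
    )

    // The source stays with the producing side and is used to yield or finish.
    _ = sourceAndSequence.source.yield(42)

    // The sequence must be handed to the consuming side and not retained by the
    // caller, since deiniting it is part of what terminates the source.
    return (sourceAndSequence.source, sourceAndSequence.sequence)
}
```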
177 changes: 99 additions & 78 deletions Sources/SwiftKafka/KafkaProducer.swift
@@ -16,27 +16,15 @@ import Crdkafka
import Logging
import NIOCore

/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
func didYield(bufferDepth: Int) -> Bool { true }
func didConsume(bufferDepth: Int) -> Bool { true }
}

/// `NIOAsyncSequenceProducerDelegate` that does nothing.
struct NoDelegate: NIOAsyncSequenceProducerDelegate {
func produceMore() {}
func didTerminate() {}
}

/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
public struct AcknowledgedMessagesAsyncSequence: AsyncSequence {
public struct KafkaMessageAcknowledgements: AsyncSequence {
public typealias Element = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, NoDelegate>
typealias WrappedSequence = AsyncStream<Element>
let wrappedSequence: WrappedSequence

/// `AsyncIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol {
let wrappedIterator: NIOAsyncSequenceProducer<Element, NoBackPressure, NoDelegate>.AsyncIterator
var wrappedIterator: AsyncStream<Element>.AsyncIterator

public mutating func next() async -> Element? {
await self.wrappedIterator.next()
@@ -77,65 +65,110 @@ public actor KafkaProducer {
/// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer.
private var topicHandles: [String: OpaquePointer]

// We use implicitly unwrapped optionals here as these properties need to access self upon initialization
/// Used for handling the connection to the Kafka cluster.
private var client: KafkaClient!
/// Task that polls the Kafka cluster for updates periodically.
private var pollTask: Task<Void, Never>!

/// `AsyncSequence` that returns all ``KafkaProducerMessage`` objects that have been
/// acknowledged by the Kafka cluster.
public nonisolated let acknowledgements: AcknowledgedMessagesAsyncSequence
nonisolated let acknowlegdementsSource: AcknowledgedMessagesAsyncSequence.WrappedSequence.Source
private typealias Acknowledgement = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
private let client: KafkaClient
Review comment (Contributor): Can we create a ticket to try to internalise all of the properties here into our state where it makes sense?

Reply (Contributor Author): #64


// Private initializer, use factory methods to create KafkaProducer
/// Initialize a new ``KafkaProducer``.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
public init(
config: KafkaProducerConfig = KafkaProducerConfig(),
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
/// - Throws: A ``KafkaError`` if initializing the producer failed.
private init(
client: KafkaClient,
topicConfig: KafkaTopicConfig,
logger: Logger
) async throws {
self.client = client
self.topicConfig = topicConfig
self.logger = logger
self.topicHandles = [:]
self.logger = logger
self.state = .started
}

// (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
// This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
// The source MUST be held by the caller and used to signal new elements or finish.
// The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller.
// This is due to the fact that deiniting the sequence is used as part of a trigger to
// terminate the underlying source.
let acknowledgementsSourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: Acknowledgement.self,
backPressureStrategy: NoBackPressure(),
delegate: NoDelegate()
/// Initialize a new ``KafkaProducer``.
///
/// This factory method creates a producer without message acknowledgements.
///
/// - Parameter configuration: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfiguration: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
public static func makeProducer(
config: KafkaProducerConfig = KafkaProducerConfig(),
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
logger: Logger
) async throws -> KafkaProducer {
let client = try RDKafka.createClient(
type: .producer,
configDictionary: config.dictionary,
// Having no callback will discard any incoming acknowledgement messages
// Ref: rdkafka_broker.c:rd_kafka_dr_msgq
callback: nil,
logger: logger
)
self.acknowlegdementsSource = acknowledgementsSourceAndSequence.source
self.acknowledgements = AcknowledgedMessagesAsyncSequence(
wrappedSequence: acknowledgementsSourceAndSequence.sequence

let producer = try await KafkaProducer(
client: client,
topicConfig: topicConfig,
logger: logger
)

self.client = try RDKafka.createClient(
return producer
}

/// Initialize a new ``KafkaProducer`` and a ``KafkaMessageAcknowledgements`` asynchronous sequence.
///
/// Use the asynchronous sequence to consume message acknowledgements.
///
/// - Important: When the asynchronous sequence is deinited the producer will be shut down.
///
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
public static func makeProducerWithAcknowledgements(
config: KafkaProducerConfig = KafkaProducerConfig(),
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
logger: Logger
) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) {
var streamContinuation: AsyncStream<Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>>.Continuation?
let stream = AsyncStream { continuation in
streamContinuation = continuation
}

let client = try RDKafka.createClient(
type: .producer,
configDictionary: config.dictionary,
callback: self.deliveryReportCallback,
logger: self.logger
callback: { [logger, streamContinuation] messageResult in
guard let messageResult else {
logger.error("Could not resolve acknowledged message")
return
}

// Ignore YieldResult as we don't support back pressure in KafkaProducer
streamContinuation?.yield(messageResult)
},
logger: logger
)

// Poll Kafka every millisecond
self.pollTask = Task { [client] in
while !Task.isCancelled {
client?.withKafkaHandlePointer { handle in
rd_kafka_poll(handle, 0)
}
try? await Task.sleep(nanoseconds: 1_000_000)
let producer = try await KafkaProducer(
client: client,
topicConfig: topicConfig,
logger: logger
)

streamContinuation?.onTermination = { [producer] _ in
Task {
await producer.shutdownGracefully()
}
}

let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: stream)
return (producer, acknowlegementsSequence)
}

/// Method to shutdown the ``KafkaProducer``.
@@ -155,7 +188,7 @@ public actor KafkaProducer {

private func _shutDownGracefully(timeout: Int32) async {
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
// Wait 10 seconds for outstanding messages to be sent and callbacks to be called
// Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called
self.client.withKafkaHandlePointer { handle in
rd_kafka_flush(handle, timeout)
continuation.resume()
@@ -165,11 +198,22 @@
for (_, topicHandle) in self.topicHandles {
rd_kafka_topic_destroy(topicHandle)
}
self.pollTask.cancel()

self.state = .shutDown
}

/// Start polling Kafka for acknowledged messages.
///
/// - Parameter pollInterval: The desired time interval between two consecutive polls.
/// - Returns: An awaitable task representing the execution of the poll loop.
public func run(pollInterval: Duration = .milliseconds(100)) async throws {
// TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle)
while self.state == .started {
self.client.poll(timeout: 0)
try await Task.sleep(for: pollInterval)
}
}

/// Send messages to the Kafka cluster asynchronously, aka "fire and forget".
/// This function is non-blocking.
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
@@ -220,29 +264,6 @@
return self.messageIDCounter
}

// Closure that is executed when a message has been acknowledged by Kafka
private lazy var deliveryReportCallback: (UnsafePointer<rd_kafka_message_t>?) -> Void = { [logger, acknowlegdementsSource] messagePointer in
guard let messagePointer = messagePointer else {
logger.error("Could not resolve acknowledged message")
return
}

let messageID = UInt(bitPattern: messagePointer.pointee._private)

do {
let message = try KafkaAcknowledgedMessage(messagePointer: messagePointer, id: messageID)
_ = acknowlegdementsSource.yield(.success(message))
} catch {
guard let error = error as? KafkaAcknowledgedMessageError else {
fatalError("Caught error that is not of type \(KafkaAcknowledgedMessageError.self)")
}
_ = acknowlegdementsSource.yield(.failure(error))
}

// The messagePointer is automatically destroyed by librdkafka
// For safety reasons, we only use it inside of this closure
}

/// Check `topicHandles` for a handle matching the topic name and create a new handle if needed.
/// - Parameter topic: The name of the topic that is addressed.
private func createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? {
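A short sketch of how the new `run()` loop and `shutdownGracefully()` are expected to cooperate, assuming `producer` was created with one of the factory methods above: `run()` keeps polling while the producer's state is `.started`, and `shutdownGracefully()` flushes outstanding messages and flips the state, which ends the loop.

```swift
// Sketch: run() and shutdownGracefully() cooperating in a task group.
try await withThrowingTaskGroup(of: Void.self) { group in
    // Polls the client every 50 ms until the producer is shut down.
    group.addTask {
        try await producer.run(pollInterval: .milliseconds(50))
    }

    group.addTask {
        // ... produce messages with producer.sendAsync(...) ...

        // Flush outstanding messages; run() observes the state change and returns.
        await producer.shutdownGracefully()
    }

    try await group.waitForAll()
}
```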
2 changes: 1 addition & 1 deletion Sources/SwiftKafka/RDKafka/RDKafka.swift
@@ -27,7 +27,7 @@ struct RDKafka {
 static func createClient(
     type: ClientType,
     configDictionary: [String: String],
-    callback: ((UnsafePointer<rd_kafka_message_t>?) -> Void)? = nil,
+    callback: ((RDKafkaConfig.KafkaAcknowledgementResult?) -> Void)? = nil,
     logger: Logger
 ) throws -> KafkaClient {
     let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
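The signature change means call sites now receive an already-converted acknowledgement result instead of a raw `rd_kafka_message_t` pointer. A hedged sketch of a call site, mirroring `makeProducerWithAcknowledgements` above (`RDKafkaConfig.KafkaAcknowledgementResult` is assumed to wrap the same `Result` type that is yielded to the acknowledgement stream; `config` and `logger` are placeholders):

```swift
import Logging

// Sketch of a call site under the new callback signature.
func makeClientWithDeliveryReports(config: KafkaProducerConfig, logger: Logger) throws -> KafkaClient {
    try RDKafka.createClient(
        type: .producer,
        configDictionary: config.dictionary,
        callback: { acknowledgementResult in
            // nil means the delivery report could not be resolved.
            guard let acknowledgementResult else {
                logger.error("Could not resolve acknowledged message")
                return
            }
            // Forward the result (acknowledged message or error) to whatever
            // consumes acknowledgements, e.g. an AsyncStream continuation.
            logger.info("Received delivery report: \(acknowledgementResult)")
        },
        logger: logger
    )
}
```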