Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaProducer: Expose run() method #63

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ let rdkafkaExclude = [
let package = Package(
name: "swift-kafka-gsoc",
platforms: [
.macOS(.v10_15),
.iOS(.v13),
.watchOS(.v6),
.tvOS(.v13),
.macOS(.v13),
.iOS(.v16),
.watchOS(.v9),
.tvOS(.v16),
],
products: [
.library(
Expand Down
35 changes: 23 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,35 @@ The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can late
```swift
let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"])

let producer = try await KafkaProducer(
let (producer, acknowledgements) = try await KafkaProducer.newProducerWithAcknowledgements(
config: config,
logger: .kafkaTest // Your logger here
)

let messageID = try await producer.sendAsync(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
)
)
await withThrowingTaskGroup(of: Void.self) { group in

for await acknowledgement in producer.acknowledgements {
// Check if acknowledgement belongs to the sent message
}
// Run Task
group.addTask {
try await producer.run()
}

// Required
await producer.shutdownGracefully()
// Task receiving acknowledgements
group.addTask {
let messageID = try await producer.sendAsync(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
)
)

for await acknowledgement in acknowledgements {
// Check if acknowledgement belongs to the sent message
}

// Required
await producer.shutdownGracefully()
}
}
```

### Consumer API
Expand Down
7 changes: 7 additions & 0 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ public final class KafkaConsumer {
highWatermark: highWatermark
)

// (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
// This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
// The source MUST be held by the caller and used to signal new elements or finish.
// The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller.
// This is due to the fact that deiniting the sequence is used as part of a trigger to
// terminate the underlying source.
// TODO: make self delegate to avoid weak reference here
let messagesSequenceDelegate = ConsumerMessagesAsyncSequenceDelegate { [weak self] in
self?.produceMore()
} didTerminateClosure: { [weak self] in
Expand Down
147 changes: 88 additions & 59 deletions Sources/SwiftKafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct NoDelegate: NIOAsyncSequenceProducerDelegate {
}

/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
public struct AcknowledgedMessagesAsyncSequence: AsyncSequence {
public struct KafkaMessageAcknowledgements: AsyncSequence {
public typealias Element = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, NoDelegate>
let wrappedSequence: WrappedSequence
Expand Down Expand Up @@ -77,65 +77,104 @@ public actor KafkaProducer {
/// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer.
private var topicHandles: [String: OpaquePointer]

// We use implicitly unwrapped optionals here as these properties need to access self upon initialization
/// Used for handling the connection to the Kafka cluster.
private var client: KafkaClient!
/// Task that polls the Kafka cluster for updates periodically.
private var pollTask: Task<Void, Never>!

/// `AsyncSequence` that returns all ``KafkaProducerMessage`` objects that have been
/// acknowledged by the Kafka cluster.
public nonisolated let acknowledgements: AcknowledgedMessagesAsyncSequence
nonisolated let acknowlegdementsSource: AcknowledgedMessagesAsyncSequence.WrappedSequence.Source
private typealias Acknowledgement = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
private let client: KafkaClient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a ticket to try to internalise all of the properties here into our state where it makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#64


// Private initializer, use factory methods to create KafkaProducer
/// Initialize a new ``KafkaProducer``.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
public init(
config: KafkaProducerConfig = KafkaProducerConfig(),
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
/// - Throws: A ``KafkaError`` if initializing the producer failed.
private init(
client: KafkaClient,
topicConfig: KafkaTopicConfig,
logger: Logger
) async throws {
self.client = client
self.topicConfig = topicConfig
self.logger = logger
self.topicHandles = [:]
self.logger = logger
self.state = .started
}

/// Initialize a new ``KafkaProducer`` that ignores incoming message acknowledgements.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Initialize a new ``KafkaProducer`` that ignores incoming message acknowledgements.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
/// Initialize a new ``KafkaProducer``.
///
/// This factory method creates a producer without message acknowledgements.
///
/// - Parameter configuration: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfiguration: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.

Can we also create a radar to rename the config structs to spell out configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#65

public static func newProducer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static func newProducer(
public static func makeProducer(

config: KafkaProducerConfig = KafkaProducerConfig(),
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
logger: Logger
) async throws -> KafkaProducer {
let client = try RDKafka.createClient(
type: .producer,
configDictionary: config.dictionary,
// Having no callback will discard any incoming acknowledgement messages
// Ref: rdkafka_broker.c:rd_kafka_dr_msgq
callback: nil,
logger: logger
)

let producer = try await KafkaProducer(
client: client,
topicConfig: topicConfig,
logger: logger
)

return producer
}

/// Initialize a new ``KafkaProducer`` alongside a ``KafkaMessageAcknowledgements`` `AsyncSequence` that can be used
/// to receive message acknowledgements.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Initialize a new ``KafkaProducer`` alongside a ``KafkaMessageAcknowledgements`` `AsyncSequence` that can be used
/// to receive message acknowledgements.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
/// Initialize a new ``KafkaProducer`` and a ``KafkaMessageAcknowledgements`` asynchronous sequence.
///
/// Use the asynchronous sequence to consume message acknowledgements.
///
/// - Important: When the asynchronous sequence is deinitialized, the producer will be shut down.
///
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Throws: A ``KafkaError`` if initializing the producer failed.

public static func newProducerWithAcknowledgements(
config: KafkaProducerConfig = KafkaProducerConfig(),
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
logger: Logger
) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) {
// (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
// This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
// The source MUST be held by the caller and used to signal new elements or finish.
// The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller.
// This is due to the fact that deiniting the sequence is used as part of a trigger to
// terminate the underlying source.
let acknowledgementsSourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: Acknowledgement.self,
let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>.self,
backPressureStrategy: NoBackPressure(),
delegate: NoDelegate()
)
self.acknowlegdementsSource = acknowledgementsSourceAndSequence.source
self.acknowledgements = AcknowledgedMessagesAsyncSequence(
wrappedSequence: acknowledgementsSourceAndSequence.sequence
)
let source = sourceAndSequence.source

self.client = try RDKafka.createClient(
let client = try RDKafka.createClient(
type: .producer,
configDictionary: config.dictionary,
callback: self.deliveryReportCallback,
logger: self.logger
callback: { [logger, source] messageResult in
guard let messageResult else {
logger.error("Could not resolve acknowledged message")
return
}

_ = source.yield(messageResult) // Ignore YieldResult
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add why we ignore this result i.e. due to no back pressure support.

},
logger: logger
)

// Poll Kafka every millisecond
self.pollTask = Task { [client] in
while !Task.isCancelled {
client?.withKafkaHandlePointer { handle in
rd_kafka_poll(handle, 0)
}
try? await Task.sleep(nanoseconds: 1_000_000)
}
}
let producer = try await KafkaProducer(
client: client,
topicConfig: topicConfig,
logger: logger
)

let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: sourceAndSequence.sequence)
return (producer, acknowlegementsSequence)
}

/// Method to shutdown the ``KafkaProducer``.
Expand All @@ -155,7 +194,7 @@ public actor KafkaProducer {

private func _shutDownGracefully(timeout: Int32) async {
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
// Wait 10 seconds for outstanding messages to be sent and callbacks to be called
// Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called
self.client.withKafkaHandlePointer { handle in
rd_kafka_flush(handle, timeout)
continuation.resume()
Expand All @@ -165,11 +204,24 @@ public actor KafkaProducer {
for (_, topicHandle) in self.topicHandles {
rd_kafka_topic_destroy(topicHandle)
}
self.pollTask.cancel()

self.state = .shutDown
}

/// Start polling Kafka for acknowledged messages.
///
/// - Parameter pollInterval: The desired time interval between two consecutive polls.
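/// - Note: This method returns once the producer is no longer in the `started` state (e.g. after shutdown).
/// - Throws: `CancellationError` if the task is cancelled while sleeping between polls.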
public func run(pollInterval: Duration = .milliseconds(100)) async throws {
// TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle)
while self.state == .started {
self.client.withKafkaHandlePointer { handle in
rd_kafka_poll(handle, 0)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a method on the KafkaClient which does the withHandle stuff?

try await Task.sleep(for: pollInterval)
}
}

/// Send messages to the Kafka cluster asynchronously, aka "fire and forget".
/// This function is non-blocking.
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
Expand Down Expand Up @@ -220,29 +272,6 @@ public actor KafkaProducer {
return self.messageIDCounter
}

// Closure that is executed when a message has been acknowledged by Kafka
private lazy var deliveryReportCallback: (UnsafePointer<rd_kafka_message_t>?) -> Void = { [logger, acknowlegdementsSource] messagePointer in
guard let messagePointer = messagePointer else {
logger.error("Could not resolve acknowledged message")
return
}

let messageID = UInt(bitPattern: messagePointer.pointee._private)

do {
let message = try KafkaAcknowledgedMessage(messagePointer: messagePointer, id: messageID)
_ = acknowlegdementsSource.yield(.success(message))
} catch {
guard let error = error as? KafkaAcknowledgedMessageError else {
fatalError("Caught error that is not of type \(KafkaAcknowledgedMessageError.self)")
}
_ = acknowlegdementsSource.yield(.failure(error))
}

// The messagePointer is automatically destroyed by librdkafka
// For safety reasons, we only use it inside of this closure
}

/// Check `topicHandles` for a handle matching the topic name and create a new handle if needed.
/// - Parameter topic: The name of the topic that is addressed.
private func createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? {
Expand Down
2 changes: 1 addition & 1 deletion Sources/SwiftKafka/RDKafka/RDKafka.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct RDKafka {
static func createClient(
type: ClientType,
configDictionary: [String: String],
callback: ((UnsafePointer<rd_kafka_message_t>?) -> Void)? = nil,
callback: ((RDKafkaConfig.KafkaAcknowledgementResult?) -> Void)? = nil,
logger: Logger
) throws -> KafkaClient {
let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
Expand Down
38 changes: 34 additions & 4 deletions Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import Crdkafka

/// A collection of helper functions wrapping common `rd_kafka_conf_*` functions in Swift.
struct RDKafkaConfig {
typealias KafkaAcknowledgementResult = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
final class CapturedClosure {
typealias Closure = (UnsafePointer<rd_kafka_message_t>?) -> Void
typealias Closure = (KafkaAcknowledgementResult?) -> Void
let closure: Closure

init(_ closure: @escaping Closure) {
Expand Down Expand Up @@ -69,7 +70,7 @@ struct RDKafkaConfig {
/// - Returns: A ``CapturedClosure`` object that must be retained by the caller as long as acknowledgements are received.
static func setDeliveryReportCallback(
configPointer: OpaquePointer,
_ callback: @escaping ((UnsafePointer<rd_kafka_message_t>?) -> Void)
_ callback: @escaping ((KafkaAcknowledgementResult?) -> Void)
) -> CapturedClosure {
let capturedClosure = CapturedClosure(callback)
// Pass the captured closure to the C closure as an opaque object
Expand All @@ -83,14 +84,17 @@ struct RDKafkaConfig {
let callbackWrapper: (
@convention(c) (OpaquePointer?, UnsafePointer<rd_kafka_message_t>?, UnsafeMutableRawPointer?) -> Void
) = { _, messagePointer, opaquePointer in

guard let opaquePointer = opaquePointer else {
fatalError("Could not resolve reference to KafkaProducer instance")
}
let opaque = Unmanaged<CapturedClosure>.fromOpaque(opaquePointer).takeUnretainedValue()

let actualCallback = opaque.closure
actualCallback(messagePointer)
let messageResult = Self.convertMessageToAcknowledgementResult(messagePointer: messagePointer)
actualCallback(messageResult)

// The messagePointer is automatically destroyed by librdkafka
// For safety reasons, we only use it inside of this callback
}

rd_kafka_conf_set_dr_msg_cb(
Expand All @@ -100,4 +104,30 @@ struct RDKafkaConfig {

return capturedClosure
}

/// Translate a raw `rd_kafka_message_t` delivery report into a ``KafkaAcknowledgementResult``.
///
/// - Parameter messagePointer: Pointer to the `rd_kafka_message_t` received by the delivery report callback.
///   May be `nil`.
/// - Returns: `nil` when `messagePointer` is `nil`; otherwise `.success` wrapping the
///   ``KafkaAcknowledgedMessage`` built from the message, or `.failure` wrapping the
///   ``KafkaAcknowledgedMessageError`` thrown while building it.
private static func convertMessageToAcknowledgementResult(
    messagePointer: UnsafePointer<rd_kafka_message_t>?
) -> KafkaAcknowledgementResult? {
    guard let pointer = messagePointer else {
        return nil
    }

    // The message ID is recovered from the bit pattern of the message's `_private` field.
    let id = UInt(bitPattern: pointer.pointee._private)

    do {
        let acknowledgedMessage = try KafkaAcknowledgedMessage(messagePointer: pointer, id: id)
        return .success(acknowledgedMessage)
    } catch let acknowledgementError as KafkaAcknowledgedMessageError {
        return .failure(acknowledgementError)
    } catch {
        // Any other error type is treated as a programmer error and traps.
        fatalError("Caught error that is not of type \(KafkaAcknowledgedMessageError.self)")
    }
}
}
Loading