Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaProducer: Expose run() method #63

Merged

Conversation

felixschlegel
Copy link
Contributor

Modifications

  • fix typo in SwiftKafkaTests
  • upgrade minimum OS versions to support the Duration type
  • move conversion of rd_kafka_message_t to KafkaAcknowledgementResult to RDKafkaConfig so that we can pass the KafkaAcknowledgementResult type as early as possible and don't have to bother with UnsafePointer<rd_kafka_message_t> in all our delivery callback logic
  • expose KafkaProducer.run() method
  • README: use TaskGroup in KafkaProducer example
  • add comments to task groups in README
  • refactor all tests into using task groups (structured concurrency)

Have two factory methods creating KafkaProducer

Motivation

We want to have a KafkaProducer that is not consuming any acknowledgements. This means it is initialized without a deliveryReportCallback which in turn means that librdkafka will not queue any incoming acknowledgements which prevents us from running out of memory in that case.

Modifications

  • add two new factory methods for creating KafkaProducer: * KafkaProducer.newProducer * KafkaProducer.newProducerWithAcknowledgements
  • update README

@felixschlegel felixschlegel force-pushed the fs-kafka-producer-run-method branch 2 times, most recently from 70f32cd to 9fd97e2 Compare June 14, 2023 15:17
/// Task that polls the Kafka cluster for updates periodically.
private var pollTask: Task<Void, Never>!
/// Variable that indicates if our poll loop should be running.
private var polling = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this property. We can use the state for this started indicates that we are polling

Motivation:

* Our current implementation of polling `librdkafka` for new message
  acknowledgements in `KafkaProducer` did not support backpressure. Also,
  it made use of some `weak` references which we generally want to avoid
  due to performance reasons.

Modifications:

* fix typo in `SwiftKafkaTests`
* upgrade minimum OS versions to support the `Duration` type
* move conversion of `rd_kafka_message_t` to
  `KafkaAcknowledgementResult` to `RDKafkaConfig` so that we can pass
  the `KafkaAcknowledgementResult` type as early as possible and don't
  have to bother with `UnsafePointer<rd_kafka_message_t>` in all our
  delivery callback logic
* expose KafkaProducer.run() method
* `README`: use `TaskGroup` in `KafkaProducer` example
* add comments to task groups in `README`
* refactor all tests into using task groups (structured concurrency)

Have two factory methods creating KafkaProducer

Motivation:

We want to have a `KafkaProducer` that is not consuming any
acknowledgements. This means it is initialized without a
`deliveryReportCallback` which in turn means that `librdkafka` will not
queue any incoming acknowledgements which prevents us from running out
of memory in that case.

Modifications:

* add two new factory methods for creating `KafkaProducer`:
    * `KafkaProducer.newProducer`
    * `KafkaProducer.newProducerWithAcknowledgements`
* update README
@felixschlegel felixschlegel force-pushed the fs-kafka-producer-run-method branch from 9fd97e2 to 28c9e6f Compare June 15, 2023 12:02
@felixschlegel felixschlegel requested a review from FranzBusch June 15, 2023 12:03
public nonisolated let acknowledgements: AcknowledgedMessagesAsyncSequence
nonisolated let acknowlegdementsSource: AcknowledgedMessagesAsyncSequence.WrappedSequence.Source
private typealias Acknowledgement = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
private let client: KafkaClient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a ticket to try to internalise all of the properties here into our state where it makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#64

/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
public static func newProducer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static func newProducer(
public static func makeProducer(

Comment on lines 101 to 106
/// Initialize a new ``KafkaProducer`` that ignores incoming message acknowledgements.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Initialize a new ``KafkaProducer`` that ignores incoming message acknowledgements.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
/// Initialize a new ``KafkaProducer``.
///
/// This factory method creates a producer without message acknowledgements.
///
/// - Parameter configuration: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfiguration: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.

Can we also create a radar to rename the config structs to spell out configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#65

Comment on lines 130 to 137
/// Initialize a new ``KafkaProducer`` alongside a ``KafkaMessageAcknowledgements`` `AsyncSequence` that can be used
/// to receive message acknowlegements.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Initialize a new ``KafkaProducer`` alongside a ``KafkaMessageAcknowledgements`` `AsyncSequence` that can be used
/// to receive message acknowlegements.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
/// Initialize a new ``KafkaProducer`` and a ``KafkaMessageAcknowledgements`` asynchronous sequence.
///
/// Use the asynchronous sequence to consume message acknowledgements.
///
/// - Important: When the asynchronous sequence is deinited the producer will be shutdown.
///
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Throws: A ``KafkaError`` if initializing the producer failed.

return
}

_ = source.yield(messageResult) // Ignore YieldResult
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add why we ignore this result i.e. due to no back pressure support.

Comment on lines 218 to 220
self.client.withKafkaHandlePointer { handle in
rd_kafka_poll(handle, 0)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a method on the KafkaClient which does the withHandle stuff?

Modifications:

* rename `KafkaProducer.newProducer*` to `KafkaProducer.makeProducer*`
* update docc documentation for `KafkaProducer.newProducer*` methods
* create new method `KafkaClient.poll(timeout:)`
Motivation:

Given that we don't support backpressure for our `KafkaProducer`, we can
replace the `NIOAsyncSequence` with a `AsyncStream` for the
acknowledgements.

Furthermore we want to shut down the producer once our stream has
terminated.

Modifications:

* `KafkaProducer`: replace `NIOAsyncSequence` with `AsyncStream`
* `KafkaProducerTests.testNoMemoryLeakAfterShutdown` make sure to kill
  stream otherwise we have a memory leak
@FranzBusch FranzBusch merged commit 840242c into swift-server:main Jun 19, 2023
@FranzBusch FranzBusch deleted the fs-kafka-producer-run-method branch June 19, 2023 12:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants