Skip to content

Commit

Permalink
make ThreadedProducer clonable like BaseProducer
Browse files Browse the repository at this point in the history
The threaded producer was not `Clone` due to handling the background
thread join handle but that can easily be stored in an `Arc` that the
last drop of the client unwraps and terminates the background thread.

`Arc::into_inner` guarantees that exactly one thread will manage to
unwrap the handle.
  • Loading branch information
petrosagg committed Oct 3, 2024
1 parent 4a217ec commit c0eb5c4
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,13 +494,14 @@ where
/// queued events, such as delivery notifications. The thread will be
/// automatically stopped when the producer is dropped.
#[must_use = "The threaded producer will stop immediately if unused"]
#[derive(Clone)]
pub struct ThreadedProducer<C>
where
C: ProducerContext + 'static,
{
producer: BaseProducer<C>,
should_stop: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
handle: Option<Arc<JoinHandle<()>>>,
}

impl FromClientConfig for ThreadedProducer<DefaultProducerContext> {
Expand Down Expand Up @@ -545,7 +546,7 @@ where
Ok(ThreadedProducer {
producer,
should_stop,
handle: Some(thread),
handle: Some(Arc::new(thread)),
})
}
}
Expand Down Expand Up @@ -628,7 +629,7 @@ where
{
fn drop(&mut self) {
trace!("Destroy ThreadedProducer");
if let Some(handle) = self.handle.take() {
if let Some(handle) = self.handle.take().and_then(Arc::into_inner) {
trace!("Stopping polling");
self.should_stop.store(true, Ordering::Relaxed);
trace!("Waiting for polling thread termination");
Expand Down

0 comments on commit c0eb5c4

Please sign in to comment.