From aea9b1b499e3db7e79e0e76dcda07ed5ba8d2b01 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Thu, 17 Oct 2024 12:07:18 +0300 Subject: [PATCH] copy_both: handle `Future` cancellation case When a user starts a CopyBoth query a state machine is created that is responsible for driving the CopyBoth protocol forward regardless of what the user does with the returned handles. The state machine starts in the `Setup` phase and only gives control back to the user (in the form of the returned handles) after having finished with the Setup. Therefore the code assumed (incorrectly) that the handles will never be dropped while in Setup mode. It turns out there is one more way of dropping the handle even while in the Setup phase, and that is to cancel the entire Future object returned by the `copy_both` method at just the right time. This resulted in the state machine observing that the handles were dropped while still in the Setup phase, which was asserted to never happen. This PR fixes the problem by handling the case where a handles are dropped while still in the `Setup` phase. Signed-off-by: Petros Angelatos --- tokio-postgres/src/copy_both.rs | 35 ++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/tokio-postgres/src/copy_both.rs b/tokio-postgres/src/copy_both.rs index d3b46eab7..e8c2103b0 100644 --- a/tokio-postgres/src/copy_both.rs +++ b/tokio-postgres/src/copy_both.rs @@ -16,6 +16,9 @@ use std::task::{Context, Poll}; /// The state machine of CopyBothReceiver /// /// ```ignore +/// Setup +/// | +/// v /// CopyBoth /// / \ /// v v @@ -189,17 +192,27 @@ impl Stream for CopyBothReceiver { Poll::Pending => match self.state { Setup | CopyBoth | CopyIn => match ready!(self.sink_receiver.poll_next_unpin(cx)) { Some(msg) => Poll::Ready(Some(msg)), - None => { - self.state = match self.state { - CopyBoth => CopyOut, - CopyIn => CopyNone, - _ => unreachable!(), - }; - - let mut buf = BytesMut::new(); - frontend::copy_done(&mut buf); - Poll::Ready(Some(FrontendMessage::Raw(buf.freeze()))) - } + None => match self.state { + // The user has cancelled their interest to this CopyBoth query but we're + // still in the Setup phase. From this point the receiver will either enter + // CopyBoth mode or will receive an Error response from PostgreSQL. When + // either of those happens the state machine will terminate the connection + // appropriately. + Setup => Poll::Pending, + CopyBoth => { + self.state = CopyOut; + let mut buf = BytesMut::new(); + frontend::copy_done(&mut buf); + Poll::Ready(Some(FrontendMessage::Raw(buf.freeze()))) + } + CopyIn => { + self.state = CopyNone; + let mut buf = BytesMut::new(); + frontend::copy_done(&mut buf); + Poll::Ready(Some(FrontendMessage::Raw(buf.freeze()))) + } + _ => unreachable!(), + }, }, _ => Poll::Pending, },