diff --git a/tokio-postgres/src/copy_both.rs b/tokio-postgres/src/copy_both.rs index d3b46eab7..e8c2103b0 100644 --- a/tokio-postgres/src/copy_both.rs +++ b/tokio-postgres/src/copy_both.rs @@ -16,6 +16,9 @@ use std::task::{Context, Poll}; /// The state machine of CopyBothReceiver /// /// ```ignore +/// Setup +/// | +/// v /// CopyBoth /// / \ /// v v @@ -189,17 +192,27 @@ impl Stream for CopyBothReceiver { Poll::Pending => match self.state { Setup | CopyBoth | CopyIn => match ready!(self.sink_receiver.poll_next_unpin(cx)) { Some(msg) => Poll::Ready(Some(msg)), - None => { - self.state = match self.state { - CopyBoth => CopyOut, - CopyIn => CopyNone, - _ => unreachable!(), - }; - - let mut buf = BytesMut::new(); - frontend::copy_done(&mut buf); - Poll::Ready(Some(FrontendMessage::Raw(buf.freeze()))) - } + None => match self.state { + // The user has cancelled their interest to this CopyBoth query but we're + // still in the Setup phase. From this point the receiver will either enter + // CopyBoth mode or will receive an Error response from PostgreSQL. When + // either of those happens the state machine will terminate the connection + // appropriately. + Setup => Poll::Pending, + CopyBoth => { + self.state = CopyOut; + let mut buf = BytesMut::new(); + frontend::copy_done(&mut buf); + Poll::Ready(Some(FrontendMessage::Raw(buf.freeze()))) + } + CopyIn => { + self.state = CopyNone; + let mut buf = BytesMut::new(); + frontend::copy_done(&mut buf); + Poll::Ready(Some(FrontendMessage::Raw(buf.freeze()))) + } + _ => unreachable!(), + }, }, _ => Poll::Pending, },