Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add helper stream type for finalized notifications #14111

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/chain-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ alloy-signer.workspace = true
alloy-signer-local.workspace = true
alloy-consensus.workspace = true
rand.workspace = true
futures.workspace = true

[features]
test-utils = [
Expand Down
143 changes: 143 additions & 0 deletions crates/chain-state/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use alloy_eips::eip2718::Encodable2718;
use derive_more::{Deref, DerefMut};
use pin_project::pin_project;
use reth_execution_types::{BlockReceipts, Chain};
use reth_primitives::{NodePrimitives, RecoveredBlock, SealedHeader};
use reth_storage_api::NodePrimitivesProvider;
Expand Down Expand Up @@ -211,6 +212,86 @@ impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
}
}

/// A stream that emits canonical chain notifications only when blocks are finalized.
#[pin_project]
pub(crate) struct FinalizedNotificationStream<N: NodePrimitives> {
#[pin]
canon_stream: CanonStateNotificationStream<N>,
#[pin]
finalized_stream: ForkChoiceStream<SealedHeader<alloy_consensus::Header>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to replace the concrete header types with

Suggested change
finalized_stream: ForkChoiceStream<SealedHeader<alloy_consensus::Header>>,
finalized_stream: ForkChoiceStream<SealedHeaderFor<N>>,

// Buffer for the most recent canonical notification
buffered_notification: Option<CanonStateNotification<N>>,
hoank101 marked this conversation as resolved.
Show resolved Hide resolved
// Buffer for the most recent finalization event
buffered_finalization: Option<SealedHeader<alloy_consensus::Header>>,
}

impl<N: NodePrimitives> FinalizedNotificationStream<N>
where
N::Block: Clone + Send + Sync + 'static,
{
// Helper method to check if we have a matching pair of notifications
fn try_match(
buffered_notification: &Option<CanonStateNotification<N>>,
buffered_finalization: &Option<SealedHeader<alloy_consensus::Header>>,
) -> Option<CanonStateNotification<N>> {
match (buffered_notification, buffered_finalization) {
(Some(notification), Some(finalized_header))
if notification.tip().hash() == finalized_header.hash() =>
{
Some(notification.clone())
}
_ => None,
}
Comment on lines +233 to +244
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to inline this function so that we can .take the notification instead.

so we can move this inside the poll fn

}
}

impl<N: NodePrimitives> Stream for FinalizedNotificationStream<N>
where
N::Block: Send + Sync + 'static,
{
type Item = CanonStateNotification<N>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let mut progress = false;

// Poll both streams and update buffers as needed
match this.canon_stream.as_mut().poll_next(cx) {
Poll::Ready(Some(notification)) => {
progress = true;
*this.buffered_notification = Some(notification);
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {}
}

match this.finalized_stream.as_mut().poll_next(cx) {
Poll::Ready(Some(finalized_header)) => {
progress = true;
*this.buffered_finalization = Some(finalized_header);
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a check here that breaks if one Option is none

// Check if we have a matching pair using the projected fields
if let Some(notification) =
Self::try_match(this.buffered_notification, this.buffered_finalization)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check can be simplified by checking

buffered.as_ref.map(|n|n.tip) == header.as_ref.map() {
   header.take();
   return buffered.take()
}

{
return Poll::Ready(Some(notification));
}

// If we made no progress and both streams are pending, break the loop
if !progress {
break;
}
}

Poll::Pending
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -472,4 +553,66 @@ mod tests {
// Confirm this is from the committed segment.
assert!(!block_receipts[1].1);
}

#[test]
fn test_finalized_notification_stream() {
// Create a default block for testing
let block: RecoveredBlock<reth_primitives::Block> = Default::default();

// Create the header first so we can use its hash
let header = alloy_consensus::Header { number: 1u64, ..Default::default() };
let sealed_header = SealedHeader::seal_slow(header);
let block_hash = sealed_header.hash();

// Now create the block with the matching hash
let mut test_block = block;
test_block.set_block_number(1);
test_block.set_hash(block_hash);

// Create the chain with our test block
let chain: Arc<Chain> =
Arc::new(Chain::new(vec![test_block.clone()], ExecutionOutcome::default(), None));

// Create channels for both streams
let (canon_tx, canon_rx) = broadcast::channel(10);
let (finalized_tx, finalized_rx) = watch::channel(None);

// Create the notification stream
let mut stream = FinalizedNotificationStream {
canon_stream: CanonStateNotificationStream { st: BroadcastStream::new(canon_rx) },
finalized_stream: ForkChoiceStream::new(finalized_rx),
buffered_notification: None,
buffered_finalization: None,
};

// Create a notification
let notification = CanonStateNotification::Commit { new: chain };

// Send the canonical notification
canon_tx.send(notification).unwrap();

// Create a waker and context for polling
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);

// First poll should be pending since we haven't received finalization yet
assert!(matches!(Pin::new(&mut stream).poll_next(&mut cx), Poll::Pending));

// Send the finalization notification
finalized_tx.send(Some(sealed_header)).unwrap();

// Now polling should return the notification since we have both events
match Pin::new(&mut stream).poll_next(&mut cx) {
Poll::Ready(Some(received)) => {
assert_eq!(received.tip().hash(), block_hash);
match received {
CanonStateNotification::Commit { new } => {
assert_eq!(new.tip().hash(), block_hash);
}
_ => panic!("Expected Commit notification"),
}
}
other => panic!("Expected Ready(Some), got: {:?}", other),
}
}
}
Loading