Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(engine): emit events with executed blocks #14341

Merged
merged 2 commits into from
Feb 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/engine/primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reth-payload-primitives.workspace = true
reth-payload-builder-primitives.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-chain-state.workspace = true
reth-trie.workspace = true
reth-errors.workspace = true

Expand Down
17 changes: 11 additions & 6 deletions crates/engine/primitives/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
//! Events emitted by the beacon consensus engine.

use crate::ForkchoiceStatus;
use alloc::{boxed::Box, sync::Arc};
use alloc::boxed::Box;
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_engine::ForkchoiceState;
use core::{
fmt::{Display, Formatter, Result},
time::Duration,
};
use reth_primitives::{EthPrimitives, SealedBlock};
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_primitives::EthPrimitives;
use reth_primitives_traits::{NodePrimitives, SealedHeader};

/// Events emitted by the consensus engine.
Expand All @@ -18,9 +19,9 @@ pub enum BeaconConsensusEngineEvent<N: NodePrimitives = EthPrimitives> {
/// The fork choice state was updated, and the current fork choice status
ForkchoiceUpdated(ForkchoiceState, ForkchoiceStatus),
/// A block was added to the fork chain.
ForkBlockAdded(Arc<SealedBlock<N::Block>>, Duration),
ForkBlockAdded(ExecutedBlockWithTrieUpdates<N>, Duration),
/// A block was added to the canonical chain, and the elapsed time validating the block
CanonicalBlockAdded(Arc<SealedBlock<N::Block>>, Duration),
CanonicalBlockAdded(ExecutedBlockWithTrieUpdates<N>, Duration),
/// A canonical chain was committed, and the elapsed time committing the data
CanonicalChainCommitted(Box<SealedHeader<N::BlockHeader>>, Duration),
/// The consensus engine is involved in live sync, and has specific progress
Expand Down Expand Up @@ -48,10 +49,14 @@ where
write!(f, "ForkchoiceUpdated({state:?}, {status:?})")
}
Self::ForkBlockAdded(block, duration) => {
write!(f, "ForkBlockAdded({:?}, {duration:?})", block.num_hash())
write!(f, "ForkBlockAdded({:?}, {duration:?})", block.recovered_block.num_hash())
}
Self::CanonicalBlockAdded(block, duration) => {
write!(f, "CanonicalBlockAdded({:?}, {duration:?})", block.num_hash())
write!(
f,
"CanonicalBlockAdded({:?}, {duration:?})",
block.recovered_block.num_hash()
)
}
Self::CanonicalChainCommitted(block, duration) => {
write!(f, "CanonicalChainCommitted({:?}, {duration:?})", block.num_hash())
Expand Down
26 changes: 12 additions & 14 deletions crates/engine/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@

extern crate alloc;

use alloy_consensus::BlockHeader;
use alloy_eips::{eip7685::Requests, Decodable2718};
use alloy_primitives::B256;
use reth_payload_primitives::{BuiltPayload, PayloadAttributes};
mod error;

use alloy_rpc_types_engine::{ExecutionPayloadSidecar, PayloadError};
use core::fmt::{self, Debug};
use reth_payload_primitives::{
validate_execution_requests, BuiltPayload, EngineApiMessageVersion,
EngineObjectValidationError, InvalidPayloadAttributesError, PayloadAttributes,
PayloadOrAttributes, PayloadTypes,
};
use reth_primitives::{NodePrimitives, SealedBlock};
use reth_primitives_traits::Block;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use alloy_consensus::BlockHeader;
use alloy_rpc_types_engine::{ExecutionPayloadSidecar, PayloadError};
mod error;
pub use error::*;

mod forkchoice;
Expand All @@ -33,15 +40,6 @@ pub use event::*;
mod invalid_block_hook;
pub use invalid_block_hook::InvalidBlockHook;

use alloy_eips::{eip7685::Requests, Decodable2718};
use reth_payload_primitives::{
validate_execution_requests, EngineApiMessageVersion, EngineObjectValidationError,
InvalidPayloadAttributesError, PayloadOrAttributes, PayloadTypes,
};
use reth_primitives::{NodePrimitives, SealedBlock};
use reth_primitives_traits::Block;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

/// Struct aggregating [`alloy_rpc_types_engine::ExecutionPayload`] and [`ExecutionPayloadSidecar`]
/// and encapsulating complete payload supplied for execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
29 changes: 11 additions & 18 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,14 +1392,10 @@ where
self.canonical_in_memory_state.set_pending_block(block.clone());
}

let sealed_block = Arc::new(block.sealed_block().clone());
self.state.tree_state.insert_executed(block);
self.state.tree_state.insert_executed(block.clone());
self.metrics.engine.inserted_already_executed_blocks.increment(1);
self.emit_event(EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(
sealed_block,
now.elapsed(),
),
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
));
}
EngineApiRequest::Beacon(request) => {
Expand Down Expand Up @@ -2409,8 +2405,6 @@ where
return Err(e.into())
}

let sealed_block = Arc::new(block.clone_sealed_block());

// We only run the parallel state root if we are currently persisting blocks that are all
// ancestors of the one we are executing. If we're committing ancestor blocks, then: any
// trie updates being committed are a subset of the in-memory trie updates collected before
Expand Down Expand Up @@ -2558,7 +2552,7 @@ where
self.handle_state_root_result(
state_root_handle,
state_root_config,
sealed_block.as_ref(),
block.sealed_block(),
&hashed_state,
&state_provider,
root_time,
Expand Down Expand Up @@ -2617,8 +2611,7 @@ where
// prewarm tasks are still running at this point however
drop(prewarm_task_lock.write().unwrap());
// apply state updates to cache and save it (if saving was successful)
self.most_recent_cache =
state_provider.save_cache(sealed_block.hash(), &output.state).ok();
self.most_recent_cache = state_provider.save_cache(block.hash(), &output.state).ok();
let elapsed = save_cache_start.elapsed();

// record how long it took to save caches
Expand All @@ -2641,15 +2634,15 @@ where
self.canonical_in_memory_state.set_pending_block(executed.clone());
}

self.state.tree_state.insert_executed(executed);
self.state.tree_state.insert_executed(executed.clone());
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

// emit insert event
let elapsed = start.elapsed();
let engine_event = if self.is_fork(block_num_hash.hash)? {
BeaconConsensusEngineEvent::ForkBlockAdded(sealed_block, elapsed)
BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
} else {
BeaconConsensusEngineEvent::CanonicalBlockAdded(sealed_block, elapsed)
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
};
self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

Expand Down Expand Up @@ -3540,9 +3533,9 @@ mod tests {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
) => {
assert_eq!(block.hash(), expected_hash);
assert_eq!(executed.recovered_block.hash(), expected_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
Expand All @@ -3552,10 +3545,10 @@ mod tests {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
block,
executed,
_,
)) => {
assert_eq!(block.hash(), expected_hash);
assert_eq!(executed.recovered_block.hash(), expected_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
Expand Down
6 changes: 4 additions & 2 deletions crates/node/events/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ impl NodeState {
}
}
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
let block = executed.sealed_block();
info!(
number=block.number(),
hash=?block.hash(),
Expand All @@ -272,7 +273,8 @@ impl NodeState {

info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
}
BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => {
BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
let block = executed.sealed_block();
info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
}
}
Expand Down
Loading