diff --git a/Cargo.lock b/Cargo.lock
index 9bab55f1..59908c8e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -871,7 +871,7 @@ dependencies = [
 
 [[package]]
 name = "cryo_python"
-version = "0.2.0"
+version = "0.3.0"
 dependencies = [
  "cryo_cli",
  "cryo_freeze",
diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs
index a147845a..c2327b42 100644
--- a/crates/cli/src/args.rs
+++ b/crates/cli/src/args.rs
@@ -248,6 +248,10 @@ pub struct Args {
     #[arg(long, value_name = "SIG", help_heading = "Dataset-specific Options", num_args(1..))]
     pub event_signature: Option<String>,
 
+    /// Function signature for transaction calldata decoding
+    #[arg(long, value_name = "SIG", help_heading = "Dataset-specific Options", num_args(1..))]
+    pub function_signature: Option<String>,
+
     /// Blocks per request (eth_getLogs)
     #[arg(
         long,
diff --git a/crates/cli/src/parse/schemas.rs b/crates/cli/src/parse/schemas.rs
index a4e04ece..656f1e66 100644
--- a/crates/cli/src/parse/schemas.rs
+++ b/crates/cli/src/parse/schemas.rs
@@ -1,7 +1,8 @@
 use std::collections::HashMap;
 
 use cryo_freeze::{
-    ColumnEncoding, Datatype, FileFormat, LogDecoder, MultiDatatype, ParseError, Table,
+    CalldataDecoder, ColumnEncoding, Datatype, FileFormat, LogDecoder, MultiDatatype, ParseError,
+    Table,
 };
 
 use super::file_output;
@@ -47,6 +48,14 @@ pub(crate) fn parse_schemas(
         None => None,
     };
 
+    let calldata_decoder = match args.function_signature {
+        Some(ref sig) => match CalldataDecoder::new(sig.clone()) {
+            Ok(res) => Some(res),
+            Err(_) => return Err(ParseError::ParseError("invalid function signature".to_string())),
+        },
+        None => None,
+    };
+
     // create schemas
     let schemas: Result<HashMap<Datatype, Table>, ParseError> = datatypes
         .iter()
@@ -60,6 +69,7 @@ pub(crate) fn parse_schemas(
                 &args.columns,
                 sort[datatype].clone(),
                 log_decoder.clone(),
+                calldata_decoder.clone(),
             )
             .map(|schema| (*datatype, schema))
             .map_err(|e| {
diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs
index 896b12ab..8d000ccd 100644
--- a/crates/freeze/src/datasets/transactions.rs
+++ b/crates/freeze/src/datasets/transactions.rs
@@ -25,6 +25,7 @@ pub struct Transactions {
     chain_id: Vec<u64>,
     timestamp: Vec<u32>,
     block_hash: Vec<Vec<u8>>,
+    function_cols: indexmap::IndexMap<String, Vec<Token>>,
 }
 
 #[async_trait::async_trait]
@@ -89,8 +90,20 @@ impl CollectByBlock for Transactions {
         } else {
             Box::new(|_| true)
         };
-        let transactions =
-            block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect();
+        let function_signature_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
+            if let Some(decoder) = &schema.calldata_decoder {
+                Box::new(move |tx| tx.input.starts_with(&decoder.function.short_signature()[..]))
+            } else {
+                Box::new(|_| true)
+            };
+        let transactions = block
+            .transactions
+            .clone()
+            .into_iter()
+            .filter(from_filter)
+            .filter(to_filter)
+            .filter(function_signature_filter)
+            .collect();
 
         // 2. collect receipts if necessary
         // if transactions are filtered fetch by set of transaction hashes, else fetch all receipts
@@ -178,6 +191,26 @@ pub(crate) fn process_transaction(
     exclude_failed: bool,
     timestamp: u32,
 ) -> R<()> {
+    // if calldata_decoder is supplied, transactions should be processed only if
+    // the calldata matches the given function signature
+    let decoded_args = match &schema.calldata_decoder {
+        None => None,
+        Some(decoder) => {
+            if tx.input.len() < 4 {
+                return Ok(())
+            }
+            match decoder.function.decode_input(&tx.input[4..]) {
+                Ok(decoded_input) => {
+                    Some(decoded_input.into_iter().zip(&decoder.args).collect::<Vec<_>>())
+                }
+                Err(_) => {
+                    // if decoder exists and decode fails, return without appending column
+                    return Ok(())
+                }
+            }
+        }
+    };
+
     let success = if exclude_failed | schema.has_column("success") {
         let success = tx_success(&tx, &receipt)?;
         if exclude_failed & !success {
@@ -212,6 +245,15 @@ pub(crate) fn process_transaction(
     store!(schema, columns, timestamp, timestamp);
     store!(schema, columns, block_hash, tx.block_hash.unwrap_or_default().as_bytes().to_vec());
 
+    match decoded_args {
+        None => {}
+        Some(decoded_args) => {
+            for (token, arg) in decoded_args {
+                columns.function_cols.entry(arg.clone()).or_default().push(token)
+            }
+        }
+    }
+
     Ok(())
 }
 
diff --git a/crates/freeze/src/types/decoders/calldata_decoder.rs b/crates/freeze/src/types/decoders/calldata_decoder.rs
new file mode 100644
index 00000000..50ece42b
--- /dev/null
+++ b/crates/freeze/src/types/decoders/calldata_decoder.rs
@@ -0,0 +1,213 @@
+use ethers_core::{
+    abi::{self, AbiEncode, HumanReadableParser, Param, ParamType, Token},
+    types::{I256, U256},
+};
+use polars::{prelude::NamedFrom, series::Series};
+
+use crate::{err, CollectError, ColumnEncoding, ToU256Series, U256Type};
+
+/// container for calldata decoding context
+#[derive(Clone, Debug, PartialEq)]
+pub struct CalldataDecoder {
+    /// the raw function signature string ex: transfer(address to, uint256 value)
+    pub raw: String,
+    /// decoded abi type of function signature string
+    pub function: abi::Function,
+    /// argument names of function
+    pub args: Vec<String>,
+}
+
+impl CalldataDecoder {
+    /// create a new CalldataDecoder from function signature
+    pub fn new(function_signature: String) -> Result<Self, String> {
+        match HumanReadableParser::parse_function(function_signature.as_str()) {
+            Ok(mut function) => {
+                let args = function
+                    .inputs
+                    .clone()
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, param)| {
+                        if param.name.is_empty() {
+                            let name = format!("arg_{}", i);
+                            function.inputs[i].name = name.clone();
+                            name
+                        } else {
+                            param.name
+                        }
+                    })
+                    .collect();
+                Ok(Self { function, raw: function_signature.clone(), args })
+            }
+            Err(e) => {
+                let err = format!(
+                    "incorrectly formatted function {} (expected something like transfer(address,uint256)), err: {}",
+                    function_signature, e
+                );
+                eprintln!("{}", err);
+                Err(err)
+            }
+        }
+    }
+
+    /// data should never be mixed type, otherwise this will return inconsistent results
+    pub fn make_series(
+        &self,
+        name: String,
+        data: Vec<Token>,
+        chunk_len: usize,
+        u256_types: &[U256Type],
+        column_encoding: &ColumnEncoding,
+    ) -> Result<Vec<Series>, CollectError> {
+        // This is a smooth brain way of doing this, but I can't think of a better way right now
+        let mut ints: Vec<i64> = vec![];
+        let mut uints: Vec<u64> = vec![];
+        let mut str_ints: Vec<String> = vec![];
+        let mut u256s: Vec<U256> = vec![];
+        let mut i256s: Vec<I256> = vec![];
+        let mut bytes: Vec<Vec<u8>> = vec![];
+        let mut hexes: Vec<String> = vec![];
+        let mut bools: Vec<bool> = vec![];
+        let mut strings: Vec<String> = vec![];
+        // TODO: support array & tuple types
+
+        let param = self
+            .function
+            .inputs
+            .clone()
+            .into_iter()
+            .filter(|i| i.name == name)
+            .collect::<Vec<Param>>();
+        let param = param.first();
+
+        for token in data {
+            match token {
+                Token::Address(a) => match column_encoding {
+                    ColumnEncoding::Binary => bytes.push(a.to_fixed_bytes().into()),
+                    ColumnEncoding::Hex => hexes.push(format!("{:?}", a)),
+                },
+                Token::FixedBytes(b) => match column_encoding {
+                    ColumnEncoding::Binary => bytes.push(b),
+                    ColumnEncoding::Hex => hexes.push(b.encode_hex()),
+                },
+                Token::Bytes(b) => match column_encoding {
+                    ColumnEncoding::Binary => bytes.push(b),
+                    ColumnEncoding::Hex => hexes.push(b.encode_hex()),
+                },
+                Token::Uint(i) => match param {
+                    Some(param) => match param.kind.clone() {
+                        ParamType::Uint(size) => {
+                            if size <= 64 {
+                                uints.push(i.as_u64())
+                            } else {
+                                u256s.push(i)
+                            }
+                        }
+                        _ => str_ints.push(i.to_string()),
+                    },
+                    None => match i.try_into() {
+                        Ok(i) => ints.push(i),
+                        Err(_) => str_ints.push(i.to_string()),
+                    },
+                },
+                Token::Int(i) => {
+                    let i = I256::from_raw(i);
+                    match param {
+                        Some(param) => match param.kind.clone() {
+                            ParamType::Int(size) => {
+                                if size <= 64 {
+                                    ints.push(i.as_i64())
+                                } else {
+                                    i256s.push(i)
+                                }
+                            }
+                            _ => str_ints.push(i.to_string()),
+                        },
+                        None => match i.try_into() {
+                            Ok(i) => ints.push(i),
+                            Err(_) => str_ints.push(i.to_string()),
+                        },
+                    }
+                }
+                Token::Bool(b) => bools.push(b),
+                Token::String(s) => strings.push(s),
+                Token::Array(_) | Token::FixedArray(_) => {}
+                Token::Tuple(_) => {}
+            }
+        }
+        let mixed_length_err = format!("could not parse column {}, mixed type", name);
+        let mixed_length_err = mixed_length_err.as_str();
+
+        // check each vector, see if it contains any values, if it does, check if it's the same
+        // length as the input data and map to a series
+        let name = format!("param__{}", name);
+        if !ints.is_empty() {
+            Ok(vec![Series::new(name.as_str(), ints)])
+        } else if !i256s.is_empty() {
+            let mut series_vec = Vec::new();
+            for u256_type in u256_types.iter() {
+                series_vec.push(i256s.to_u256_series(
+                    name.clone(),
+                    u256_type.clone(),
+                    column_encoding,
+                )?)
+            }
+            Ok(series_vec)
+        } else if !u256s.is_empty() {
+            let mut series_vec: Vec<Series> = Vec::new();
+            for u256_type in u256_types.iter() {
+                series_vec.push(u256s.to_u256_series(
+                    name.clone(),
+                    u256_type.clone(),
+                    column_encoding,
+                )?)
+            }
+            Ok(series_vec)
+        } else if !uints.is_empty() {
+            Ok(vec![Series::new(name.as_str(), uints)])
+        } else if !str_ints.is_empty() {
+            Ok(vec![Series::new(name.as_str(), str_ints)])
+        } else if !bytes.is_empty() {
+            if bytes.len() != chunk_len {
+                return Err(err(mixed_length_err))
+            }
+            Ok(vec![Series::new(name.as_str(), bytes)])
+        } else if !hexes.is_empty() {
+            if hexes.len() != chunk_len {
+                return Err(err(mixed_length_err))
+            }
+            Ok(vec![Series::new(name.as_str(), hexes)])
+        } else if !bools.is_empty() {
+            if bools.len() != chunk_len {
+                return Err(err(mixed_length_err))
+            }
+            Ok(vec![Series::new(name.as_str(), bools)])
+        } else if !strings.is_empty() {
+            if strings.len() != chunk_len {
+                return Err(err(mixed_length_err))
+            }
+            Ok(vec![Series::new(name.as_str(), strings)])
+        } else {
+            // case where no data was passed
+            Ok(vec![Series::new(name.as_str(), vec![None::<u64>; chunk_len])])
+        }
+    }
+}
+
+mod tests {
+    #[allow(unused_imports)]
+    use super::CalldataDecoder;
+
+    #[test]
+    fn test_human_readable_parser() {
+        let decoder =
+            CalldataDecoder::new("transfer(address to,uint256 value)".to_string()).unwrap();
+        assert_eq!(decoder.args, vec!["to".to_string(), "value".to_string()]);
+    }
+
+    // #[test]
+    // fn test_human_readable_parser_without_arg_name() {
+    //     let decoder = CalldataDecoder::new("transfer(address,uint256)".to_string()).unwrap();
+    //     assert_eq!(decoder.args, vec!["arg_0".to_string(), "arg_1".to_string()]);
+    // }
+}
diff --git a/crates/freeze/src/types/decoders/mod.rs b/crates/freeze/src/types/decoders/mod.rs
index eda24187..8e6490cd 100644
--- a/crates/freeze/src/types/decoders/mod.rs
+++ b/crates/freeze/src/types/decoders/mod.rs
@@ -1,3 +1,6 @@
 /// log decoder
 pub mod log_decoder;
 pub use log_decoder::*;
+/// calldata decoder
+pub mod calldata_decoder;
+pub use calldata_decoder::*;
diff --git a/crates/freeze/src/types/schemas.rs b/crates/freeze/src/types/schemas.rs
index 6584c4e7..a40337a6 100644
--- a/crates/freeze/src/types/schemas.rs
+++ b/crates/freeze/src/types/schemas.rs
@@ -1,7 +1,7 @@
 /// types and functions related to schemas
 use std::collections::HashMap;
 
-use crate::{err, CollectError, ColumnEncoding, Datatype, LogDecoder};
+use crate::{err, CalldataDecoder, CollectError, ColumnEncoding, Datatype, LogDecoder};
 use indexmap::{IndexMap, IndexSet};
 use thiserror::Error;
 
@@ -39,6 +39,9 @@ pub struct Table {
 
     /// log decoder for table
    pub log_decoder: Option<LogDecoder>,
+
+    /// calldata decoder for table
+    pub calldata_decoder: Option<CalldataDecoder>,
 }
 
 impl Table {
@@ -174,6 +177,7 @@ impl Datatype {
         columns: &Option<Vec<String>>,
         sort: Option<Vec<String>>,
         log_decoder: Option<LogDecoder>,
+        calldata_decoder: Option<CalldataDecoder>,
     ) -> Result<Table, SchemaError> {
         let column_types = self.column_types();
         let all_columns = column_types.keys().map(|k| k.to_string()).collect();
@@ -201,6 +205,7 @@ impl Datatype {
             u256_types: u256_types.to_owned(),
             binary_type: binary_column_format.clone(),
             log_decoder,
+            calldata_decoder,
         };
         Ok(schema)
     }
@@ -248,14 +253,32 @@ mod tests {
     fn test_table_schema_explicit_cols() {
         let cols = Some(vec!["block_number".to_string(), "block_hash".to_string()]);
         let table = Datatype::Blocks
-            .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &None, &None, &cols, None, None)
+            .table_schema(
+                &get_u256_types(),
+                &ColumnEncoding::Hex,
+                &None,
+                &None,
+                &cols,
+                None,
+                None,
+                None,
+            )
             .unwrap();
         assert_eq!(vec!["block_number", "block_hash"], table.columns());
 
         // "all" marker support
         let cols = Some(vec!["all".to_string()]);
         let table = Datatype::Blocks
-            .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &None, &None, &cols, None, None)
+            .table_schema(
+                &get_u256_types(),
+                &ColumnEncoding::Hex,
+                &None,
+                &None,
+                &cols,
+                None,
+                None,
+                None,
+            )
             .unwrap();
         assert_eq!(15, table.columns().len());
         assert!(table.columns().contains(&"block_hash"));
@@ -274,6 +297,7 @@
             &None,
             None,
             None,
+            None,
         )
         .unwrap();
         assert_eq!(9, table.columns().len());
@@ -290,6 +314,7 @@
             &None,
             None,
             None,
+            None,
         )
         .unwrap();
         assert_eq!(Some(&"chain_id"), table.columns().last());
@@ -306,6 +331,7 @@
             &None,
             None,
             None,
+            None,
         )
         .unwrap();
         assert_eq!(15, table.columns().len());
@@ -317,7 +343,16 @@ mod tests {
     fn test_table_schema_exclude_cols() {
         // defaults
         let table = Datatype::Blocks
-            .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &None, &None, &None, None, None)
+            .table_schema(
+                &get_u256_types(),
+                &ColumnEncoding::Hex,
+                &None,
+                &None,
+                &None,
+                None,
+                None,
+                None,
+            )
             .unwrap();
         assert_eq!(8, table.columns().len());
         assert!(table.columns().contains(&"author"));
@@ -333,6 +368,7 @@
             &None,
             None,
             None,
+            None,
         )
         .unwrap();
         assert_eq!(6, table.columns().len());
@@ -350,6 +386,7 @@
             &None,
             None,
             None,
+            None,
         )
         .unwrap();
         assert_eq!(7, table.columns().len());
@@ -370,6 +407,7 @@
             &None,
             None,
             None,
+            None,
         )
         .unwrap();
         assert!(!table.columns().contains(&"author"));
diff --git a/crates/python/rust/collect_adapter.rs b/crates/python/rust/collect_adapter.rs
index 0f8e4672..389173a0 100644
--- a/crates/python/rust/collect_adapter.rs
+++ b/crates/python/rust/collect_adapter.rs
@@ -64,6 +64,7 @@ use cryo_freeze::collect;
         verbose = false,
         no_verbose = false,
         event_signature = None,
+        function_signature = None,
     )
 )]
 #[allow(clippy::too_many_arguments)]
@@ -125,6 +126,7 @@ pub fn _collect(
     verbose: bool,
     no_verbose: bool,
     event_signature: Option<String>,
+    function_signature: Option<String>,
 ) -> PyResult<&PyAny> {
     if let Some(command) = command {
         pyo3_asyncio::tokio::future_into_py(py, async move {
@@ -190,6 +192,7 @@ pub fn _collect(
         verbose,
         no_verbose,
         event_signature,
+        function_signature,
     };
     pyo3_asyncio::tokio::future_into_py(py, async move {
         match run_collect(args).await {
diff --git a/crates/python/rust/freeze_adapter.rs b/crates/python/rust/freeze_adapter.rs
index 5b7616ea..2e6dee09 100644
--- a/crates/python/rust/freeze_adapter.rs
+++ b/crates/python/rust/freeze_adapter.rs
@@ -61,6 +61,7 @@ use cryo_cli::{run, Args};
         verbose = false,
         no_verbose = false,
         event_signature = None,
+        function_signature = None,
     )
 )]
 #[allow(clippy::too_many_arguments)]
@@ -122,6 +123,7 @@ pub fn _freeze(
     verbose: bool,
     no_verbose: bool,
     event_signature: Option<String>,
+    function_signature: Option<String>,
 ) -> PyResult<&PyAny> {
     if let Some(command) = command {
         freeze_command(py, command)
@@ -182,6 +184,7 @@ pub fn _freeze(
         verbose,
         no_verbose,
         event_signature,
+        function_signature,
     };
 
     pyo3_asyncio::tokio::future_into_py(py, async move {
diff --git a/crates/to_df/src/lib.rs b/crates/to_df/src/lib.rs
index 97f10c2b..c66b4759 100644
--- a/crates/to_df/src/lib.rs
+++ b/crates/to_df/src/lib.rs
@@ -171,6 +171,122 @@ pub fn to_df(attrs: TokenStream, input: TokenStream) -> TokenStream {
         quote! {}
     };
 
+    let has_function_cols = !field_names_and_types
+        .iter()
+        .filter(|(name, _)| name == "function_cols")
+        .collect::<Vec<_>>()
+        .is_empty();
+    let function_code = if has_function_cols {
+        // Generate the tokens for the calldata decoding code
+        quote! {
+            let decoder = schema.calldata_decoder.clone();
+            let u256_types: Vec<_> = schema.u256_types.clone().into_iter().collect();
+            if let Some(decoder) = decoder {
+
+                fn create_empty_u256_columns(
+                    cols: &mut Vec<Series>,
+                    name: &str,
+                    u256_types: &[U256Type],
+                    column_encoding: &ColumnEncoding
+                ) {
+                    for u256_type in u256_types.iter() {
+                        let full_name = name.to_string() + u256_type.suffix().as_str();
+                        let full_name = full_name.as_str();
+
+                        match u256_type {
+                            U256Type::Binary => {
+                                match column_encoding {
+                                    ColumnEncoding::Binary => {
+                                        cols.push(Series::new(full_name, Vec::<Vec<u8>>::new()))
+                                    },
+                                    ColumnEncoding::Hex => {
+                                        cols.push(Series::new(full_name, Vec::<String>::new()))
+                                    },
+                                }
+                            },
+                            U256Type::String => cols.push(Series::new(full_name, Vec::<String>::new())),
+                            U256Type::F32 => cols.push(Series::new(full_name, Vec::<f32>::new())),
+                            U256Type::F64 => cols.push(Series::new(full_name, Vec::<f64>::new())),
+                            U256Type::U32 => cols.push(Series::new(full_name, Vec::<u32>::new())),
+                            U256Type::U64 => cols.push(Series::new(full_name, Vec::<u64>::new())),
+                            U256Type::Decimal128 => cols.push(Series::new(full_name, Vec::<Vec<u8>>::new())),
+                        }
+                    }
+                }
+
+                use ethers_core::abi::ParamType;
+
+                // Write columns even if there are no values decoded - indicates empty dataframe
+                let chunk_len = self.n_rows;
+                if self.function_cols.is_empty() {
+                    for param in decoder.function.inputs.iter() {
+                        let name = "param__".to_string() + param.name.as_str();
+                        let name = name.as_str();
+                        match param.kind {
+                            ParamType::Address => {
+                                match schema.binary_type {
+                                    ColumnEncoding::Binary => cols.push(Series::new(name, Vec::<Vec<u8>>::new())),
+                                    ColumnEncoding::Hex => cols.push(Series::new(name, Vec::<String>::new())),
+                                }
+                            },
+                            ParamType::Bytes => {
+                                match schema.binary_type {
+                                    ColumnEncoding::Binary => cols.push(Series::new(name, Vec::<Vec<u8>>::new())),
+                                    ColumnEncoding::Hex => cols.push(Series::new(name, Vec::<String>::new())),
+                                }
+                            },
+                            ParamType::Int(bits) => {
+                                if bits <= 64 {
+                                    cols.push(Series::new(name, Vec::<i64>::new()))
+                                } else {
+                                    create_empty_u256_columns(&mut cols, name, &u256_types, &schema.binary_type)
+                                }
+                            },
+                            ParamType::Uint(bits) => {
+                                if bits <= 64 {
+                                    cols.push(Series::new(name, Vec::<u64>::new()))
+                                } else {
+                                    create_empty_u256_columns(&mut cols, name, &u256_types, &schema.binary_type)
+                                }
+                            },
+                            ParamType::Bool => cols.push(Series::new(name, Vec::<bool>::new())),
+                            ParamType::String => cols.push(Series::new(name, Vec::<String>::new())),
+                            ParamType::Array(_) => return Err(err("could not generate Array column")),
+                            ParamType::FixedBytes(_) => return Err(err("could not generate FixedBytes column")),
+                            ParamType::FixedArray(_, _) => return Err(err("could not generate FixedArray column")),
+                            ParamType::Tuple(_) => return Err(err("could not generate Tuple column")),
+                            _ => (),
+                        }
+                    }
+                } else {
+                    for (name, data) in self.function_cols {
+                        let series_vec = decoder.make_series(
+                            name,
+                            data,
+                            chunk_len as usize,
+                            &u256_types,
+                            &schema.binary_type,
+                        );
+                        match series_vec {
+                            Ok(s) => {
+                                cols.extend(s);
+                            }
+                            Err(e) => eprintln!("error creating frame: {}", e), // TODO: see how best to bubble up error
+                        }
+                    }
+                }
+
+                let drop_names = vec!["topic1".to_string(), "topic2".to_string(), "topic3".to_string(), "data".to_string()];
+                cols.retain(|c| !drop_names.contains(&c.name().to_string()));
+            }
+        }
+    } else {
+        // Generate an empty set of tokens if has_function_cols is false
+        quote! {}
+    };
+
     fn map_type_to_column_type(ty: &syn::Type) -> Option<proc_macro2::TokenStream> {
         match quote!(#ty).to_string().as_str() {
             "Vec < bool >" => Some(quote! { ColumnType::Boolean }),
@@ -208,7 +324,7 @@ pub fn to_df(attrs: TokenStream, input: TokenStream) -> TokenStream {
         if let Some(column_type) = map_type_to_column_type(ty) {
             let field_name_str = format!("{}", quote!(#name));
             column_types.push(quote! { (#field_name_str, #column_type) });
-        } else if name != "n_rows" && name != "event_cols" {
+        } else if name != "n_rows" && name != "event_cols" && name != "function_cols" {
             println!("invalid column type for {name} in table {}", datatype_str);
         }
     }
@@ -242,6 +358,8 @@ pub fn to_df(attrs: TokenStream, input: TokenStream) -> TokenStream {
 
         #event_code
 
+        #function_code
+
         let df = DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema)?;
         let mut output = std::collections::HashMap::new();
         output.insert(datatype, df);
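
Usage sketch (illustrative only, not part of the patch): the snippet below exercises the new CalldataDecoder the way the transactions dataset does above -- parse a human-readable signature, then build "param__*" series from decoded tokens. The cryo_freeze re-exports follow the "pub use" lines in this diff, but the concrete values and the availability of these items at the crate root are assumptions, so treat this as a sketch rather than a tested program.

    use cryo_freeze::{CalldataDecoder, ColumnEncoding, U256Type};
    use ethers_core::{abi::Token, types::H160};

    fn main() {
        // parse a human-readable signature; unnamed parameters would be renamed arg_0, arg_1, ...
        let decoder =
            CalldataDecoder::new("transfer(address to,uint256 value)".to_string()).unwrap();
        assert_eq!(decoder.args, vec!["to".to_string(), "value".to_string()]);

        // turn the decoded tokens for one argument into a polars Series,
        // as the generated to_df code does for each entry of function_cols
        let tokens = vec![Token::Address(H160::zero())];
        let series = decoder
            .make_series("to".to_string(), tokens, 1, &[U256Type::String], &ColumnEncoding::Hex)
            .unwrap();
        assert_eq!(series[0].name(), "param__to");
    }

On the CLI side the same path would be hit with an invocation along the lines of: cryo transactions --function-signature "transfer(address to,uint256 value)" ... (flag name derived from the clap field added above). Transactions whose input does not start with the signature's 4-byte selector are filtered out, and each successfully decoded argument is emitted as its own param__<name> column.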