Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: streaming feature #6

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cli/solanaetl/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from solanaetl.cli.extract_field import extract_field
from solanaetl.cli.extract_token_transfers import extract_token_transfers
from solanaetl.cli.extract_tokens import extract_tokens
from solanaetl.cli.stream import stream



@click.group()
Expand All @@ -44,3 +46,6 @@ def cli(ctx):

# utils
cli.add_command(extract_field, "extract_field")

# streaming
cli.add_command(stream, "stream")
93 changes: 93 additions & 0 deletions cli/solanaetl/cli/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# MIT License
#
# Copyright (c) 2022 Tan Sek Fook, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging

import click
from blockchainetl_common.streaming.streaming_utils import configure_signals, configure_logging
from solanaetl.enumeration.entity_type import EntityType

from solanaetl.providers.auto import get_provider_from_uri
from solanaetl.streaming.item_exporter_creator import create_item_exporters
from solanaetl.thread_local_proxy import ThreadLocalProxy


@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('-l', '--last-synced-block-file', default='last_synced_block.txt', show_default=True, type=str, help='')
@click.option('--lag', default=0, show_default=True, type=int, help='The number of blocks to lag behind the network.')
@click.option('-p', '--provider-uri', default='https://api.mainnet-beta.solana.com', show_default=True, type=str,
help='The URI of the web3 provider e.g. '
'https://api.mainnet-beta.solana.com')
@click.option('-o', '--output', type=str,
help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
'or GCS bucket e.g. gs://your-bucket-name; '
'If not specified will print to console')
@click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_STREAMING), show_default=True, type=str,
help='The list of entity types to export.')
@click.option('--period-seconds', default=10, show_default=True, type=int, help='How many seconds to sleep between syncs')
@click.option('-b', '--batch-size', default=10, show_default=True, type=int, help='How many blocks to batch in single request')
@click.option('-B', '--block-batch-size', default=1, show_default=True, type=int, help='How many blocks to batch in single sync round')
@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers')
@click.option('--log-file', default=None, show_default=True, type=str, help='Log file')
@click.option('--pid-file', default=None, show_default=True, type=str, help='pid file')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
configure_signals()
entity_types = parse_entity_types(entity_types)

from solanaetl.streaming.solana_streamer_adapter import SolanaStreamerAdapter
from blockchainetl_common.streaming.streamer import Streamer

logging.info('Using ' + provider_uri)

streamer_adapter = SolanaStreamerAdapter(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
item_exporter=create_item_exporters(output),
batch_size=batch_size,
max_workers=max_workers,
entity_types=entity_types
)
streamer = Streamer(
blockchain_streamer_adapter=streamer_adapter,
last_synced_block_file=last_synced_block_file,
lag=lag,
start_block=start_block,
period_seconds=period_seconds,
block_batch_size=block_batch_size,
pid_file=pid_file
)
streamer.stream()


def parse_entity_types(entity_types):
entity_types = [c.strip() for c in entity_types.split(',')]

# validate passed types
for entity_type in entity_types:
if entity_type not in EntityType.ALL_FOR_STREAMING:
raise click.BadOptionUsage(
'--entity-type', '{} is not an available entity type. Supply a comma separated list of types from {}'
.format(entity_type, ','.join(EntityType.ALL_FOR_STREAMING)))

return entity_types
Empty file.
10 changes: 10 additions & 0 deletions cli/solanaetl/enumeration/entity_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class EntityType:
ACCOUNT = 'account'
BLOCK = 'block'
INSTRUCTION = 'instruction'
TOKEN_TRANSFER = 'token_transfer'
TOKEN = 'token'
TRANSACTION = 'transaction'


ALL_FOR_STREAMING = [ACCOUNT, BLOCK, INSTRUCTION, TOKEN_TRANSFER, TOKEN, TRANSACTION]
6 changes: 6 additions & 0 deletions cli/solanaetl/json_rpc_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ def generate_get_multiple_accounts_json_rpc(accounts, encoding='jsonParsed'):
request_id=idx
)

def generate_get_latest_block_json_rpc(request_id=1):
return {
'jsonrpc': '2.0',
'method': "getBlockHeight",
'id': request_id,
}

def generate_json_rpc(method, params, request_id=1):
return {
Expand Down
Empty file.
54 changes: 54 additions & 0 deletions cli/solanaetl/streaming/eth_item_id_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import json
import logging


class SolanaItemIdCalculator:

def calculate(self, item):
if item is None or not isinstance(item, dict):
return None

item_type = item.get('type')

if item_type == 'block' and item.get('hash') is not None:
return concat(item_type, item.get('hash'))
elif item_type == 'transaction' and item.get('block_hash') is not None:
return concat(item_type, item.get('block_hash'))
elif item_type == 'account' and item.get('tx_signature') is not None:
return concat(item_type, item.get('tx_signature'))
elif item_type == 'token_transfer' and item.get('tx_signature') is not None:
return concat(item_type, item.get('tx_signature'))
elif item_type == 'instruction' and item.get('tx_signature') is not None:
return concat(item_type, item.get('tx_signature'))
elif item_type == 'token' and item.get('tx_signature'):
return concat(item_type, item.get('tx_signature'))

logging.warning('item_id for item {} is None'.format(json.dumps(item)))

return None


def concat(*elements):
return '_'.join([str(elem) for elem in elements])
91 changes: 91 additions & 0 deletions cli/solanaetl/streaming/item_exporter_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# MIT License
#
# Copyright (c) 2020 Evgeny Medvedev, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from blockchainetl_common.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl_common.jobs.exporters.multi_item_exporter import MultiItemExporter


def create_item_exporters(outputs):
split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']

item_exporters = [create_item_exporter(output) for output in split_outputs]
return MultiItemExporter(item_exporters)


def create_item_exporter(output):
item_exporter_type = determine_item_exporter_type(output)
if item_exporter_type == ItemExporterType.PUBSUB:
from blockchainetl_common.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
enable_message_ordering = 'sorted' in output or 'ordered' in output
item_exporter = GooglePubSubItemExporter(
item_type_to_topic_mapping={
'block': output + '.blocks',
'transaction': output + '.transactions',
'instruction': output + '.instructions',
'token_transfer': output + '.token_transfers',
'account': output + '.accounts',
'token': output + '.tokens',
},
message_attributes=('item_id'),
batch_max_bytes=1024 * 1024 * 5,
batch_max_latency=2,
batch_max_messages=1000,
enable_message_ordering=enable_message_ordering)
elif item_exporter_type == ItemExporterType.GCS:
from blockchainetl_common.jobs.exporters.gcs_item_exporter import GcsItemExporter
bucket, path = get_bucket_and_path_from_gcs_output(output)
item_exporter = GcsItemExporter(bucket=bucket, path=path)
elif item_exporter_type == ItemExporterType.CONSOLE:
item_exporter = ConsoleItemExporter()
else:
raise ValueError('Unable to determine item exporter type for output ' + output)

return item_exporter


def get_bucket_and_path_from_gcs_output(output):
output = output.replace('gs://', '')
bucket_and_path = output.split('/', 1)
bucket = bucket_and_path[0]
if len(bucket_and_path) > 1:
path = bucket_and_path[1]
else:
path = ''
return bucket, path


def determine_item_exporter_type(output):
if output is not None and output.startswith('projects'):
return ItemExporterType.PUBSUB
elif output is not None and output.startswith('gs://'):
return ItemExporterType.GCS
elif output is None or output == 'console':
return ItemExporterType.CONSOLE
else:
return ItemExporterType.UNKNOWN


class ItemExporterType:
PUBSUB = 'pubsub'
GCS = 'gcs'
CONSOLE = 'console'
UNKNOWN = 'unknown'
Loading