Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: streaming feature #6

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

sfsf9797
Copy link

add streaming feature
image

Copy link

@jensenity jensenity left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried running the streaming code from https://explorer.solana.com/block/138722608, block 138722608 but it couldn't pass 138722609 because that block was skipped. So we need a pass logic when blocks are skipped.

2022-10-04 20:34:01,224 - root [INFO] - Current block 138725091, target block 138722609, last synced block 138722608, blocks to sync 1
2022-10-04 20:34:01,225 - ProgressLogger [INFO] - Started work. Items to process: 1.
2022-10-04 20:34:01,548 - root [WARNING] - result is None in response {'jsonrpc': '2.0', 'error': {'code': -32009, 'message': 'Slot 138722609 was skipped, or missing in long-term storage'}, 'id': 0}.
2022-10-04 20:34:01,549 - root [ERROR] - An exception occurred while syncing block data.
Traceback (most recent call last):
2022-10-04 20:50:03,020 - root [ERROR] - An exception occurred while syncing block data.
Traceback (most recent call last):
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/blockchainetl_common/streaming/streamer.py", line 77, in _do_stream
    synced_blocks = self._sync_cycle()
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/blockchainetl_common/streaming/streamer.py", line 98, in _sync_cycle
    self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block)
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/streaming/solana_streamer_adapter.py", line 42, in export_all
    blocks, transactions, instructions = self._export_blocks_transactions_instructions(start_block, end_block)
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/streaming/solana_streamer_adapter.py", line 86, in _export_blocks_transactions_instructions
    blocks_and_transactions_job.run()
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/blockchainetl_common/jobs/base_job.py", line 30, in run
    self._end()
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/jobs/export_blocks_job.py", line 119, in _end
    self.batch_work_executor.shutdown()
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/executors/batch_work_executor.py", line 98, in shutdown
    self.executor.shutdown()
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/executors/fail_safe_executor.py", line 34, in shutdown
    self._check_completed_futures()
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/executors/fail_safe_executor.py", line 42, in _check_completed_futures
    future.result()
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/executors/batch_work_executor.py", line 60, in _fail_safe_execute
    work_handler(batch)
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/jobs/export_blocks_job.py", line 91, in _export_batch
    blocks = [self.block_mapper.from_json_dict(
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/jobs/export_blocks_job.py", line 91, in <listcomp>
    blocks = [self.block_mapper.from_json_dict(
  File "/opt/anaconda3/envs/solana-etl/lib/python3.8/site-packages/solanaetl/mappers/block_mapper.py", line 35, in from_json_dict
    block.number = int(json_dict.get('parentSlot')) + 1
AttributeError: 'NoneType' object has no attribute 'get'
2022-10-04 20:50:03,026 - root [INFO] - Nothing to sync. Sleeping for 10 seconds...

def sort_by(arr, fields):
if isinstance(fields, tuple):
fields = tuple(fields)
return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add eof

@sfsf9797 sfsf9797 requested a review from jensenity October 7, 2022 14:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants