Merge pull request #153 from blockchain-etl/refactor-parse-dags
Refactor Parse Dags
charlielewisme authored Mar 21, 2023
2 parents a5801dc + 638bf8b commit d44a8e0
Showing 13 changed files with 220 additions and 88 deletions.
11 changes: 6 additions & 5 deletions airflow/dags/polygon_parse_dag.py
@@ -15,6 +15,11 @@

var_prefix = 'polygon_'

parse_dag_vars = read_parse_dag_vars(
var_prefix=var_prefix,
parse_schedule_interval='30 8 * * *'
)

for folder in glob(table_definitions_folder):
dataset = folder.split('/')[-1]

@@ -25,9 +30,5 @@
dag_id=dag_id,
dataset_folder=folder,
source_dataset_name="crypto_polygon",
**read_parse_dag_vars(
var_prefix=var_prefix,
dataset=dataset,
parse_schedule_interval='30 8 * * *'
)
**parse_dag_vars
)
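
The refactor hoists read_parse_dag_vars out of the per-folder loop, so the shared DAG variables are read once per DAG-file parse rather than once per dataset, and the per-dataset dataset= argument is dropped. Below is a minimal sketch of how the loop reads after this change, assembled from the hunks above; the import paths, the table_definitions_folder value, and the dag_id format are assumptions, since they sit outside the diff.

from glob import glob

# Assumed import paths; the real imports are above the hunk shown in this diff.
from polygonetl_airflow.build_parse_dag import build_parse_dag
from polygonetl_airflow.variables import read_parse_dag_vars

var_prefix = 'polygon_'

# Hypothetical location of the per-dataset table definition folders.
table_definitions_folder = 'dags/resources/stages/parse/table_definitions/*'

# Shared parse-DAG variables are now read once, outside the loop.
parse_dag_vars = read_parse_dag_vars(
    var_prefix=var_prefix,
    parse_schedule_interval='30 8 * * *',
)

for folder in glob(table_definitions_folder):
    dataset = folder.split('/')[-1]
    dag_id = f'polygon_parse_{dataset}_dag'  # hypothetical naming

    # One DAG per dataset folder, registered in the module namespace for Airflow.
    globals()[dag_id] = build_parse_dag(
        dag_id=dag_id,
        dataset_folder=folder,
        source_dataset_name="crypto_polygon",
        **parse_dag_vars,
    )
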
94 changes: 21 additions & 73 deletions airflow/dags/polygonetl_airflow/build_parse_dag.py
@@ -4,7 +4,6 @@
import logging
import os
from datetime import datetime, timedelta
from glob import glob

from airflow import models
from airflow.operators.bash import BashOperator
@@ -13,8 +12,9 @@
from google.cloud import bigquery

from polygonetl_airflow.bigquery_utils import create_view, share_dataset_all_users_read
from polygonetl_airflow.common import read_json_file, read_file
from polygonetl_airflow.parse.parse_logic import ref_regex, parse, create_dataset
from polygonetl_airflow.common import read_json_file, read_file, get_list_of_files
from polygonetl_airflow.parse.parse_dataset_folder_logic import parse_dataset_folder
from polygonetl_airflow.parse.parse_table_definition_logic import create_dataset

from utils.error_handling import handle_dag_failure

@@ -39,9 +39,6 @@ def build_parse_dag(

logging.info('parse_all_partitions is {}'.format(parse_all_partitions))

if parse_all_partitions:
dag_id = dag_id + '_FULL'

PARTITION_DAG_ID = 'polygon_partition_dag'

default_dag_args = {
@@ -63,34 +60,15 @@
schedule_interval=parse_schedule_interval,
default_args=default_dag_args)

validation_error = None
try:
validate_definition_files(dataset_folder)
except ValueError as e:
validation_error = e

# This prevents failing all dags as they are constructed in a loop in ethereum_parse_dag.py
if validation_error is not None:
def raise_validation_error(ds, **kwargs):
raise validation_error

validation_error_operator = PythonOperator(
task_id='validation_error',
python_callable=raise_validation_error,
execution_timeout=timedelta(minutes=10),
dag=dag
)

return dag

def create_parse_task(table_definition):
def create_parse_task():

def parse_task(ds, **kwargs):
validate_definition_files(dataset_folder)
client = bigquery.Client()

parse(
parse_dataset_folder(
bigquery_client=client,
table_definition=table_definition,
dataset_folder=dataset_folder,
ds=ds,
source_project_id=source_project_id,
source_dataset_name=source_dataset_name,
@@ -100,20 +78,15 @@ def parse_task(ds, **kwargs):
parse_all_partitions=parse_all_partitions
)

table_name = table_definition['table']['table_name']
dataset_name = get_dataset_name(dataset_folder)
parsing_operator = PythonOperator(
task_id=table_name,
task_id=f'parse_tables_{dataset_name}',
python_callable=parse_task,
execution_timeout=timedelta(minutes=60),
execution_timeout=timedelta(minutes=60 * 4),
dag=dag
)

contract_address = table_definition['parser']['contract_address']
if contract_address is not None:
ref_dependencies = ref_regex.findall(table_definition['parser']['contract_address'])
else:
ref_dependencies = []
return parsing_operator, ref_dependencies
return parsing_operator

def create_add_view_task(dataset_name, view_name, sql):
def create_view_task(ds, **kwargs):
@@ -156,7 +129,7 @@ def share_dataset_task(**_):
dag=dag,
)

wait_for_ethereum_load_dag_task = ExternalTaskSensor(
wait_for_polygon_partition_dag_task = ExternalTaskSensor(
task_id='wait_for_polygon_partition_dag',
external_dag_id=PARTITION_DAG_ID,
external_task_id='done',
@@ -168,36 +141,16 @@ def share_dataset_task(**_):
timeout=60 * 60 * 30,
dag=dag)

json_files = get_list_of_files(dataset_folder, '*.json')
logging.info(json_files)

all_parse_tasks = {}
task_dependencies = {}
for json_file in json_files:
table_definition = read_json_file(json_file)
task, dependencies = create_parse_task(table_definition)
wait_for_ethereum_load_dag_task >> task
all_parse_tasks[task.task_id] = task
task_dependencies[task.task_id] = dependencies

parse_task = create_parse_task()
wait_for_polygon_partition_dag_task >> parse_task

checkpoint_task = BashOperator(
task_id='parse_all_checkpoint',
bash_command='echo parse_all_checkpoint',
priority_weight=1000,
dag=dag
)

for task, dependencies in task_dependencies.items():
for dependency in dependencies:
if dependency not in all_parse_tasks:
raise ValueError(
'Table {} is not found in the the dataset. Check your ref() in contract_address field.'.format(
dependency))
all_parse_tasks[dependency] >> all_parse_tasks[task]

all_parse_tasks[task] >> checkpoint_task

final_tasks = [checkpoint_task]
parse_task >> checkpoint_task

sql_files = get_list_of_files(dataset_folder, '*.sql')
logging.info(sql_files)
@@ -211,25 +164,16 @@ def share_dataset_task(**_):
view_name = os.path.splitext(base_name)[0]
create_view_task = create_add_view_task(full_dataset_name, view_name, sql)
checkpoint_task >> create_view_task
final_tasks.append(create_view_task)

share_dataset_task = create_share_dataset_task(full_dataset_name)
checkpoint_task >> share_dataset_task
final_tasks.append(share_dataset_task)

return dag


def get_list_of_files(dataset_folder, filter='*.json'):
logging.info('get_list_of_files')
logging.info(dataset_folder)
logging.info(os.path.join(dataset_folder, filter))
return [f for f in glob(os.path.join(dataset_folder, filter))]


def validate_definition_files(dataset_folder):
json_files = get_list_of_files(dataset_folder, '*.json')
dataset_folder_name = dataset_folder.split('/')[-1]
dataset_folder_name = get_dataset_name(dataset_folder)

all_lowercase_table_names = []
for json_file in json_files:
@@ -261,3 +205,7 @@ def validate_definition_files(dataset_folder):

if len(non_unique_table_names) > 0:
raise ValueError(f'The following table names are not unique {",".join(non_unique_table_names)}')


def get_dataset_name(dataset_folder):
return dataset_folder.split('/')[-1]
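
Net effect on the DAG shape: instead of one PythonOperator per table definition plus ref()-based inter-task dependencies, each dataset DAG now has a single parse_tables_<dataset> task (the topological ordering and the definition-file validation happen inside it), followed by the checkpoint and the view/share tasks. A hedged sketch of that graph using placeholder operators; the task ids mirror the hunks above, while the view name and the share task id are made up, and EmptyOperator needs Airflow >= 2.3 (use DummyOperator on older versions).

from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator

# Stand-in DAG only to illustrate the new task graph; not the real build_parse_dag.
with models.DAG(
    'polygon_parse_example_dag',
    schedule_interval=None,
    start_date=datetime(2023, 3, 21),
) as dag:
    wait_for_partition_dag = EmptyOperator(task_id='wait_for_polygon_partition_dag')
    parse_tables = EmptyOperator(task_id='parse_tables_example_dataset')
    checkpoint = EmptyOperator(task_id='parse_all_checkpoint')
    create_view = EmptyOperator(task_id='create_view_example_view')  # one per *.sql file
    share_dataset = EmptyOperator(task_id='share_dataset')

    wait_for_partition_dag >> parse_tables >> checkpoint >> [create_view, share_dataset]
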
10 changes: 10 additions & 0 deletions airflow/dags/polygonetl_airflow/common.py
@@ -1,4 +1,7 @@
import json
import logging
import os
from glob import glob


def read_json_file(filepath):
@@ -11,3 +14,10 @@ def read_file(filepath):
with open(filepath) as file_handle:
content = file_handle.read()
return content


def get_list_of_files(dataset_folder, filter='*.json'):
logging.info('get_list_of_files')
logging.info(dataset_folder)
logging.info(os.path.join(dataset_folder, filter))
return [f for f in glob(os.path.join(dataset_folder, filter))]
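
A quick usage sketch of the relocated helper (the folder path is hypothetical): it globs the given folder with a filename pattern, logging the folder and pattern before returning the matches.

from polygonetl_airflow.common import get_list_of_files

# Hypothetical dataset folder containing table definition JSONs and view SQLs.
dataset_folder = 'dags/resources/stages/parse/table_definitions/uniswap'

json_files = get_list_of_files(dataset_folder, '*.json')
sql_files = get_list_of_files(dataset_folder, '*.sql')
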
79 changes: 79 additions & 0 deletions airflow/dags/polygonetl_airflow/parse/parse_dataset_folder_logic.py
@@ -0,0 +1,79 @@
import logging
import time

from polygonetl_airflow.common import get_list_of_files, read_json_file
from polygonetl_airflow.parse.parse_table_definition_logic import parse, ref_regex
from polygonetl_airflow.parse.toposort import toposort_flatten


def parse_dataset_folder(
bigquery_client,
dataset_folder,
ds,
source_project_id,
source_dataset_name,
destination_project_id,
internal_project_id,
sqls_folder,
parse_all_partitions,
time_func=time.time
):
logging.info(f'Parsing dataset folder {dataset_folder}')
json_files = get_list_of_files(dataset_folder, '*.json')
logging.info(json_files)

topologically_sorted_json_files = topologically_sort_json_files(json_files)
logging.info(f'Topologically sorted json files: {topologically_sorted_json_files}')

for index, json_file in enumerate(topologically_sorted_json_files):
logging.info(f'Parsing json file {index} out of {len(topologically_sorted_json_files)}: {json_file}')
table_definition = read_json_file(json_file)
parse(
bigquery_client,
table_definition,
ds,
source_project_id,
source_dataset_name,
destination_project_id,
internal_project_id,
sqls_folder,
parse_all_partitions,
time_func=time_func
)


def topologically_sort_json_files(json_files):
table_name_to_file_map = {}
dependencies = {}

for json_file in json_files:
table_definition = read_json_file(json_file)

contract_address = table_definition['parser']['contract_address']

ref_dependencies = ref_regex.findall(contract_address) if contract_address is not None else None

table_name = get_table_name_from_json_file_name(json_file)

dependencies[table_name] = set(ref_dependencies) if ref_dependencies is not None else set()
table_name_to_file_map[table_name] = json_file

validate_dependencies(dependencies, table_name_to_file_map.keys())
logging.info(f'Table definition dependencies: {dependencies}')

# TODO: Use toposort() instead of toposort_flatten() so that independent tables could be run in parallel
sorted_tables = list(toposort_flatten(dependencies))

topologically_sorted_json_files = [table_name_to_file_map[table_name] for table_name in sorted_tables]
return topologically_sorted_json_files


def validate_dependencies(dependencies, table_names):
for deps in dependencies.values():
for dep_table_name in deps:
if dep_table_name not in table_names:
raise ValueError(f'Dependency {dep_table_name} not found. Check ref() in table definitions')


def get_table_name_from_json_file_name(json_file_name):
return json_file_name.split('/')[-1].replace('.json', '')
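
To illustrate the ordering this module produces (the table names below are made up): a contract_address of ref('Factory_call_createPool') marks a dependency on that table, topologically_sort_json_files collects those refs into a map of table name to dependency set, and toposort_flatten turns the map into a parse order where every table comes after the tables it references.

from polygonetl_airflow.parse.toposort import toposort_flatten

# Hypothetical dependency map, shaped like the one built by topologically_sort_json_files:
# key = table name, value = set of table names referenced via ref() in contract_address.
dependencies = {
    'Factory_call_createPool': set(),
    'Pool_event_Swap': {'Factory_call_createPool'},
    'Pool_event_Mint': {'Factory_call_createPool'},
}

print(toposort_flatten(dependencies))
# ['Factory_call_createPool', 'Pool_event_Mint', 'Pool_event_Swap']
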
96 changes: 96 additions & 0 deletions airflow/dags/polygonetl_airflow/parse/toposort.py
@@ -0,0 +1,96 @@
#######################################################################
# Implements a topological sort algorithm.
#
# Copyright 2014-2021 True Blade Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Notes:
# Based on http://code.activestate.com/recipes/578272-topological-sort
# with these major changes:
# Added unittests.
# Deleted doctests (maybe not the best idea in the world, but it cleans
# up the docstring).
# Moved functools import to the top of the file.
# Changed assert to a ValueError.
# Changed iter[items|keys] to [items|keys], for python 3
# compatibility. I don't think it matters for python 2 these are
# now lists instead of iterables.
# Copy the input so as to leave it unmodified.
# Renamed function from toposort2 to toposort.
# Handle empty input.
# Switch tests to use set literals.
#
########################################################################

from functools import reduce as _reduce

__all__ = ["toposort", "toposort_flatten", "CircularDependencyError"]
__version__ = "1.7"


class CircularDependencyError(ValueError):
def __init__(self, data):
# Sort the data just to make the output consistent, for use in
# error messages. That's convenient for doctests.
s = "Circular dependencies exist among these items: {{{}}}".format(
", ".join(
"{!r}:{!r}".format(key, value) for key, value in sorted(data.items())
)
)
super(CircularDependencyError, self).__init__(s)
self.data = data


def toposort(data):
"""\
Dependencies are expressed as a dictionary whose keys are items
and whose values are a set of dependent items. Output is a list of
sets in topological order. The first set consists of items with no
    dependencies, each subsequent set consists of items that depend upon
    items in the preceding sets."""

# Special case empty input.
if len(data) == 0:
return

# Copy the input so as to leave it unmodified.
# Discard self-dependencies and copy two levels deep.
data = {item: set(e for e in dep if e != item) for item, dep in data.items()}

# Find all items that don't depend on anything.
extra_items_in_deps = _reduce(set.union, data.values()) - set(data.keys())
    # Add empty dependencies where needed.
data.update({item: set() for item in extra_items_in_deps})
while True:
ordered = set(item for item, dep in data.items() if len(dep) == 0)
if not ordered:
break
yield ordered
data = {
item: (dep - ordered) for item, dep in data.items() if item not in ordered
}
if len(data) != 0:
raise CircularDependencyError(data)


def toposort_flatten(data, sort=True):
"""\
Returns a single list of dependencies. For any set returned by
toposort(), those items are sorted and appended to the result (just to
make the results deterministic)."""

result = []
for d in toposort(data):
result.extend((sorted if sort else list)(d))
return result
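
A short usage sketch of the two helpers (the data values are arbitrary): toposort() yields sets of items whose dependencies are already satisfied, and toposort_flatten() flattens those sets into a single deterministic list.

from polygonetl_airflow.parse.toposort import (
    CircularDependencyError,
    toposort,
    toposort_flatten,
)

data = {2: {11}, 9: {11, 8}, 10: {11, 3}, 11: {7, 5}, 8: {7, 3}}

print(list(toposort(data)))    # [{3, 5, 7}, {8, 11}, {2, 9, 10}]
print(toposort_flatten(data))  # [3, 5, 7, 8, 11, 2, 9, 10]

# A cycle raises CircularDependencyError (a ValueError subclass).
try:
    list(toposort({1: {2}, 2: {1}}))
except CircularDependencyError as e:
    print(e)
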
