
Commit

README: add use case in Data Engineering
ebonnal committed Dec 17, 2023
1 parent e7b3d19 commit 1a43d0a
Showing 1 changed file with 192 additions and 9 deletions.
README.md

[![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/test/badge.svg)](https://github.com/bonnal-enzo/kioss/actions) [![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/kioss/actions)

Ease the **development of ETL/EL/ReverseETL** scripts.

## 1. Install

```python
from kioss import Pipe

integers: Pipe[int] = Pipe(source=lambda: range(10))
```

Instantiate a `Pipe` by providing a function that returns an `Iterable` (the data source).
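
Any callable returning an `Iterable` works as a source. For instance (a hypothetical variant, not taken from the examples below), a generator function reading lines from a file:

```python
from typing import Iterator

from kioss import Pipe

def read_lines() -> Iterator[str]:
    with open("input.txt") as f:  # hypothetical input file
        yield from f

lines: Pipe[str] = Pipe(source=read_lines)
```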

## 4. Declare operations

There are 2 kinds of operations: *transformations* and *controls*.

```python
odd_squares: Pipe[int] = (
integers
.map(lambda x: x ** 2, n_threads=2) # transformation
.filter(lambda x: x % 2 == 1) # transformation
.slow(freq=10) # control
)
```

---

# ***Operations guide***

Let's keep the same example:
```python
integers = Pipe(lambda: range(10))
```

# Transformations
![](./img/transform.gif)

## `.map`
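
As a minimal sketch based on the quickstart example above (where `.map` takes the function to apply and an optional `n_threads` parameter for concurrency):

```python
# Apply a function to each element of the parent pipe,
# optionally over several threads.
squares: Pipe[int] = integers.map(lambda x: x ** 2, n_threads=2)
```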
## `.chain`

```python
one_to_thirty_integers: Pipe[int] = one_to_ten_integers.chain(
    ...  # the other Pipe[int] instances to concatenate
)
```

# Controls
![](./img/control.gif)

## `.slow`
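
As a minimal sketch based on the `freq` parameter used in the examples above and below (it caps the number of elements processed per second):

```python
# Cap the pipe at 10 elements per second.
slowed_integers: Pipe[int] = integers.slow(freq=10)
```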
## `.catch`

```python
inverse_floats: Pipe[float] = integers.map(lambda x: 1/x)
safe_inverse_floats: Pipe[float] = inverse_floats.catch(ZeroDivisionError)
```

You can additionally provide a `when` argument: a function that takes the parent element as input and decides whether or not to catch the exception.
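
For instance (an illustrative predicate, assuming `when` is accepted as a keyword argument), one could decide to catch the `ZeroDivisionError` only when the element that produced it is `0`:

```python
safe_inverse_floats: Pipe[float] = inverse_floats.catch(
    ZeroDivisionError,
    when=lambda x: x == 0,
)
```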

---

# ***Typical `kioss` use case in Data Engineering***

As a data engineer, you often need to write dozens of Python scripts to do ETL (extract data from some source API, apply some transformation, and load it into a data warehouse), EL (the same with minimal transformation), or Reverse ETL (read data from the data warehouse and post it into some destination API).

These scripts do not manipulate huge volumes of data because they are scheduled to run periodically (using orchestrators like Airflow/Dagster/Prefect) and only handle the data produced or updated during that period. At worst, if you are an Amazon-sized business, you may need to process 10 million payment transactions every 10 minutes.

These scripts tend to be replaced in part by EL tools like Airbyte, but sometimes you still need custom integration logic.

These scripts are typically composed of:
- the definition of a data **source** that may use:
  - a client library: e.g. the `stripe` or `google.cloud.bigquery` modules,
  - a custom `Iterator` that loops over the pages of a REST API and yields the JSON responses as `Dict[str, Any]`,
  - ...

- The **transformation** functions, which again may involve calling APIs.

- The function to post into a **destination**, which may use:
  - a client library,
  - the `requests` module.

- The logic to **batch** some records together: it often costs less to POST several records at once to an API.

- The logic to **limit the rate** of the calls to APIs to avoid exceeding their quotas (and hitting the infamous `HTTP 429 (Too Many Requests)` status code).

- The logic to make concurrent calls to APIs: `asyncio` can be very performant, but it often turns out that spawning a few **threads** using a `ThreadPoolExecutor` is enough and more flexible.

- The **retry** logic: be gentle with APIs; 2 retries, each waiting 5 seconds, can definitely help. For this, the [`retrying` module](https://github.com/rholder/retrying) is great and lets you decorate your transformation and destination functions with retry logic (see the sketch after this list).

- The logic to **catch** exceptions of a given type. Most of the time you want to catch them and continue the integration until the end: e.g. if you have 1000 records to integrate and the 236th fails because it is malformed, it is often better to end up with 999 records integrated and 1 exception raised at the end (so you can investigate and retry the malformed record) than with 235 records integrated, 1 error raised, and the 764 remaining sane records delayed.
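
As a sketch of the retry point mentioned in the list above, using the `retrying` decorator (the values mirror the "2 retries each waiting 5 seconds" example; `post_batch` is a hypothetical destination function):

```python
from retrying import retry

@retry(wait_fixed=5000, stop_max_attempt_number=3)  # 3 attempts = 2 retries, 5 seconds between attempts
def post_batch(batch):
    ...  # POST the batch of records to the destination API
```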


`kioss`'s ambition is to help write this type of script in a DRY (Don't Repeat Yourself), flexible, robust, and readable way.

Let's walk through an example to get an intuition of what a kioss-based job looks like!

### 1. imports
```python
import datetime
import requests
from kioss import Pipe
from google.cloud import bigquery
from typing import Iterable, Iterator, Dict, Any, List
```

### 2. source
Define your source `Iterable`:

```python
class PokemonCardPageSource(Iterable[List[Dict[str, Any]]]):
    def __init__(
        self,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        page_size: int = 100,
    ):
        ...

    def __iter__(self) -> Iterator[List[Dict[str, Any]]]:
# yield the pokemon cards from pokemontcg.io that
# have been created between start_time and end_time
page = 1
while True:
response: requests.Response = requests.get(...)
response.raise_for_status()
cards_page: List[Dict[str, Any]] = ...
yield cards_page
if no_more_pages:
break
page += 1
```

### 3. utilities

We will also need a function that raises if there are errors in the `Dict`s passed to it:

```python
def raise_for_errors(dct: Dict[str, Any]) -> None:
if errors := dct["errors"]:
raise RuntimeError(f"Errors occurred: {errors}")
```
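
For intuition, here is how it behaves on hypothetical insert-result entries (the exact shape of the dicts returned by BigQuery is an assumption here):

```python
raise_for_errors({"index": 0, "errors": []})                       # empty errors: does nothing
raise_for_errors({"index": 1, "errors": [{"reason": "invalid"}]})  # raises RuntimeError
```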

Let's also initialize a BigQuery client:
```python
bq_client = bigquery.Client(project)
```

### 4. pipe

Write your integration function.

Tip: Define your pipe between parentheses so that you can put each operation on its own line.

```python
def integrate_pokemon_cards_into_bigquery(
start_time: datetime.datetime,
end_time: datetime.datetime,
):
(
        Pipe(PokemonCardPageSource(start_time, end_time))
# at this point we have a Pipe[List[Dict[str, Any]]]

# Let's say pokemontcg.io rate limits us to 10 calls per second,
# let's keep a margin and slow our pipe down to 9.
.slow(freq=9)
.observe(what="pokemon cards page")

# let's flatten the card page into individual cards
.flatten()
# at this point we have a Pipe[Dict[str, Any]]

# let's structure our row
        .map(lambda card:
            {
                "name": card["name"],
                "set": card["set"]["id"],
                "fetched_at": datetime.datetime.utcnow().isoformat(),
            }
        )
.observe(what="transformed pokemon card")

        # Let's batch cards by 1000 for a performant multi-row insert.
.batch(size=1000)
# at this point we have a Pipe[List[Dict[str, Any]]]
.observe(what="pokemon card batches")

# Let's post the batches into BQ concurrently using 2 threads.
.map(lambda cards_batch:
bq_client.insert_rows_json(
table="ingestion.pokemon_card",
json_rows=cards_batch,
),
n_threads=2,
)
# at this point we have a Pipe[Sequence[Dict[str, Any]]]

        # The insertion into BigQuery returns a list of insert results.
        # Let's raise if any insertion returned errors.
.flatten()
.observe(what="bigquery insert results")
# at this point we have a Pipe[Dict[str, Any]]
.do(raise_for_errors)

        # iterate until there are no more cards in the pipe and finally raise if exceptions were caught.
.run()
)
```
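
This function can then be called directly for an ad-hoc run (illustrative datetimes):

```python
integrate_pokemon_cards_into_bigquery(
    start_time=datetime.datetime(2023, 12, 10),
    end_time=datetime.datetime(2023, 12, 17),
)
```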

### 5. orchestrate
You can now wrap this script as a task within your chosen job orchestrator.

Example using **Airflow**:

```python
... # imports from 1.
import pendulum
from typing import Optional
from airflow.decorators import dag, task

@dag(
default_args={
"retries": 1,
"execution_timeout": datetime.timedelta(minutes=5),
},
schedule="@weekly",
start_date=pendulum.datetime(...),
catchup=True,
max_active_runs=1,
)
def weekly_integrate_pokemon_cards_in_bigquery():
@task
def integrate(
data_interval_start: Optional[datetime.datetime] = None,
data_interval_end: Optional[datetime.datetime] = None
):
integrate_pokemon_cards_into_bigquery(
start_time=data_interval_start,
end_time=data_interval_end,
)

integrate()

_ = weekly_integrate_pokemon_cards_in_bigquery()
```
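
Note: this example assumes Airflow's TaskFlow API injects `data_interval_start` and `data_interval_end` from the run context into the task's parameters, so each weekly run integrates the cards created during its own data interval.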

And we are done!

---


*If you want more inspiration on how to leverage kioss, feel free to check the `./examples` folder.*
*This library has been designed around the common requirements of dozens of production jobs; don't hesitate to share your experience and feedback by opening issues and pull requests!*
