Version: 0.3.25

Pokemon details in parallel using transformers

info
The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/transformer.

TLDR

In this example, you will learn how to load a list of Pokemon from the PokeAPI and, with the help of dlt transformers, automatically query additional details for each retrieved Pokemon. You will also learn how to harness parallelism with a thread pool.

Setup: Running this example on your machine

# clone the dlt repository
git clone git@github.com:dlt-hub/dlt.git
# go to example directory
cd ./dlt/docs/examples/transformer
# install dlt with duckdb
pip install "dlt[duckdb]"
# run the example script
python pokemon.py

Using transformers with the Pokemon API

In this example, we will load Pokemon data from the PokeAPI, using transformers to fetch Pokemon details in parallel.

We'll learn how to:

  • create two transformers and connect them to a resource with the pipe operator |;
  • run these transformers in parallel using the @dlt.defer decorator;
  • configure parallelism in the config.toml file;
  • deselect the main resource so that it is not loaded into the database;
  • import and use a pre-configured requests library with automatic retries (from dlt.sources.helpers import requests).
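Before looking at the full dlt code below, it may help to see the core idea in isolation. The following is a minimal, dlt-free sketch of what @dlt.defer does conceptually: instead of fetching each detail record sequentially, fan the fetches out over a thread pool. All names and data here are hypothetical stand-ins, not part of the example source.

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_detail(item):
    # stand-in for requests.get(item["url"]).json()
    return {"name": item["name"], "detail": f"details-for-{item['name']}"}

def pokemon_list():
    # stand-in for the first page returned by the PokeAPI
    yield from [{"name": "bulbasaur"}, {"name": "ivysaur"}, {"name": "venusaur"}]

def pokemon(pokemons):
    # the "transformer": one fetch per item, executed across threads;
    # pool.map preserves the input order of the results
    with ThreadPoolExecutor(max_workers=10) as pool:
        yield from pool.map(fetch_detail, pokemons)

details = list(pokemon(pokemon_list()))
print([d["name"] for d in details])  # ['bulbasaur', 'ivysaur', 'venusaur']
```

With @dlt.defer, dlt manages an equivalent thread pool for you, so the transformer body can simply call the deferred function and yield its result.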

Loading code

import dlt
from dlt.sources.helpers import requests


@dlt.source(max_table_nesting=2)
def source(pokemon_api_url: str):
    # note that we deselect `pokemon_list` - we do not want it to be loaded
    @dlt.resource(write_disposition="replace", selected=False)
    def pokemon_list():
        """Retrieve the first page of Pokemons and yield it. We do not retrieve all the pages in this example."""
        yield requests.get(pokemon_api_url).json()["results"]

    # transformer that retrieves a list of objects in parallel
    @dlt.transformer
    def pokemon(pokemons):
        """Yields details for a list of `pokemons`"""

        # @dlt.defer marks a function to be executed in parallel
        # in a thread pool
        @dlt.defer
        def _get_pokemon(_pokemon):
            return requests.get(_pokemon["url"]).json()

        # call and yield the function result normally; @dlt.defer takes care of parallelism
        for _pokemon in pokemons:
            yield _get_pokemon(_pokemon)

    # a special case where just one item is retrieved in the transformer:
    # the whole transformer may be marked for parallel execution
    @dlt.transformer
    @dlt.defer
    def species(pokemon_details):
        """Yields species details for a pokemon"""
        species_data = requests.get(pokemon_details["species"]["url"]).json()
        # link back to the pokemon so we have a relation in the loaded data
        species_data["pokemon_id"] = pokemon_details["id"]
        # just return the results; if you yield,
        # the generator will be evaluated in the main thread
        return species_data

    # create two simple pipelines with the | operator:
    # 1. send the list of pokemons into the `pokemon` transformer to get pokemon details
    # 2. send the pokemon details into the `species` transformer to get species details
    # NOTE: dlt is smart enough to get the data from `pokemon_list` and the pokemon details only once
    return (
        pokemon_list | pokemon,
        pokemon_list | pokemon | species
    )


if __name__ == "__main__":
    # build the duckdb pipeline
    pipeline = dlt.pipeline(
        pipeline_name="pokemon", destination="duckdb", dataset_name="pokemon_data"
    )

    # the pokemon_list resource does not need to be loaded
    load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
    print(load_info)
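The two pipelines returned by the source compose like ordinary generator chains: each page from pokemon_list flows through pokemon, and each resulting detail record flows through species. A minimal pure-Python sketch of the `pokemon_list | pokemon | species` data flow, with hypothetical toy data and no network calls:

```python
def pokemon_list():
    # one "page": a list of entries, as the PokeAPI returns them
    yield [{"name": "bulbasaur", "url": "u1"}, {"name": "ivysaur", "url": "u2"}]

def pokemon(page):
    # expands one page into one detail record per entry
    for p in page:
        yield {"id": p["name"], "species": {"url": p["url"] + "/species"}}

def species(detail):
    # one input item -> one species record, linked back via pokemon_id
    return {"species_url": detail["species"]["url"], "pokemon_id": detail["id"]}

records = [species(d) for page in pokemon_list() for d in pokemon(page)]
print(records[0]["pokemon_id"])  # prints "bulbasaur"
```

In the real example, dlt evaluates the shared `pokemon_list | pokemon` prefix only once and feeds both pipelines from it, so the API is not queried twice.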

config.toml with examples of how to configure parallelism

[runtime]
log_level="WARNING"

[extract]
# use 2 workers to extract sources in parallel
workers=2
# allow 10 async items to be processed in parallel
max_parallel_items=10

[normalize]
# use 3 worker processes to process 3 files in parallel
workers=3

[load]
# have 50 concurrent load jobs
workers=50
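If you prefer not to edit config.toml, the same settings can typically be supplied as environment variables. This is a sketch assuming dlt's usual naming convention, where TOML sections and keys are upper-cased and joined with double underscores:

```shell
# environment-variable form of the config.toml values above
export RUNTIME__LOG_LEVEL="WARNING"
export EXTRACT__WORKERS=2
export EXTRACT__MAX_PARALLEL_ITEMS=10
export NORMALIZE__WORKERS=3
export LOAD__WORKERS=50
```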
