Version: Next

State

The pipeline state is a Python dictionary that lives alongside your data; you can store values in it and, on the next pipeline run, request them back.

Read and write pipeline state in a resource

You read and write the state in your resources. Below, we use the state to create a list of chess game archives, which we then use to prevent requesting duplicates.

import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="append")
def players_games(chess_url, player, start_month=None, end_month=None):
    # create or retrieve the list of checked archives from resource-scoped state
    checked_archives = dlt.current.resource_state().setdefault("archives", [])
    # get the list of archives for a particular player
    # (player_archives is another resource of the same source)
    archives = player_archives(chess_url, player)
    for url in archives:
        if url in checked_archives:
            print(f"skipping archive {url}")
            continue
        else:
            print(f"getting archive {url}")
            checked_archives.append(url)
        # get the filtered archive
        r = requests.get(url)
        r.raise_for_status()
        yield r.json().get("games", [])

Above, we request the resource-scoped state. The checked_archives list, stored under the archives dictionary key, is private and visible only to the players_games resource.
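
The setdefault call is what ties the local variable to the persisted state: it returns the stored list if one exists, or stores and returns a fresh empty list, so the same line works on the first and every later run. A minimal sketch of that idiom, with a plain dict standing in for dlt.current.resource_state() (the URL is illustrative):

```python
# a plain dict standing in for dlt.current.resource_state() (illustration only)
state = {}

# returns the stored list if "archives" exists, otherwise stores and returns []
checked_archives = state.setdefault("archives", [])
checked_archives.append("https://api.chess.com/pub/player/magnus/games/2023/01")

# the local variable and the value kept in the state are the same list object,
# so appending to one is visible through the other
assert state["archives"] is checked_archives
assert state["archives"] == ["https://api.chess.com/pub/player/magnus/games/2023/01"]
```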

The pipeline state is stored locally in the pipeline working directory and, as a consequence, cannot be shared with pipelines of different names. You must also make sure that the data you write into the state is JSON-serializable. In addition to standard Python types, dlt handles DateTime, Decimal, bytes, and UUID.

Share state across resources and read state in a source

You can also access the source-scoped state with dlt.current.source_state(), which is shared across the resources of a particular source and is also available read-only in source-decorated functions. The most common use case for source-scoped state is to store a mapping of custom fields to their display names. You can take a look at our pipedrive source for an example of state passed across resources.
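
For illustration, here is the kind of mapping typically kept in source-scoped state (custom-field hash to display name) and how a reader resource could use it to rename raw columns. A plain dict stands in for dlt.current.source_state(); the field hash, the "custom_fields" slot name, and the record are made up for this sketch, not pipedrive's actual schema:

```python
# a plain dict standing in for dlt.current.source_state() (illustration only)
source_state = {}
custom_fields = source_state.setdefault("custom_fields", {})

# the "writer" resource records a mapping it discovered from the API
custom_fields["9dc80c50d78a"] = "deal_source"

# a "reader" resource renames raw columns using the shared mapping,
# leaving unknown keys unchanged
record = {"9dc80c50d78a": "webinar", "title": "Acme deal"}
renamed = {custom_fields.get(k, k): v for k, v in record.items()}
assert renamed == {"deal_source": "webinar", "title": "Acme deal"}
```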

tip

Avoid sharing state across resources if you plan to decompose your source, for example to run it on Airflow in parallel. If you cannot avoid sharing, designate one of the resources as the state writer and all the others as state readers; this is exactly what the pipedrive pipeline does. With such a structure, you will still be able to run some of your resources in parallel.

caution

dlt.state() is a deprecated alias for dlt.current.source_state() and will soon be removed.

Syncing state with the destination

What if you run your pipeline on, for example, Airflow, where every task gets a clean filesystem and the pipeline working directory is always deleted? dlt loads your state into the destination together with all other data and, when faced with a clean start, tries to restore the state from the destination.

The remote state is identified by the pipeline name, the destination location (as given by the credentials), and the destination dataset. To reuse the same state, use the same pipeline name and destination.

The state is stored in the _dlt_pipeline_state table at the destination and contains information about the pipeline, the pipeline run the state belongs to, and the state blob.

dlt provides the dlt pipeline sync command, with which you can request the state back from that table.
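
For example (the pipeline name is a placeholder; substitute your own):

```shell
dlt pipeline chess_pipeline sync
```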

💡 If you can keep the pipeline working directory across runs, you can disable the state sync by setting restore_from_destination=false, e.g., in your config.toml.
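
A minimal config.toml fragment (the key name is as given above; top-level placement is assumed here):

```toml
restore_from_destination = false
```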

When to use pipeline state

Do not use pipeline state if it can grow to millions of records

Do not use dlt state when it may grow to millions of elements. Do you plan to store modification timestamps for all of your millions of user records? That is probably a bad idea! In that case, you could:

  • Store the state in DynamoDB, Redis, etc., taking into account that if the extract stage fails, you'll end up with an invalid state.
  • Use your loaded data as the state. dlt exposes the current pipeline via dlt.current.pipeline(), from which you can obtain an sql_client and load the data of interest. In that case, try at least to process your user records in batches.

Access data in the destination instead of pipeline state

In the example below, we load recent comments made by a given user_id. We access the user_comments table to select the maximum comment id for that user.

import dlt

@dlt.resource(name="user_comments")
def comments(user_id: str):
    current_pipeline = dlt.current.pipeline()
    # find the last comment id for the given user_id by looking in the destination
    max_id: int = 0
    # on the first pipeline run, the user_comments table does not yet exist, so do not check at all;
    # alternatively, catch DatabaseUndefinedRelation, which is raised when an unknown table is selected
    if not current_pipeline.first_run:
        with current_pipeline.sql_client() as client:
            # we may get the last user comment or None, which we replace with 0
            max_id = (
                client.execute_sql(
                    "SELECT MAX(_id) FROM user_comments WHERE user_id=?", user_id
                )[0][0]
                or 0
            )
    # use max_id to filter our results (we simulate an API query)
    yield from [
        {"_id": i, "value": letter, "user_id": user_id}
        for i, letter in zip([1, 2, 3], ["A", "B", "C"])
        if i > max_id
    ]

When the pipeline runs for the first time, the destination dataset and the user_comments table do not yet exist, so we skip the destination query using the pipeline's first_run property. We also handle the situation where there are no comments yet for a user_id by replacing None with 0 as max_id.
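
The None-to-0 replacement relies on Python's or operator: SQL MAX() returns None when no rows match, and None or 0 evaluates to 0, while a real id passes through unchanged. A tiny sketch of that expression in isolation (the helper name is made up for illustration):

```python
def last_comment_id(row_value):
    # mirrors the `client.execute_sql(...)[0][0] or 0` expression in the resource
    return row_value or 0

assert last_comment_id(None) == 0   # no comments yet for this user_id
assert last_comment_id(17) == 17    # existing comments: keep the real maximum
```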

Inspect the pipeline state

You can inspect the pipeline state with the dlt pipeline command:

dlt pipeline -v chess_pipeline info

This will display the source and resource state slots for all known sources.

Reset the pipeline state: full or partial

To fully reset the state:

To partially reset the state:
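
The source lists no commands for either case. As a hedged sketch based on dlt's pipeline CLI drop subcommand (the subcommand exists; the pipeline and resource names are placeholders, and you should verify the exact arguments against your installed dlt version):

```shell
# full reset: drop the tables and state created by the pipeline's resources
dlt pipeline chess_pipeline drop

# partial reset: drop only the tables and state of a selected resource
dlt pipeline chess_pipeline drop players_games
```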
