Resource
Declare a resource
A resource is a function that yields data. To create a resource, we add the `@dlt.resource` decorator to that function.
Commonly used arguments:
- `name` - the name of the table generated by this resource. Defaults to the decorated function name.
- `write_disposition` - how the data should be loaded at the destination. Currently supported: `append`, `replace` and `merge`. Defaults to `append`.
Example:
```py
@dlt.resource(name='table_name', write_disposition='replace')
def generate_rows():
    for i in range(10):
        yield {'id': i, 'example_string': 'abc'}

@dlt.source
def source_name():
    return generate_rows
```
To get the data of a resource, we could do:
```py
for row in generate_rows():
    print(row)

for row in source_name().resources.get('table_name'):
    print(row)
```
Typically, resources are declared and grouped with related resources within a source function.
Define schema
`dlt` will generate a schema for the tables associated with resources from the resource's data. You can modify the generation process by using the table and column hints. The resource decorator accepts the following arguments:
- `table_name` - the name of the table, if different from the resource name.
- `primary_key` and `merge_key` - define the name of the columns (compound keys are allowed) that will receive those hints. Used in incremental loading.
- `columns` - lets you define one or more columns, including the data types, nullability and other hints. The column definition is a `TypedDict`: `TTableSchemaColumns`.
In the example below, we tell `dlt` that the column `tags` (containing a list of tags) in the `user` table should have the type `complex`, which means that it will be loaded as JSON/struct and not as a child table.
```py
@dlt.resource(name="user", columns={"tags": {"data_type": "complex"}})
def get_users():
    ...

# the `table_schema` method gets the table schema generated by a resource
print(get_users().table_schema())
```
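For illustration, the other hints from the list above can also be set in the decorator. A minimal sketch, assuming the yielded items carry a `user_id` field (a hypothetical field name, not taken from the example above):
```py
# sketch: `user_id` is an assumed field of the yielded items
@dlt.resource(name="user", write_disposition="merge", primary_key="user_id")
def get_users():
    ...
```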
💡 You can pass dynamic hints, which are functions that take the data item as input and return a hint value. This lets you create table and column schemas depending on the data. See the example in the next section.
💡 You can mark some resource arguments as configuration and credentials values so `dlt` can pass them automatically to your functions.
Define a schema with Pydantic
You can alternatively use a Pydantic model to define the schema. For example:
```py
from typing import List, Optional, Union

import dlt
from pydantic import BaseModel

class Address(BaseModel):
    street: str
    city: str
    postal_code: str

class User(BaseModel):
    id: int
    name: str
    tags: List[str]
    email: Optional[str]
    address: Address
    status: Union[int, str]

@dlt.resource(name="user", columns=User)
def get_users():
    ...
```
The data types of the table columns are inferred from the types of the pydantic fields. These use the same type conversions as when the schema is automatically generated from the data.
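As a quick check, you can reuse the `table_schema` method shown earlier to inspect the column definitions `dlt` derives from the model:
```py
# prints the generated column definitions, e.g. `tags` should come out as `complex`
print(get_users().table_schema()["columns"])
```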
Things to note:
- Fields with an `Optional` type are marked as `nullable`.
- Fields with a `Union` type are converted to the first (not `None`) type listed in the union. E.g. `status: Union[int, str]` results in a `bigint` column.
- `list`, `dict` and nested pydantic model fields will use the `complex` type, which means they'll be stored as a JSON object in the database instead of creating child tables. You can override this by manually calling the pydantic helper with `skip_complex_types=True`, see below:
```py
from dlt.common.libs.pydantic import pydantic_to_table_schema_columns

...

@dlt.resource(name="user", columns=pydantic_to_table_schema_columns(User, skip_complex_types=True))
def get_users():
    ...
```
This omits any `dict`/`list`/`BaseModel` type fields from the schema, so `dlt` will fall back on the default behaviour of creating child tables for these fields.
Dispatch data to many tables
You can load data to many tables from a single resource. The most common case is a stream of events of different types, each with a different data schema. To deal with this, you can use the `table_name` argument on `dlt.resource`. You could pass the table name as a function with the data item as an argument and the `table_name` string as a return value.

For example, a resource that loads GitHub repository events wants to send `issue`, `pull request` and `comment` events to separate tables. The type of the event is in the "type" field.
```py
# send the item to a table whose name is taken from item["type"]
@dlt.resource(table_name=lambda event: event['type'])
def repo_events() -> Iterator[TDataItems]:
    yield item

# the `table_schema` method gets the table schema generated by a resource and takes an optional
# data item to evaluate dynamic hints
print(repo_events().table_schema({"type": "WatchEvent", "id": ...}))
```
In more advanced cases, you can dispatch data to different tables directly in the code of the resource function:
```py
@dlt.resource
def repo_events() -> Iterator[TDataItems]:
    # mark the "item" to be sent to the table with the name item["type"]
    yield dlt.mark.with_table_name(item, item["type"])
```
Parametrize a resource
You can add arguments to your resource functions like to any other function. Below we parametrize our `generate_rows` resource to generate the number of rows we request:
```py
@dlt.resource(name='table_name', write_disposition='replace')
def generate_rows(nr):
    for i in range(nr):
        yield {'id': i, 'example_string': 'abc'}

for row in generate_rows(10):
    print(row)

for row in generate_rows(20):
    print(row)
```
You can mark some resource arguments as configuration and credentials values so `dlt` can pass them automatically to your functions.
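As a minimal sketch of how that looks (`api_url` and `api_key` are hypothetical parameter names, not part of the examples above): defaulting an argument to `dlt.config.value` or `dlt.secrets.value` tells `dlt` to resolve it from your configuration and secrets (e.g. `config.toml`, `secrets.toml` or environment variables) when the caller doesn't pass it explicitly.
```py
import dlt

# `api_url` and `api_key` are hypothetical names used only for illustration
@dlt.resource
def orders(api_url: str = dlt.config.value, api_key: str = dlt.secrets.value):
    # `api_url` is resolved from configuration and `api_key` from secrets,
    # unless the caller passes them explicitly
    ...
```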
Process resources with dlt.transformer
You can feed data from one resource into another. The most common case is when you have an API that returns a list of objects (e.g. users) in one endpoint and user details in another. You can deal with this by declaring a resource that obtains a list of users and another resource that receives items from the list and downloads the profiles.
```py
@dlt.resource(write_disposition="replace")
def users(limit=None):
    for u in _get_users(limit):
        yield u

# feed data from users as user_item below,
# all transformers must have at least one
# argument that will receive data from the parent resource
@dlt.transformer(data_from=users)
def users_details(user_item):
    for detail in _get_details(user_item["user_id"]):
        yield detail

# just load users_details.
# dlt figures out the dependencies for you.
pipeline.run(users_details)
```
In the example above, `users_details` will receive data from the default instance of the `users` resource (with `limit` set to `None`). You can also use the pipe `|` operator to bind resources dynamically:
```py
# you can be more explicit and use the pipe operator.
# with it you can create dynamic pipelines where the dependencies
# are set at run time and resources are parametrized, i.e.
# below we want to load only 100 users from the `users` endpoint
pipeline.run(users(limit=100) | users_details)
```
Declare a standalone resource
A standalone resource is defined on a function that is top level in a module (not an inner function) and that accepts config and secrets values. Additionally, if the `standalone` flag is specified, the decorated function signature and docstring will be preserved. `dlt.resource` will just wrap the decorated function, and the user must call the wrapper to get the actual resource. Below we declare a `filesystem` resource that must be called before use.
```py
@dlt.resource(standalone=True)
def filesystem(bucket_url=dlt.config.value):
    """list and yield files in `bucket_url`"""
    ...

# `filesystem` must be called before it is extracted or used in any other way
pipeline.run(filesystem("s3://my-bucket/reports"), table_name="reports")
```
A standalone resource may have a dynamic name that depends on the arguments passed to the decorated function. For example:
```py
@dlt.resource(standalone=True, name=lambda args: args["stream_name"])
def kinesis(stream_name: str):
    ...

kinesis_stream = kinesis("telemetry_stream")
```
The `kinesis_stream` resource has the name `telemetry_stream`.
Customize resources
Filter, transform and pivot data
You can attach any number of transformations to your resource; they are evaluated on an item-per-item basis. The available transformation types:
- map - transform the data item (`resource.add_map`).
- filter - filter the data item (`resource.add_filter`).
- yield map - a map that returns an iterator, so a single row may generate many rows (`resource.add_yield_map`); see the sketch after the example below.
Example: We have a resource that loads a list of users from an API endpoint. We want to customize it so that:

- We remove users with `user_id == "me"`.
- We anonymize user data.
Here's our resource:
```py
import dlt

@dlt.resource(write_disposition="replace")
def users():
    ...
    users = requests.get(...)
    ...
    yield users
```
Here's our script that defines transformations and loads the data:
```py
from pipedrive import users

def anonymize_user(user_data):
    user_data["user_id"] = hash_str(user_data["user_id"])
    user_data["user_email"] = hash_str(user_data["user_email"])
    return user_data

# add the filter and the anonymize function to the users resource and enumerate
for user in users().add_filter(lambda user: user["user_id"] != "me").add_map(anonymize_user):
    print(user)
```
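The yield map transformation is not used in this example. As a sketch, assuming each user item also carries a list of tags under a `"tags"` key (an assumption, not part of the resource above), `resource.add_yield_map` can expand one user row into one row per tag:
```py
def expand_tags(user_data):
    # a yield map returns an iterator, so one input item may produce many output rows
    for tag in user_data.get("tags", []):
        yield {"user_id": user_data["user_id"], "tag": tag}

for row in users().add_yield_map(expand_tags):
    print(row)
```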
Sample from large data
If your resource loads thousands of pages of data from a REST API or millions of rows from a database table, you may want to sample just a fragment of it, e.g. to quickly see the dataset with example data and test your transformations. To do that, you limit how many items will be yielded by a resource by calling the `resource.add_limit` method. In the example below, we load just the first 10 items from an infinite counter - it would otherwise never end.
```py
import itertools
import dlt

r = dlt.resource(itertools.count(), name="infinity").add_limit(10)
assert list(r) == list(range(10))
```
💡 We are not skipping any items. We are closing the iterator/generator that produces the data after the limit is reached.
💡 You cannot limit transformers. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
Set table and adjust schema
You can change the schema of a resource, be it standalone or part of a source. Look for the method named `apply_hints`, which takes the same arguments as the resource decorator. Obviously, you should call this method before data is extracted from the resource. The example below converts an `append` resource loading the `users` table into a `merge` resource that will keep just one updated record per `user_id`. It also adds "last value" incremental loading on the `updated_at` column to prevent requesting already loaded records again:
```py
tables = sql_database()
tables.users.apply_hints(
    write_disposition="merge",
    primary_key="user_id",
    incremental=dlt.sources.incremental("updated_at")
)
pipeline.run(tables)
```
To just change the name of the table to which the resource will load data, do the following:
```py
tables = sql_database()
tables.users.table_name = "other_users"
```
Duplicate and rename resources
There are cases when your resources are generic (e.g. a bucket filesystem) and you want to load several instances of it (e.g. files from different folders) into separate tables. In the example below, we use the `filesystem` source to load CSVs from two different folders into separate tables:
```py
@dlt.resource(standalone=True)
def filesystem(bucket_url):
    # list and yield files in bucket_url
    ...

@dlt.transformer
def csv_reader(file_item):
    # load csv, parse and yield rows in file_item
    ...

# create two extract pipes that list files from the bucket and send them to the reader.
# by default both pipes will load data to the same table (csv_reader)
reports_pipe = filesystem("s3://my-bucket/reports") | csv_reader()
transactions_pipe = filesystem("s3://my-bucket/transactions") | csv_reader()

# so we rename the resources to load to the "reports" and "transactions" tables
pipeline.run(
    [reports_pipe.with_name("reports"), transactions_pipe.with_name("transactions")]
)
```
The `with_name` method returns a deep copy of the original resource, its data pipe, and the data pipes of the parent resources. A renamed clone is fully separated from the original resource (and other clones) when loading: it maintains a separate resource state and will load to a table with the new name.
Load resources
You can pass individual resources or a list of resources to the `dlt.pipeline` object. Resources loaded outside the source context will be added to the default schema of the pipeline.
```py
import dlt

@dlt.resource(name='table_name', write_disposition='replace')
def generate_rows(nr):
    for i in range(nr):
        yield {'id': i, 'example_string': 'abc'}

pipeline = dlt.pipeline(
    pipeline_name="rows_pipeline",
    destination="duckdb",
    dataset_name="rows_data"
)

# load an individual resource
pipeline.run(generate_rows(10))
# load a list of resources
pipeline.run([generate_rows(10), generate_rows(20)])
```
Do a full refresh
To do a full refresh of an `append` or `merge` resource, temporarily change the write disposition to `replace`. You can use the `apply_hints` method of a resource or just provide an alternative write disposition when loading:
```py
p.run(merge_source(), write_disposition="replace")
```
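If you prefer the `apply_hints` route, a minimal sketch (assuming, purely for illustration, that `merge_source()` exposes a resource named `users`):
```py
source = merge_source()
# temporarily switch the `users` resource to a full replace for this run
source.users.apply_hints(write_disposition="replace")
p.run(source)
```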