Skip to main content
Version: 0.3.25

Getting Started

Overview

dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets. Below we give you a preview how you can get data from APIs, files, Python objects or pandas dataframes and move it into a local or remote database, data lake or a vector data store.

Let's get started!

Installation

Install dlt using pip:

pip install -U dlt

Command above installs (or upgrades) library core, in example below we use duckdb as a destination so let's add it:

pip install "dlt[duckdb]"
tip

Use clean virtual environment for your experiments! Here are detailed instructions.

Make sure that your dlt version is 0.3.15 or above. Check it in the terminal with dlt --version.

Quick start

Let's load a list of Python objects (dictionaries) into duckdb and inspect the created dataset:

import dlt

data = [
{'id': 1, 'name': 'Alice'},
{'id': 2, 'name': 'Bob'}
]

pipeline = dlt.pipeline(
pipeline_name='quick_start',
destination='duckdb',
dataset_name='mydata'
)
load_info = pipeline.run(data, table_name="users")

print(load_info)

Save this python script with the name quick_start_pipeline.py and run the following command:

python quick_start_pipeline.py

The output should look like:

Pipeline quick_start completed in 0.59 seconds
1 load package(s) were loaded to destination duckdb and into dataset mydata
The duckdb destination used duckdb:////home/user-name/quick_start/quick_start.duckdb location to store data
Load package 1692364844.460054 is LOADED and contains no failed jobs

dlt just created a database schema called mydata (the dataset_name) with a table users in it. Take a look at it using built-in Streamlit app:

dlt pipeline quick_start show

quick_start is the name of the pipeline from the script above. If you do not have Streamlit installed yet do:

pip install streamlit

Now you should see the users table:

Streamlit Explore data Streamlit Explore data. Schema and data for a test pipeline “quick_start”.

tip

dlt works in Jupyter Notebook and Google Colab! See our Quickstart Colab Demo.

Looking for source code of all the snippets? You can find and run them from this repository.

Learn more:

Load your data

Load data from a variety of sources

Use dlt to load practically any data you deal with in your Python scripts into a dataset. The library will create/update tables, infer data types and deal with nested data automatically:

import dlt

from dlt.common import json

with open("./assets/json_file.json", 'rb') as file:
data = json.load(file)

pipeline = dlt.pipeline(
pipeline_name='from_json',
destination='duckdb',
dataset_name='mydata',
)

# NOTE: test data that we load is just a dictionary so we enclose it in a list
# if your JSON contains a list of objects you do not need to do that
load_info = pipeline.run([data], table_name="json_data")

print(load_info)

We import json from dlt namespace. It defaults to orjson(otherwise simplejson). It can also encode date times, dates, dataclasses and few more data types.

Append or replace your data

Run any of the previous examples twice to notice that each time a copy of the data is added to your tables. We call this load mode append. It is very useful when i.e. you have a new folder created daily with json file logs, and you want to ingest them.

Perhaps this is not what you want to do in the examples above. For example, if the CSV file is updated, how we can refresh it in the database? One method is to tell dlt to replace the data in existing tables by using write_disposition:

import dlt

data = [
{'id': 1, 'name': 'Alice'},
{'id': 2, 'name': 'Bob'}
]

pipeline = dlt.pipeline(
pipeline_name='replace_data',
destination='duckdb',
dataset_name='mydata',
)
load_info = pipeline.run(data, table_name="users", write_disposition="replace")

print(load_info)

Run this script twice to see that users table still contains only one copy of your data.

tip

What if you added a new column to your CSV? dlt will migrate your tables! See the replace mode and table schema migration in action in our Colab Demo.

Learn more:

Declare loading behavior

You can define the loading process by decorating Python functions with @dlt.resource.

Load only new data (incremental loading)

We can improve the GitHub API example above and get only issues that were created since last load. Instead of using replace write_disposition and downloading all issues each time the pipeline is run, we do the following:

import dlt
from dlt.sources.helpers import requests

@dlt.resource(table_name="issues", write_disposition="append")
def get_issues(
created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
# NOTE: we read only open issues to minimize number of calls to the API. There's a limit of ~50 calls for not authenticated Github users
url = "https://api.github.com/repos/dlt-hub/dlt/issues?per_page=100&sort=created&directions=desc&state=open"

while True:
response = requests.get(url)
response.raise_for_status()
yield response.json()

# stop requesting pages if the last element was already older than initial value
# note: incremental will skip those items anyway, we just do not want to use the api limits
if created_at.start_out_of_range:
break

# get next page
if "next" not in response.links:
break
url = response.links["next"]["url"]


pipeline = dlt.pipeline(
pipeline_name='github_issues_incremental',
destination='duckdb',
dataset_name='github_data_append',
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info

print(row_counts)
print("------")
print(load_info)

We request issues for dlt-hub/dlt repository ordered by created_at field (descending) and yield them page by page in get_issues generator function.

We use the @dlt.resource decorator to declare table name to which data will be loaded and write disposition, which is append.

We also use dlt.sources.incremental to track created_at field present in each issue to filter in the newly created.

Now run the script. It loads all the issues from our repo to duckdb. Run it again, and you can see that no issues got added (if no issues were created in the meantime).

Now you can run this script on a daily schedule and each day you’ll load only issues created after the time of the previous pipeline run.

tip

Between pipeline runs, dlt keeps the state in the same database it loaded data to. Peek into that state, the tables loaded and get other information with:

dlt pipeline -v github_issues_incremental info

Learn more:

Update and deduplicate your data

The script above finds new issues and adds them to the database. It will ignore any updates to existing issue text, emoji reactions etc. To get always fresh content of all the issues you combine incremental load with merge write disposition, like in the script below.

import dlt
from dlt.sources.helpers import requests

@dlt.resource(
table_name="issues",
write_disposition="merge",
primary_key="id",
)
def get_issues(
updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
# NOTE: we read only open issues to minimize number of calls to the API. There's a limit of ~50 calls for not authenticated Github users
url = f"https://api.github.com/repos/dlt-hub/dlt/issues?since={updated_at.last_value}&per_page=100&sort=updated&directions=desc&state=open"

while True:
response = requests.get(url)
response.raise_for_status()
yield response.json()

# get next page
if "next" not in response.links:
break
url = response.links["next"]["url"]

pipeline = dlt.pipeline(
pipeline_name='github_issues_merge',
destination='duckdb',
dataset_name='github_data_merge',
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info

print(row_counts)
print("------")
print(load_info)

Above we add primary_key hint that tells dlt how to identify the issues in the database to find duplicates which content it will merge.

Note that we now track the updated_at field - so we filter in all issues updated since the last pipeline run (which also includes those newly created).

Pay attention how we use since parameter from GitHub API and updated_at.last_value to tell GitHub to return issues updated only after the date we pass. updated_at.last_value holds the last updated_at value from the previous run.

Learn more:

Dispatch stream of events to tables by event type

This is a fun but practical example that reads GitHub events from dlt repository (such as issue or pull request created, comment added etc.). Each event type is sent to a different table in duckdb.

import dlt
from dlt.sources.helpers import requests

@dlt.resource(primary_key="id", table_name=lambda i: i["type"], write_disposition="append")
def repo_events(
last_created_at = dlt.sources.incremental("created_at")
):
url = "https://api.github.com/repos/dlt-hub/dlt/events?per_page=100"

while True:
response = requests.get(url)
response.raise_for_status()
yield response.json()

# stop requesting pages if the last element was already older than initial value
# note: incremental will skip those items anyway, we just do not want to use the api limits
if last_created_at.start_out_of_range:
break

# get next page
if "next" not in response.links:
break
url = response.links["next"]["url"]

pipeline = dlt.pipeline(
pipeline_name='github_events',
destination='duckdb',
dataset_name='github_events_data',
)
load_info = pipeline.run(repo_events)
row_counts = pipeline.last_trace.last_normalize_info

print(row_counts)
print("------")
print(load_info)

Events content never changes so we can use append write disposition and track new events using created_at field.

We name the tables using a function that receives an event data and returns table name: table_name=lambda i: i["type"]

Now run the script:

python github_events_dispatch.py

Peek at created tables:

dlt pipeline -v github_events info
dlt pipeline github_events trace

And preview the data:

dlt pipeline -v github_events show
tip

Some of the events produce tables with really many child tables. You can control the level of table nesting with a decorator.

Another fun Colab Demo - we analyze reactions on duckdb repo!

Learn more:

Transform the data before the load

Below we extract text from PDFs and load it to Weaviate vector store.

import os

import dlt
from dlt.destinations.weaviate import weaviate_adapter
from PyPDF2 import PdfReader


@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path)
}


@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
if not separate_pages:
raise NotImplementedError()
# extract data from PDF page by page
reader = PdfReader(file_item["file_path"])
for page_no in range(len(reader.pages)):
# add page content to file item
page_item = dict(file_item)
page_item["text"] = reader.pages[page_no].extract_text()
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item

pipeline = dlt.pipeline(
pipeline_name='pdf_to_text',
destination='weaviate'
)

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
pdf_pipeline = list_files("assets/invoices").add_filter(
lambda item: item["file_name"].endswith(".pdf")
) | pdf_to_text(separate_pages=True)

# set the name of the destination table to receive pages
# NOTE: Weaviate, dlt's tables are mapped to classes
pdf_pipeline.table_name = "InvoiceText"

# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(
weaviate_adapter(pdf_pipeline, vectorize="text")
)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)

We start with a simple resource that lists files in specified folder. To that we add a filter function that removes all files that are not pdfs.

To parse PDFs we use PyPDF and return each page from a given PDF as separate data item.

Parsing happens in @dlt.transformer which receives data from list_files resource. It splits PDF into pages, extracts text and yields pages separately so each PDF will correspond to many items in Weaviate InvoiceText class. We set the primary key and use merge disposition so if the same PDF comes twice we'll just update the vectors, and not duplicate.

Look how we pipe data from list_files resource (note that resource is deselected so we do not load raw file items to destination) into pdf_to_text using | operator.

Just before load, the weaviate_adapter is used to tell weaviate destination which fields to vectorize.

To run this example you need additional dependencies:

pip install PyPDF2 "dlt[weaviate]"
python pdf_to_weaviate.py

Now it is time to query our documents.

import weaviate

client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())

Above we provide URL to local cluster. We also use contextionary to vectorize data. You may find information on our setup in links below.

tip

Change the destination to duckdb if you do not have access to Weaviate cluster or not able to run it locally.

Learn more:

Next steps

If you want to take full advantage of the dlt library, then we strongly suggest that you build your sources out of existing building blocks:

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.