Version: 0.3.25

Load MySQL table with ConnectorX & Arrow

info
The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/connector_x_arrow.

TLDR

In this example, you will learn how to load data from SQL queries using Arrow tables. ConnectorX creates Arrow tables in memory, and dlt then loads them into any destination that supports parquet files, without copying the data.

Setup: Running this example on your machine

# clone the dlt repository
git clone git@github.com:dlt-hub/dlt.git
# go to example directory
cd ./dlt/docs/examples/connector_x_arrow
# install dlt with duckdb
pip install "dlt[duckdb]"
# run the example script
python load_arrow.py

Load MySQL table with ConnectorX and Arrow

The example script below takes genome data from a public MySQL instance and loads it into duckdb. Note that your destination must support loading parquet files, as this is the format dlt uses to save Arrow tables. ConnectorX can read data from several popular databases and creates an in-memory Arrow table, which dlt then saves to the load package and loads to the destination.
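
For illustration, the snippet below (a sketch, not part of the example script; it assumes network access to the public Rfam instance, and the column selection is just for the demo) shows that cx.read_sql returns an in-memory pyarrow Table:

import connectorx as cx

tbl = cx.read_sql(
    "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    "SELECT upid, created FROM genome LIMIT 5",
    return_type="arrow2",
)
print(tbl.schema)    # pyarrow schema inferred from the MySQL column types
print(tbl.num_rows)  # 5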

tip

You can yield several tables if your data is large and you need to partition your load.
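
A rough sketch of such partitioning (the resource name, date ranges, and query below are hypothetical, not part of the example script):

import connectorx as cx
import dlt

@dlt.resource(name="genome_chunks")  # hypothetical resource name
def read_sql_partitioned(conn_str: str, year_ranges):
    # issue one query per range and yield one Arrow table per chunk;
    # all yielded tables become part of the same load package
    for start, end in year_ranges:
        query = f"SELECT * FROM genome WHERE created >= '{start}' AND created < '{end}'"
        yield cx.read_sql(conn_str, query, return_type="arrow2", protocol="binary")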

We'll learn:

  • How to get Arrow tables from ConnectorX and yield them.
  • That merge and incremental loads work with Arrow tables.
  • How to enable incremental loading for efficient data extraction.
  • How to use the built-in ConnectionStringCredentials (a short sketch follows this list).
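
A minimal sketch of ConnectionStringCredentials on its own (the connection string is the public Rfam instance used later; in the example script the value is injected via dlt.secrets.value instead of being passed explicitly):

from dlt.sources.credentials import ConnectionStringCredentials

# parse the connection string into structured credentials
creds = ConnectionStringCredentials("mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
print(creds.drivername)                  # "mysql"
print(creds.to_native_representation())  # back to the full connection string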

Loading code

import connectorx as cx

import dlt
from dlt.sources.credentials import ConnectionStringCredentials

def read_sql_x(
    conn_str: ConnectionStringCredentials = dlt.secrets.value,
    query: str = dlt.config.value,
):
    yield cx.read_sql(
        conn_str.to_native_representation(), query, return_type="arrow2", protocol="binary"
    )

# create genome resource with merge on `upid` primary key
genome = dlt.resource(
    name="genome",
    write_disposition="merge",
    primary_key="upid",
    standalone=True,
)(read_sql_x)(
    "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
    "SELECT * FROM genome ORDER BY created LIMIT 1000",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
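
If you want the first run to start from a known point, the incremental hint can also take an explicit initial value (a sketch, assuming the created column is a timestamp; the date below is arbitrary):

import pendulum

# only rows with `created` at or after the initial value are loaded on the first run
genome.apply_hints(
    incremental=dlt.sources.incremental("created", initial_value=pendulum.datetime(2020, 1, 1))
)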

Run the pipeline:

if __name__ == "__main__":
    pipeline = dlt.pipeline(destination="duckdb")
    print(pipeline.run(genome))
    print(pipeline.last_trace.last_normalize_info)
    # NOTE: run the pipeline again to see that no more records get loaded, thanks to incremental loading
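
To check what actually landed in duckdb after a run, you can query the dataset through the pipeline's SQL client (a follow-up sketch, assuming the pipeline above has completed at least once):

with pipeline.sql_client() as client:
    with client.execute_query("SELECT count(*) FROM genome") as cursor:
        print(cursor.fetchone())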
