Load mysql table with ConnectorX & Arrow
info
The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/connector_x_arrow.
TLDR
In this example, you will learn how to use arrow tables to load data from sql queries. This method creates arrow tables in memory using Connector X and then loads them into destination supporting parquet files without copying data.
Setup: Running this example on your machine
# clone the dlt repository
git clone git@github.com:dlt-hub/dlt.git
# go to example directory
cd ./dlt/docs/examples/connector_x_arrow
# install dlt with duckdb
pip install "dlt[duckdb]"
# run the example script
python .py
Load mysql table with ConnectorX and Arrow
Example script below takes genome data from public mysql instance and then loads it into duckdb. Mind that your destination
must support loading of parquet files as this is the format that dlt
uses to save arrow tables. Connector X allows to
get data from several popular databases and creates in memory Arrow table which dlt
then saves to load package and loads to the destination.
tip
You can yield several tables if your data is large and you need to partition your load.
We'll learn:
- How to get arrow tables from connector X and yield them.
- That merge and incremental loads work with arrow tables.
- How to enable incremental loading for efficient data extraction.
- How to use build in ConnectionString credentials
Loading code
import connectorx as cx
import dlt
from dlt.sources.credentials import ConnectionStringCredentials
def read_sql_x(
conn_str: ConnectionStringCredentials = dlt.secrets.value,
query: str = dlt.config.value
):
yield cx.read_sql(conn_str.to_native_representation(), query, return_type="arrow2", protocol="binary")
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
write_disposition="merge",
primary_key="upid",
standalone=True
)(read_sql_x)(
"mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000"
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
Run the pipeline:
if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
print(pipeline.run(genome))
print(pipeline.last_trace.last_normalize_info)
# NOTE: run pipeline again to see that no more records got loaded thanks to incremental working