Version: 0.3.25

30+ SQL Databases

Need help deploying these sources, or figuring out how to run them in your data stack?

Join our Slack community or book a call with our support engineer Adrian.

SQL databases are database management systems (DBMS) that store data in a structured format, commonly used for efficient and reliable data retrieval.

Our SQL Database verified source loads data to your specified destination using SQLAlchemy.

tip

View the pipeline example here.

Sources and resources that can be loaded using this verified source are:

Name          Description
sql_database  Retrieves data from an SQL database
sql_table     Retrieves data from an SQL database table

Supported databases

We support all SQLAlchemy dialects, which include, but are not limited to, the following database engines:

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server
  • MariaDB
  • IBM DB2 and Informix
  • Google BigQuery
  • Snowflake
  • Redshift
  • Apache Hive and Presto
  • SAP Hana
  • CockroachDB
  • Firebird
  • Teradata Vantage
note

Note that there are many unofficial dialects, such as DuckDB.
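
Each engine above corresponds to a SQLAlchemy drivername in the connection URL. For illustration only (the hosts, users, and database names below are placeholders, not values used by this source):

    # PostgreSQL via the psycopg2 driver (placeholder credentials)
    "postgresql+psycopg2://user:password@localhost:5432/my_database"

    # Microsoft SQL Server via the pyodbc driver (placeholder credentials)
    "mssql+pyodbc://user:password@localhost:1433/my_database?driver=ODBC+Driver+17+for+SQL+Server"

    # SQLite database stored in a local file
    "sqlite:///my_database.db"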

Setup Guide

Grab credentials

This verified source utilizes SQLAlchemy for database connectivity. Let's take a look at the following public database example:

connection_url = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"

The database above doesn't require a password.

The connection URL can be broken down into:

connection_url = "connection_string = f"{drivername}://{username}:{password}@{host}:{port}/{database}"

drivername: Indicates both the database system and driver used.

  • E.g., "mysql+pymysql" uses MySQL with the pymysql driver. Alternatives might include mysqldb and mysqlclient.

username: Used for database authentication.

  • E.g., "rfamro" as a possible read-only user.

password: The password for the given username.

host: The server's address or domain where the database is hosted.

  • E.g., A public database at "mysql-rfam-public.ebi.ac.uk" hosted by EBI.

port: The port for the database connection.

  • E.g., "4497", in the above connection URL.

database: The specific database on the server.

  • E.g., Connecting to the "Rfam" database.
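
As a minimal sketch (assuming SQLAlchemy 1.4+, where URL.create is available), the same public Rfam URL can be assembled from its parts:

    from sqlalchemy.engine import URL

    # Build the public Rfam connection URL from its parts.
    # The password is omitted because this public database does not require one.
    connection_url = URL.create(
        drivername="mysql+pymysql",
        username="rfamro",
        host="mysql-rfam-public.ebi.ac.uk",
        port=4497,
        database="Rfam",
    )
    print(connection_url)  # mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam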

Configure connection

Here, we use the mysql and pymysql dialects to set up an SSL connection to a server, with all information taken from the SQLAlchemy docs.

  1. To enforce SSL on the client without a client certificate you may pass the following DSN:

    sources.sql_database.credentials="mysql+pymysql://root:<pass>@<host>:3306/mysql?ssl_ca="
  2. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks:

    sources.sql_database.credentials="mysql+pymysql://root:<pass>@<host>:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false"
  3. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below:

    sources.sql_database.credentials="mysql+pymysql://root:<pass>@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem"

Initialize the verified source

To get started with your data pipeline, follow these steps:

  1. Enter the following command:

    dlt init sql_database duckdb

    It will initialize the pipeline example with an SQL database as the source and DuckDB as the destination.

    tip

    If you'd like to use a different destination, simply replace duckdb with the name of your preferred destination.

  2. After running this command, a new directory will be created with the necessary files and configuration settings to get started.

For more information, read the Walkthrough: Add a verified source.

Add credentials

  1. In the .dlt folder, there's a file called secrets.toml. It's where you store sensitive information securely, like access tokens. Keep this file safe.

    Here's what the secrets.toml looks like:

    [sources.sql_database.credentials]
    drivername = "mysql+pymysql" # driver name for the database
    database = "Rfam" # database name
    username = "rfamro" # username associated with the database
    host = "mysql-rfam-public.ebi.ac.uk" # host address
    port = "4497" # port required for connection
  2. Alternatively, you can also provide credentials in "secrets.toml" as:

    sources.sql_database.credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
  3. You can also pass credentials in the pipeline script the following way:

    from dlt.sources.credentials import ConnectionStringCredentials

    credentials = ConnectionStringCredentials(
        "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
    )

    See pipeline example for details.

  4. Finally, follow the instructions in Destinations to add credentials for your chosen destination. This will ensure that your data is properly routed.

For more information, read the General Usage: Credentials.

Run the pipeline

  1. Install the necessary dependencies by running the following command:

    pip install -r requirements.txt
  2. Run the verified source by entering:

    python sql_database_pipeline.py
  3. Make sure that everything is loaded as expected with:

    dlt pipeline <pipeline_name> show
    note

    The pipeline_name for the above example is rfam; you may also use any custom name instead.

Sources and resources

dlt works on the principle of sources and resources.

Source sql_database:

This function loads data from an SQL database via SQLAlchemy and auto-creates resources for each table or from a specified list of tables.

@dlt.source
def sql_database(
    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
    schema: Optional[str] = dlt.config.value,
    metadata: Optional[MetaData] = None,
    table_names: Optional[List[str]] = dlt.config.value,
) -> Iterable[DltResource]:
    ...

credentials: Database details or an 'sqlalchemy.Engine' instance.

schema: Database schema name (default if unspecified).

metadata: Optional SQLAlchemy.MetaData; takes precedence over schema.

table_names: List of tables to load; defaults to all if not provided.
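
For example, a minimal call that loads only selected tables, with credentials read from secrets.toml (the table names below are illustrative Rfam tables, not a requirement of this source):

    # Create resources only for the listed tables instead of the whole schema
    source = sql_database(table_names=["family", "clan"])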

Resource sql_table

This function loads data from specific database tables.

@dlt.common.configuration.with_config(
    sections=("sources", "sql_database"), spec=SqlTableResourceConfiguration
)
def sql_table(
    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
    table: str = dlt.config.value,
    schema: Optional[str] = dlt.config.value,
    metadata: Optional[MetaData] = None,
    incremental: Optional[dlt.sources.incremental[Any]] = None,
) -> DltResource:
    ...

credentials: Database info or an Engine instance.

table: Table to load, set in code or default from "config.toml".

schema: Optional name of the table schema.

metadata: Optional SQLAlchemy.MetaData; takes precedence over schema.

incremental: Optional, enables incremental loading.

write_disposition: Can be "merge", "replace", or "append".
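
As a sketch, the credentials argument can also be an existing SQLAlchemy Engine rather than a connection string; here the engine points at the public Rfam database used throughout this page:

    from sqlalchemy import create_engine

    # Reuse an existing engine for the public Rfam database and read the "family" table from it
    engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
    family = sql_table(credentials=engine, table="family")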

Customization

Create your own pipeline

To create your own pipeline, use source and resource methods from this verified source.

  1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:

    pipeline = dlt.pipeline(
        pipeline_name="rfam",  # Use a custom name if desired
        destination="duckdb",  # Choose the appropriate destination (e.g., duckdb, redshift, postgres)
        dataset_name="rfam_data"  # Use a custom name if desired
    )
  2. Pass your credentials using any of the methods described above.

  3. To load the entire database, use the sql_database source as:

    source = sql_database()
    info = pipeline.run(source, write_disposition="replace")
    print(info)
  4. If you just need the "family" table, use:

    source = sql_database().with_resources("family")
    # Run the pipeline
    info = pipeline.run(source, write_disposition="replace")
    print(info)
  5. To pseudonymize columns and hide personally identifiable information (PII), refer to the documentation. As an example, here's how to pseudonymize the "rfam_acc" column in the "family" table:

    import hashlib

    def pseudonymize_name(doc):
        '''
        Pseudonymization is a deterministic type of PII-obscuring.
        Its role is to allow identifying users by their hash,
        without revealing the underlying info.
        '''
        # add a constant salt to generate consistent hashes
        salt = 'WI@N57%zZrmk#88c'
        salted_string = doc['rfam_acc'] + salt
        sh = hashlib.sha256()
        sh.update(salted_string.encode())
        hashed_string = sh.digest().hex()
        doc['rfam_acc'] = hashed_string
        return doc

    pipeline = dlt.pipeline(
        # Configure the pipeline
    )
    # Use the sql_database source to load the "family" table and pseudonymize the column "rfam_acc"
    source = sql_database().with_resources("family")
    # Modify this source instance's resource
    source = source.family.add_map(pseudonymize_name)
    # Run the pipeline. For a large db this may take a while
    info = pipeline.run(source, write_disposition="replace")
    print(info)
  6. To exclude columns, such as the "rfam_id" column from the "family" table before loading:

    def remove_columns(doc):
        del doc["rfam_id"]
        return doc

    pipeline = dlt.pipeline(
        # Configure the pipeline
    )
    # Use the sql_database source to load the "family" table and remove the column "rfam_id"
    source = sql_database().with_resources("family")
    # Modify this source instance's resource
    source = source.family.add_map(remove_columns)
    # Run the pipeline. For a large db this may take a while
    info = pipeline.run(source, write_disposition="replace")
    print(info)
  7. To incrementally load the "family" table using the sql_database source method:

    import pendulum

    source = sql_database().with_resources("family")
    # Use the "updated" column as the incremental cursor, with an initial value of January 1, 2022, at midnight
    source.family.apply_hints(
        incremental=dlt.sources.incremental("updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0))
    )
    # Run the pipeline
    info = pipeline.run(source, write_disposition="merge")
    print(info)

    In this example, we load data from the family table, using the updated column for incremental loading. In the first run, the process loads all data starting from midnight (00:00:00) on January 1, 2022. Subsequent runs perform incremental loading, guided by the values in the updated field.

  8. To incrementally load the "family" table using the 'sql_table' resource.

    family = sql_table(
        table="family",
        incremental=dlt.sources.incremental(
            "updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0)
        ),
    )
    # Run the pipeline
    info = pipeline.extract(family, write_disposition="merge")
    print(info)

    This process initially loads all data from the family table starting at midnight on January 1, 2022. For later runs, it uses the updated field for incremental loading as well.

    info
    • For merge write disposition, the source table needs a primary key, which dlt automatically sets up.
    • apply_hints is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use apply_hints multiple times to create pipelines with merged, appended, or replaced resources (see the sketch after this list).
  9. Remember to keep the pipeline name and destination dataset name consistent. The pipeline name is crucial for retrieving the state from the last run, which is essential for incremental loading. Altering these names could initiate a "full_refresh", interfering with the metadata tracking necessary for incremental loads.
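
As a sketch of apply_hints used this way, assuming the pipeline configured in step 1 (the primary key column "rfam_acc" is an illustrative choice, not one this source prescribes):

    # Select the "family" resource and adjust its hints after creation
    source = sql_database().with_resources("family")
    source.family.apply_hints(
        write_disposition="merge",  # merge new rows into the existing table
        primary_key="rfam_acc",     # illustrative primary key used for deduplication
    )
    info = pipeline.run(source)
    print(info)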

