Version: 0.3.25

Control nested MongoDB data

The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/nested_data.

TLDR

This example demonstrates how to control nested data using Python and the dlt library. It covers working with MongoDB, incremental loading, limiting nesting levels, and applying data type hints.

Setup: Running this example on your machine

# clone the dlt repository
git clone git@github.com:dlt-hub/dlt.git
# go to example directory
cd ./dlt/docs/examples/nested_data
# install dlt with duckdb
pip install "dlt[duckdb]"
# run the example script
python nested_data.py

Control nested data

In this example, you'll find a Python script that demonstrates how to control nested data using the dlt library.

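We'll learn how to:

- Limit the nesting level with the max_table_nesting argument of the dlt.source decorator.
- Adjust the nesting level dynamically on the source object.
- Apply data type hints so that a nested column is loaded as JSON instead of a child table.
- Work with MongoDB in Python and dlt, with an optional incremental argument for incremental loading.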

Install pymongo

pip install "pymongo>=4.3.3"

Loading code

from itertools import islice
from typing import Any, Dict, Iterator, Optional

from bson.decimal128 import Decimal128
from bson.objectid import ObjectId
from pendulum import _datetime
from pymongo import MongoClient

import dlt
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TDataItem
from dlt.common.utils import map_nested_in_place

CHUNK_SIZE = 10000

# You can limit how deep dlt goes when generating child tables.
# By default, the library will descend and generate child tables
# for all nested lists, without a limit.
# In this example, we specify that we only want to generate child tables up to level 2,
# so there will be only one level of child tables within child tables.
@dlt.source(max_table_nesting=2)
def mongodb_collection(
    connection_url: str = dlt.secrets.value,
    database: Optional[str] = dlt.config.value,
    collection: str = dlt.config.value,
    incremental: Optional[dlt.sources.incremental] = None,  # type: ignore[type-arg]
    write_disposition: Optional[str] = dlt.config.value,
) -> Any:
    # set up mongo client
    client: Any = MongoClient(connection_url, uuidRepresentation="standard", tz_aware=True)
    mongo_database = client.get_default_database() if not database else client[database]
    collection_obj = mongo_database[collection]

    def collection_documents(
        client: Any,
        collection: Any,
        incremental: Optional[dlt.sources.incremental[Any]] = None,
    ) -> Iterator[TDataItem]:
        # CollectionLoader is not defined in this excerpt;
        # see the full script in the repository linked above.
        LoaderClass = CollectionLoader

        loader = LoaderClass(client, collection, incremental=incremental)
        yield from loader.load_documents()

    return dlt.resource(  # type: ignore
        collection_documents,
        name=collection_obj.name,
        primary_key="_id",
        write_disposition=write_disposition,
    )(client, collection_obj, incremental=incremental)
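
The loading code imports islice and defines CHUNK_SIZE, but the loader that would use them is not shown in this excerpt. The sketch below is one way such a loader might page through a cursor in fixed-size chunks. It is an assumption for illustration, not the example's actual CollectionLoader: the iter_chunks helper and the fake_cursor generator are hypothetical names, and a plain generator stands in for a MongoDB cursor so the snippet runs without a database.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

# Tiny value so the behaviour is visible; the example itself defines 10000.
CHUNK_SIZE = 3


def iter_chunks(
    documents: Iterable[Dict[str, Any]], chunk_size: int = CHUNK_SIZE
) -> Iterator[List[Dict[str, Any]]]:
    """Yield successive lists of at most chunk_size documents (hypothetical helper)."""
    iterator = iter(documents)
    while True:
        # islice consumes up to chunk_size items without loading the rest.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


# A plain generator stands in for a MongoDB cursor (assumption for illustration).
fake_cursor = ({"_id": i, "title": f"movie {i}"} for i in range(7))
chunk_sizes = [len(chunk) for chunk in iter_chunks(fake_cursor)]
print(chunk_sizes)  # [3, 3, 1]
```

Yielding lists rather than single documents keeps memory bounded while still handing the pipeline reasonably large batches.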

Run the pipeline

if __name__ == "__main__":
    # When we created the source, we set max_table_nesting to 2.
    # This ensures that the generated tables do not have more than two
    # levels of nesting, even if the original data structure is more deeply nested.
    pipeline = dlt.pipeline(
        pipeline_name="mongodb_pipeline",
        destination="duckdb",
        dataset_name="unpacked_data",
    )
    source_data = mongodb_collection(
        collection="movies", write_disposition="replace"
    )
    load_info = pipeline.run(source_data)
    print(load_info)

    # The second method involves setting the max_table_nesting attribute directly
    # on the source data object.
    # This allows for dynamic control over the maximum nesting
    # level for a specific data source.
    # Here the nesting level is adjusted before running the pipeline.
    pipeline = dlt.pipeline(
        pipeline_name="mongodb_pipeline",
        destination="duckdb",
        dataset_name="not_unpacked_data",
    )
    source_data = mongodb_collection(
        collection="movies", write_disposition="replace"
    )
    source_data.max_table_nesting = 0
    load_info = pipeline.run(source_data)
    print(load_info)

    # The third method involves applying data type hints to specific columns in the data.
    # In this case, we tell dlt that column 'cast' (containing a list of actors)
    # in 'movies' table should have type complex which means
    # that it will be loaded as JSON/struct and not as child table.
    pipeline = dlt.pipeline(
        pipeline_name="mongodb_pipeline",
        destination="duckdb",
        dataset_name="unpacked_data_without_cast",
    )
    source_data = mongodb_collection(
        collection="movies", write_disposition="replace"
    )
    source_data.movies.apply_hints(columns={"cast": {"data_type": "complex"}})
    load_info = pipeline.run(source_data)
    print(load_info)
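
To make the effect of the three methods easier to picture, here is a toy model in plain Python of how nested lists might end up as child tables or as JSON. It is a simplified sketch, not dlt's normalizer: the unpack function, the parent__field table naming, and the sample document are assumptions for illustration, and the json_fields set only imitates a "complex" column hint by field name.

```python
import json
from typing import Any, Dict, FrozenSet, List, Optional

Row = Dict[str, Any]
Tables = Dict[str, List[Row]]


def unpack(
    table: str,
    row: Row,
    max_nesting: int,
    json_fields: FrozenSet[str] = frozenset(),
    tables: Optional[Tables] = None,
    level: int = 0,
) -> Tables:
    """Toy model: split nested lists into child tables up to max_nesting levels."""
    if tables is None:
        tables = {}
    rows = tables.setdefault(table, [])
    flat: Row = {}
    for key, value in row.items():
        if not isinstance(value, list):
            flat[key] = value
        elif level >= max_nesting or key in json_fields:
            # Nesting limit reached or field hinted as JSON: keep it in place.
            flat[key] = json.dumps(value)
        else:
            # Each list item becomes a row in a child table one level deeper.
            for item in value:
                child = item if isinstance(item, dict) else {"value": item}
                unpack(f"{table}__{key}", child, max_nesting, json_fields, tables, level + 1)
    rows.append(flat)
    return tables


# Made-up sample document, nested three levels deep.
movie = {
    "title": "Example Movie",
    "cast": ["Actor A", "Actor B"],
    "awards": [{"name": "Award X", "nominations": [{"year": 2000, "jury": [{"name": "J"}]}]}],
}

nested_two = unpack("movies", movie, max_nesting=2)
nested_zero = unpack("movies", movie, max_nesting=0)
without_cast = unpack("movies", movie, max_nesting=2, json_fields=frozenset({"cast"}))

print(list(nested_two))    # ['movies', 'movies__cast', 'movies__awards', 'movies__awards__nominations']
print(list(nested_zero))   # ['movies']
print(list(without_cast))  # ['movies', 'movies__awards', 'movies__awards__nominations']
```

In this model, a limit of 2 produces child and grandchild tables while the third-level jury list stays as a JSON string, a limit of 0 keeps everything in the root table, and marking cast as JSON drops only the movies__cast table.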

