common.destination.reference
DestinationClientConfiguration Objects
@configspec
class DestinationClientConfiguration(BaseConfiguration)
destination_type
which destination to load data to
fingerprint
def fingerprint() -> str
Returns a destination fingerprint, which is a hash of selected configuration fields, e.g. the host in case of a connection string.
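As a sketch of the idea (the helper below is hypothetical; which fields get hashed is destination-specific), a fingerprint can be derived by hashing a stable configuration field such as the host:

```python
import hashlib


def fingerprint_from_host(host: str) -> str:
    """Hypothetical helper: derive a short, stable fingerprint from a host name.

    Selected configuration fields (here just the host) are hashed so that two
    configurations pointing at the same physical destination share a fingerprint.
    """
    return hashlib.sha256(host.encode("utf-8")).hexdigest()[:16]


# The same host always yields the same fingerprint.
fp = fingerprint_from_host("db.example.com")
```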
__str__
def __str__() -> str
Returns a displayable destination location
DestinationClientDwhConfiguration Objects
@configspec
class DestinationClientDwhConfiguration(DestinationClientConfiguration)
Configuration of a destination that supports datasets/schemas
dataset_name
dataset name in the destination to load data to. For schemas that are not the default schema, it is used as a dataset prefix
default_schema_name
name of the default schema, used to name the effective dataset to load data to
replace_strategy
How to handle the replace disposition for this destination; can be classic or staging
normalize_dataset_name
def normalize_dataset_name(schema: Schema) -> str
Builds the full db dataset (schema) name out of the configured dataset name and the schema name: {dataset_name}_{schema.name}. The resulting name is normalized.
If the default schema name is None or equals schema.name, the schema suffix is skipped.
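The naming rule can be sketched as follows (a simplified stand-in: the real method also applies the schema's naming convention to normalize the result):

```python
from typing import Optional


def normalize_dataset_name(
    dataset_name: str, default_schema_name: Optional[str], schema_name: str
) -> str:
    """Simplified sketch of the dataset naming rule described above.

    The schema name is appended as a suffix unless the schema is the default
    one (or no default is configured), in which case the suffix is skipped.
    """
    if default_schema_name is None or schema_name == default_schema_name:
        return dataset_name
    return f"{dataset_name}_{schema_name}"
```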
DestinationClientStagingConfiguration Objects
@configspec
class DestinationClientStagingConfiguration(DestinationClientDwhConfiguration)
Configuration of a staging destination, able to store files with the desired layout at bucket_url.
Also supports datasets and can act as a standalone destination.
DestinationClientDwhWithStagingConfiguration Objects
@configspec
class DestinationClientDwhWithStagingConfiguration(
DestinationClientDwhConfiguration)
Configuration of a destination that can take data from staging destination
staging_config
configuration of the staging, if present, injected at runtime
LoadJob Objects
class LoadJob()
Represents a job that loads a single file
Each job starts in the "running" state and ends in one of the terminal states: "retry", "failed" or "completed".
Each job is uniquely identified by a file name. The file is guaranteed to exist in the "running" state; in a terminal state, the file may not be present.
In the "running" state, the loader component periodically gets the state via the status() method. When a terminal state is reached, the load job is discarded and not called again.
The exception method is called to get error information in the "failed" and "retry" states.
The __init__ method is responsible for putting the job in the "running" state. It may raise LoadClientTerminalException or LoadClientTransientException to immediately transition the job into the "failed" or "retry" state respectively.
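The lifecycle can be illustrated with a minimal in-memory job (a hypothetical sketch, not one of the shipped implementations):

```python
from typing import Literal, Optional

TLoadJobState = Literal["running", "failed", "retry", "completed"]


class InMemoryLoadJob:
    """Hypothetical LoadJob sketch: starts in "running", ends in a terminal state."""

    def __init__(self, file_name: str) -> None:
        self._file_name = file_name
        self._state: TLoadJobState = "running"  # __init__ puts the job in "running"
        self._exception: Optional[str] = None

    def state(self) -> TLoadJobState:
        return self._state

    def file_name(self) -> str:
        return self._file_name

    def complete(self) -> None:
        self._state = "completed"

    def fail(self, message: str) -> None:
        self._state = "failed"
        self._exception = message  # surfaced via exception() in terminal states

    def exception(self) -> Optional[str]:
        return self._exception
```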
__init__
def __init__(file_name: str) -> None
The file name is also the job id (or the job id is deterministically derived from it), so it must be globally unique
state
@abstractmethod
def state() -> TLoadJobState
Returns current state. Should poll external resource if necessary.
file_name
def file_name() -> str
The name of the job file
job_id
def job_id() -> str
The job id that is derived from the file name and does not change during the job lifecycle
exception
@abstractmethod
def exception() -> str
The exception associated with failed or retry states
NewLoadJob Objects
class NewLoadJob(LoadJob)
Adds a trait that allows saving a new job file
new_file_path
@abstractmethod
def new_file_path() -> str
Path to a newly created temporary job file. If empty, no followup job should be created
FollowupJob Objects
class FollowupJob()
Adds a trait that allows creating followup jobs
create_followup_jobs
def create_followup_jobs(final_state: TLoadJobState) -> List[NewLoadJob]
Returns a list of new jobs. final_state is the state to which this job transitions.
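A followup hook can be sketched like this (hypothetical classes; real implementations return NewLoadJob instances with files written to disk):

```python
from typing import List


class NewJobStub:
    """Hypothetical stand-in for NewLoadJob: just carries the new file path."""

    def __init__(self, path: str) -> None:
        self._path = path

    def new_file_path(self) -> str:
        return self._path


class ReferenceFollowupJob:
    """Hypothetical FollowupJob sketch: emits a followup only on success."""

    def __init__(self, file_name: str) -> None:
        self._file_name = file_name

    def create_followup_jobs(self, final_state: str) -> List[NewJobStub]:
        # Only a successfully completed job spawns a followup here.
        if final_state == "completed":
            return [NewJobStub(self._file_name + ".reference")]
        return []
```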
DoNothingJob Objects
class DoNothingJob(LoadJob)
The most lazy class of dlt
DoNothingFollowupJob Objects
class DoNothingFollowupJob(DoNothingJob, FollowupJob)
The second most lazy class of dlt
JobClientBase Objects
class JobClientBase(ABC)
initialize_storage
@abstractmethod
def initialize_storage(truncate_tables: Iterable[str] = None) -> None
Prepares the storage to be used, e.g. creates a database schema or a file system folder. Truncates the requested tables.
is_storage_initialized
@abstractmethod
def is_storage_initialized() -> bool
Returns True if the storage is ready to be read/written.
drop_storage
@abstractmethod
def drop_storage() -> None
Brings the storage back into an uninitialized state. Typically, the data in the storage is destroyed.
update_stored_schema
def update_stored_schema(
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None) -> Optional[TSchemaTables]
Updates the storage to the current schema.
Implementations should not assume that expected_update is the exact difference between the destination state and self.schema. This is only the case if the destination has a single writer and no other processes modify the schema.
Arguments:
only_tables (Sequence[str], optional): Updates only the listed tables. Defaults to None.
expected_update (TSchemaTables, optional): Update that is expected to be applied to the destination.
Returns:
Optional[TSchemaTables]: The update that was applied at the destination.
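The caveat that the applied update may differ from expected_update can be sketched with a hypothetical helper (real implementations compare column-level differences against the actual destination catalog):

```python
from typing import Dict, Iterable, Optional


def compute_applied_update(
    stored_tables: Dict[str, dict],
    expected_update: Dict[str, dict],
    only_tables: Optional[Iterable[str]] = None,
) -> Dict[str, dict]:
    """Hypothetical sketch: return the subset of expected_update actually applied.

    Tables already present in the destination are skipped, so the returned
    update may differ from expected_update, as the docstring above warns.
    """
    selected = set(only_tables) if only_tables is not None else set(expected_update)
    applied: Dict[str, dict] = {}
    for name, table in expected_update.items():
        if name in selected and name not in stored_tables:
            stored_tables[name] = table  # "create" the table in the destination
            applied[name] = table
    return applied
```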
start_file_load
@abstractmethod
def start_file_load(table: TTableSchema, file_path: str,
load_id: str) -> LoadJob
Creates and starts a load job for a particular table with content in file_path.
restore_file_load
@abstractmethod
def restore_file_load(file_path: str) -> LoadJob
Finds and restores an already started load job identified by file_path, if the destination supports it.
create_table_chain_completed_followup_jobs
def create_table_chain_completed_followup_jobs(
table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]
Creates a list of followup jobs that should be executed after a table chain is completed
complete_load
@abstractmethod
def complete_load(load_id: str) -> None
Marks the load package with load_id as completed in the destination. Before such a commit is done, the data with load_id is invalid.
WithStateSync Objects
class WithStateSync(ABC)
get_stored_schema
@abstractmethod
def get_stored_schema() -> Optional[StorageSchemaInfo]
Retrieves newest schema from destination storage
get_stored_state
@abstractmethod
def get_stored_state(pipeline_name: str) -> Optional[StateInfo]
Loads compressed state from destination storage
WithStagingDataset Objects
class WithStagingDataset(ABC)
Adds capability to use staging dataset and request it from the loader
with_staging_dataset
@abstractmethod
def with_staging_dataset() -> ContextManager["JobClientBase"]
Executes job client methods on staging dataset
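The staging-dataset switch can be sketched with contextlib (a hypothetical client; the "_staging" suffix is an assumption for illustration):

```python
from contextlib import contextmanager
from typing import Iterator


class SketchClient:
    """Hypothetical job client sketch that can swap in a staging dataset."""

    def __init__(self, dataset_name: str) -> None:
        self.dataset_name = dataset_name

    @contextmanager
    def with_staging_dataset(self) -> Iterator["SketchClient"]:
        # Execute client methods against the staging dataset, then restore
        # the main dataset name on exit, even if an exception was raised.
        main = self.dataset_name
        self.dataset_name = main + "_staging"
        try:
            yield self
        finally:
            self.dataset_name = main
```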
SupportsStagingDestination Objects
class SupportsStagingDestination()
Adds capability to support a staging destination for the load
Destination Objects
class Destination(ABC, Generic[TDestinationConfig, TDestinationClient])
A destination factory that can be partially pre-configured with credentials and other config params.
spec
@property
@abstractmethod
def spec() -> Type[TDestinationConfig]
A spec of destination configuration that also contains destination credentials
capabilities
@abstractmethod
def capabilities() -> DestinationCapabilitiesContext
Destination capabilities, e.g. supported loader file formats, identifier name lengths, naming conventions, escape functions etc.
destination_name
@property
def destination_name() -> str
The destination name is either set explicitly when creating the destination or taken from the destination type
client_class
@property
@abstractmethod
def client_class() -> Type[TDestinationClient]
A job client class responsible for starting and resuming load jobs
configuration
def configuration(initial_config: TDestinationConfig) -> TDestinationConfig
Get a fully resolved destination config from the initial config
normalize_type
@staticmethod
def normalize_type(destination_type: str) -> str
Normalizes a destination type string into a canonical form. Assumes that type names without dots correspond to built-in destinations.
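The canonicalization can be sketched as follows (a simplified stand-in for the actual static method, assuming built-in destinations live under dlt.destinations):

```python
def normalize_type(destination_type: str) -> str:
    """Simplified sketch: a type name without dots is assumed to be a
    built-in destination and is expanded to its full import path."""
    if "." not in destination_type:
        return f"dlt.destinations.{destination_type}"
    # Already a full import path: return unchanged.
    return destination_type
```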
from_reference
@staticmethod
def from_reference(
ref: TDestinationReferenceArg,
credentials: Optional[CredentialsConfiguration] = None,
destination_name: Optional[str] = None,
environment: Optional[str] = None,
**kwargs: Any
) -> Optional["Destination[DestinationClientConfiguration, JobClientBase]"]
Instantiates a destination from a string reference.
The ref can be a destination name or an import path pointing to a destination class (e.g. dlt.destinations.postgres).
client
def client(
schema: Schema,
initial_config: TDestinationConfig = config.value
) -> TDestinationClient
Returns a configured instance of the destination's job client