Orchestrating dbt Core with Amazon Athena and Dagster: A Comprehensive Guide for Scalable Data Workflows

Reading time: 8 minutes

Data transformation can be challenging: raw SQL queries are hard to share or reuse, and untested queries can break pipelines. dbt Core (Data Build Tool) addresses these problems by providing a framework to transform raw data into clean, usable datasets. Using plain SQL, dbt helps data engineers build, test, and document data transformation workflows efficiently.

In this article, we will explore how to use dbt effectively for data transformation, dive into the benefits of integrating it with Amazon Athena, and see how to orchestrate these processes with Dagster.

Section 1: The Power of dbt Core in Data Transformation

dbt (data build tool) has revolutionized the way data teams manage their transformation workflows. Imagine you’re working with a client table that contains data from various sources. The goal is to clean this data, ensuring it’s accurate and ready for analysis.

dbt Core provides a structured way to handle this, turning raw SQL queries into reusable models, enabling efficient data testing, and managing incremental updates.

Additionally, dbt Core offers comprehensive data lineage, allowing you to track the flow of data through your transformations and understand how each dataset is derived, ensuring transparency and traceability in your workflows.

Using dbt for Data Transformation

Let’s start with a simple transformation. Suppose we need to create a clean view of our client data. With dbt, you can write a SQL model like this:

{{ config(
	materialized='view'
) }}
SELECT
	client_id,
	client_name,
	date
FROM
	{{ ref('client_data') }}
WHERE
	client_id IS NOT NULL
	AND client_name IS NOT NULL
	AND date IS NOT NULL

In this example, we create a view (materialized='view') that selects only non-null client data from the upstream client_data model. The SQL is standard, making it easy for anyone familiar with SQL to understand and modify.

Ensuring Data Quality with Tests

Data quality is crucial. dbt Core lets you define tests to ensure your data meets specific criteria. For instance, we want to ensure that client_id is unique and client_name is not null, using dbt's native data tests.

Additionally, we might have a custom requirement that client names should only contain letters and spaces.

Here’s how you can achieve this using dbt macros and tests.

First, define a custom test macro:

{% test name_format(model, column_name) %}
  SELECT *
  FROM {{ model }}
  WHERE NOT REGEXP_LIKE({{ column_name }}, '^[A-Za-z\s]+$')
{% endtest %}

Next, apply this test in the schema.yml file for our clean_client model; we also apply dbt's native data tests, unique and not_null, to the other columns:

models:
  - name: clean_client
    description: "Cleaned table for client data with technical and functional quality checks"
    columns:
      - name: client_id
        description: "The primary key for this table"
        data_tests:
          - unique
          - not_null
      - name: client_name
        description: "The name of the client"
        data_tests:
          - not_null
          - name_format
      - name: date
        description: "The date the client joined"
        data_tests:
          - not_null
          - date_format

With these tests, dbt will ensure that client_id is unique, client_name is not null and follows the specified format, and date is valid.
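
Note that the schema.yml above also references a date_format test that we have not defined. As a minimal sketch (assuming the date column should parse as an ISO YYYY-MM-DD date on Athena), a matching custom test could look like this:

{% test date_format(model, column_name) %}
  -- Hedged sketch: flag rows whose value cannot be cast to a DATE.
  -- TRY() returns NULL when the cast fails (Athena/Presto behaviour);
  -- adapt the check to your real column type and date format.
  SELECT *
  FROM {{ model }}
  WHERE TRY(CAST({{ column_name }} AS DATE)) IS NULL
{% endtest %}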

Incremental Models

For larger datasets, it’s inefficient to rebuild tables from scratch. Instead, dbt supports incremental models, which update only the new or changed data. Here’s an example of how to set up an incremental model for our client data:

{{ config(
	materialized='incremental',
	table_type='iceberg',
	incremental_strategy='merge',
	unique_key='client_id',
	update_condition='target.client_name != src.client_name AND src.date > target.date',
	format='parquet'
) }}

SELECT
	client_id,
	client_name,
	date
FROM
	{{ ref('client_data') }}

This model only updates rows where the client name has changed and the date is more recent, ensuring efficient and accurate data updates.

In summary, dbt makes data transformation faster and more reusable. By productionizing workflows, leveraging SQL macros, ensuring data quality, and optimizing with incremental loading, it significantly improves the efficiency and reliability of transformation pipelines.

Section 2: Amazon Athena

Amazon Athena is a serverless, interactive query service that simplifies querying data stored in S3.

With Athena, you can run SQL queries directly on that data without having to set up or manage any infrastructure. It integrates seamlessly with other AWS services, but on its own it lacks the transformation-management capabilities needed to run a lakehouse effectively, which is why a tool like dbt is so useful alongside it.

Advantages of Athena

Athena is cost-effective thanks to its pay-per-query pricing model: you pay only for the data scanned by the queries you run. It offers flexibility by querying data directly in S3, without requiring upfront ingestion or transformation.

Its scalability and ease of use make it a strong choice for large-scale data analytics.

Integrating Athena with dbt

dbt can execute its transformations on Athena, leveraging SQL to manage data efficiently. The dbt-athena adapter can materialize models as Iceberg or Hive tables, and it supports incremental loading for both formats, allowing you to manage and update your data efficiently over time.
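
By way of illustration, a dbt profile for the Athena adapter might look like the following; the project name, bucket names, region, workgroup, and schema are placeholders to adapt to your environment:

my_athena_project:
  target: dev
  outputs:
    dev:
      type: athena
      region_name: eu-west-1                        # placeholder AWS region
      s3_staging_dir: s3://my-athena-query-results/ # placeholder bucket for query results
      s3_data_dir: s3://my-lakehouse-data/          # optional: where table data is written
      database: awsdatacatalog
      schema: analytics                             # placeholder Glue database / dbt schema
      work_group: primary
      threads: 4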

Additionally, the adapter integrates with AWS Lake Formation for fine-grained access control: it can apply Lake Formation tags and grants through a model's table properties, ensuring detailed and secure access management based on your data requirements.
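
For instance, Lake Formation tags can be declared directly in a model's config block. The sketch below assumes the lf_tags_config option exposed by the dbt-athena adapter; the tag keys and values are illustrative placeholders that must already exist in Lake Formation:

-- Hedged sketch: attach Lake Formation tags via the dbt-athena lf_tags_config
-- model config; adapt the tags to the LF-Tags defined in your account.
{{ config(
    materialized='table',
    table_type='iceberg',
    lf_tags_config={
        'enabled': true,
        'tags': {
            'domain': 'clients',
            'sensitivity': 'restricted'
        }
    }
) }}

SELECT * FROM {{ ref('clean_client') }}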

This setup makes Athena and dbt a powerful combination for managing and transforming data in a lakehouse environment.

Section 3: Orchestrating dbt Workflows with Dagster

Introduction to Dagster

Dagster is an open-source data orchestration tool designed to manage and automate complex data workflows.

It offers a powerful asset-oriented approach, where assets represent key components like tables or datasets within a pipeline. This model allows for detailed management and monitoring of each step in the data transformation process.

Dagster also provides comprehensive tools for scheduling, logging, and monitoring, addressing some limitations of dbt Core, which lacks built-in orchestration and scheduling features.
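
As a rough illustration (not the exact code of the POC discussed later), the dagster-dbt integration can load every model of a dbt project as a Dagster asset and attach a schedule to it. The sketch below assumes a dbt project whose manifest.json has already been compiled, and uses hypothetical paths and names:

from pathlib import Path

from dagster import (
    AssetExecutionContext,
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    define_asset_job,
)
from dagster_dbt import DbtCliResource, dbt_assets

# Hypothetical location of the dbt project and its compiled manifest
DBT_PROJECT_DIR = Path("dbt_project")
DBT_MANIFEST = DBT_PROJECT_DIR / "target" / "manifest.json"


@dbt_assets(manifest=DBT_MANIFEST)
def client_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # `dbt build` runs models and their tests; results stream back as asset events
    yield from dbt.cli(["build"], context=context).stream()


daily_dbt_schedule = ScheduleDefinition(
    job=define_asset_job("daily_dbt_job", selection=AssetSelection.all()),
    cron_schedule="0 6 * * *",  # every day at 06:00
)

defs = Definitions(
    assets=[client_dbt_assets],
    schedules=[daily_dbt_schedule],
    resources={"dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR))},
)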

A UI example of a dbt pipeline in Dagster

Dagster Multi-Container Architecture

Dagster’s architecture involves multiple containers, each with a distinct role:

  • Webserver Container: Hosts the Dagster webserver, providing the user interface for managing and monitoring data pipelines.
  • User Code Container: Contains the dbt code and Dagster orchestration logic. It runs a gRPC server that interacts with both the webserver and daemon to execute jobs.
  • Daemon Container: Responsible for scheduling and launching jobs. It triggers new ECS tasks to run jobs in isolation when needed.

Representation of Dagster architecture

All three containers connect to a Postgres database, which handles run storage, event logs, and scheduling. This separation of concerns allows for flexible management of your Dagster setup. You can update and re-containerize the user code independently, without disrupting the webserver or daemon containers.

This modular approach makes it easier to iterate on dbt models and Dagster orchestration logic while maintaining the stability of your overall system.

Why Dagster for Orchestrating dbt

Dagster enhances dbt’s capabilities by managing data pipelines at the asset level, ensuring each table or dataset is tracked and monitored. It integrates seamlessly with dbt to execute data quality checks and log results during each transformation.

This integration ensures that you can oversee the entire pipeline from data extraction to validation through a unified interface. Dagster’s ability to upload event logs to CloudWatch and its flexibility in configuring ECS task instances make it a robust choice for orchestrating dbt workflows within AWS.

Dagster Architecture and Setup on AWS

To run Dagster on AWS, you’ll need several key resources:

  1. ECS Cluster: Hosts the Dagster services.
  2. ECR Images: Container images for the Dagster components (webserver, user code, and daemon).
  3. RDS Postgres Database: Stores Dagster’s runs, event logs, and schedules.
  4. CloudWatch: For storing and monitoring dbt logs.
  5. Load Balancer: Provides access to the Dagster webserver.
  6. VPC, Security Groups, and Subnets: Ensure secure and efficient communication between containers using private DNS.

The deployment involves three ECS services, each running a single task, with containers pulling images from ECR. The setup includes configuring private DNS for inter-container communication, ensuring the correct security group and subnet settings are in place.
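
To make this concrete, here is a hedged sketch of the two instance-level configuration files involved; the hostnames, names, and ports are placeholders to adapt to your own deployment:

# workspace.yaml -- tells the webserver and daemon where the user code gRPC server lives
load_from:
  - grpc_server:
      host: user-code          # placeholder: private DNS name of the user code ECS service
      port: 4000
      location_name: dbt_athena_project

# dagster.yaml -- instance configuration shared by the containers
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher        # launches each run as an isolated ECS task

storage:
  postgres:
    postgres_db:
      hostname: my-dagster-db.eu-west-1.rds.amazonaws.com  # placeholder RDS endpoint
      username: dagster
      password:
        env: DAGSTER_PG_PASSWORD                           # injected as an env var / secret
      db_name: dagster
      port: 5432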

Conclusion

Using dbt Core, Amazon Athena, and Dagster together creates a strong and efficient setup for handling data transformations and workflows. dbt Core lets you transform data using SQL and provides features like incremental updates and data testing. Athena is a cost-effective, serverless query engine that works well on top of lakehouse storage in Amazon S3. Dagster helps you manage and schedule your data tasks, filling the gaps where dbt Core lacks built-in automation.

However, there are some limitations to be aware of. The dbt-athena adapter doesn’t support every dbt feature and only works with certain table formats (Iceberg or Hive).

Additionally, while Athena is powerful for querying data stored in S3, it might not be the best choice for every type of workload, especially those requiring real-time processing or low-latency responses.

Dagster, while offering extensive orchestration capabilities, can add complexity to your setup and requires careful management of resources and dependencies.

The proof-of-concept (POC) we discussed shows how these tools can be put together to manage data transformations and workflows effectively. It provides a practical example of using dbt for transformations, Athena for querying, and Dagster for orchestration. This POC demonstrates the potential of this stack but also highlights the areas where further refinement and customization might be necessary.

You can check it out for yourself here: https://github.com/malon64/dbt-athena-poc.git

The next step is to take this POC and expand it into a full ETL solution. This means scaling up the setup, adding CI/CD automation for smooth updates, and refining the process to handle more complex data needs. The goal is to build a complete, automated data system that can grow with your requirements.
