Niklas Heringer - Cybersecurity Blog

Data Engineering With Dagster – Part Four: Resources, DRY Pipelines, and ETL in Practice


Tags: Dagster, Resources, Best-Practice, ETL, Pipelines

🧰 Resources in Dagster: Tools, Not Afterthoughts

Much like baking, making data pipelines also requires resources.

When you bake cookies, you don’t grab a new bowl for every step. You reuse the same tools — mixing spoon, oven, baking tray — across multiple steps.

And in data engineering, the same is true.

Imagine a simple ETL pipeline:

  1. Fetch data from an API
  2. Store it temporarily in S3
  3. Load it into Snowflake
  4. Visualize it in a dashboard

Each one of these tools — the API, the S3 bucket, the warehouse, the BI tool — is a resource.

So how do we stop ourselves from rewriting connection code everywhere?


💡 Best Practice Focus: DRY

Don’t Repeat Yourself.
Repetition breeds bugs, confusion, and maintenance hell.

DRY is a classic software principle. Instead of copying and pasting config or logic across your pipeline, you define it once — then reuse it wherever needed.

🧠 Mini Excursus: 🧼 What Does DRY Really Mean?

It’s not about being lazy. It’s about clarity and resilience.

So in Dagster, instead of writing your S3 or database logic in every asset function, you define them once as resources.


🧪 What Are Resources in Dagster?

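In Dagster, a resource is a reusable object that connects your assets to an external system: an API, a storage bucket, a database.

Resources are your connection layer: anything that touches the outside world.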


Let’s bake cookies again — but this time, DRY.

Ingredients: flour, sugar, chocolate chips  
Tools (resources): bowl, spoon, oven  

Instead of passing a new bowl to every step manually, you declare it once and reuse it throughout the baking process. Same with your spoon and oven.

In Dagster, resources let you cleanly share those tools across multiple assets.
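To make the "declare it once, reuse it everywhere" idea concrete, here is a minimal plain-Python sketch with no Dagster involved. The Warehouse class and the cookies table are made up for illustration, and sqlite3 stands in for a real warehouse.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Warehouse:
    """Hypothetical shared tool: owns the one database connection."""

    conn: sqlite3.Connection

    @classmethod
    def in_memory(cls) -> "Warehouse":
        # Connection setup lives here and nowhere else.
        conn = sqlite3.connect(":memory:")
        conn.execute("CREATE TABLE cookies (flavor TEXT)")
        return cls(conn)


def bake(warehouse: Warehouse, flavors: list[str]) -> None:
    # Step 1 writes through the shared tool.
    warehouse.conn.executemany(
        "INSERT INTO cookies (flavor) VALUES (?)", [(f,) for f in flavors]
    )


def count_cookies(warehouse: Warehouse) -> int:
    # Step 2 reads through the very same tool; no second connection is set up.
    return warehouse.conn.execute("SELECT COUNT(*) FROM cookies").fetchone()[0]


warehouse = Warehouse.in_memory()
bake(warehouse, ["chocolate chip", "oatmeal"])
total = count_cookies(warehouse)
```

Both steps receive the same bowl, so to speak. Dagster resources follow the same idea, with the framework doing the handing-over for you.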


🔌 Example: A Real API Resource

import requests
from dagster import resource

@resource
def requests_session():
    # Build one configured HTTP session that every asset can share.
    session = requests.Session()
    session.headers.update({"User-Agent": "Dagster-Pipeline"})
    return session

Then you inject it like this:

from dagster import asset

@asset(required_resource_keys={"requests_session"})
def fetch_data(context):
    # The session is looked up by its resource key when the asset runs.
    session = context.resources.requests_session
    response = session.get("https://api.example.com/data")
    ...

Now your logic stays DRY.
Change the session config in one place, and it updates for all asset functions.
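Here is a rough sketch of that "change it in one place" effect, written in plain Python so it runs without Dagster or network access. StubSession is a hypothetical stand-in for requests.Session that only records calls, and the /users URL is invented for the example.

```python
from dataclasses import dataclass, field


@dataclass
class StubSession:
    """Hypothetical stand-in for requests.Session: records calls, does no HTTP."""

    headers: dict = field(default_factory=dict)
    calls: list = field(default_factory=list)

    def get(self, url: str) -> dict:
        self.calls.append(url)
        # Echo back the header so we can see which config was used.
        return {"url": url, "user_agent": self.headers.get("User-Agent")}


def make_session() -> StubSession:
    # The single place where session configuration lives.
    session = StubSession()
    session.headers.update({"User-Agent": "Dagster-Pipeline"})
    return session


def fetch_data(session):
    return session.get("https://api.example.com/data")


def fetch_users(session):
    # Assumed second endpoint, only here to show a second consumer.
    return session.get("https://api.example.com/users")


session = make_session()
data = fetch_data(session)
users = fetch_users(session)
```

Neither fetch function knows how the session was configured. Edit make_session() and both pick up the change, which is the property the Dagster resource gives you.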


🧠 Mini Excursus: ETL vs ELT vs E🤯L?

The acronyms often get thrown around like seasoning in a hacker recipe, so here’s a clear take:

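| Pipeline Type | Stands For                   | Data Order              | Used When…                                           |
|---------------|------------------------------|-------------------------|------------------------------------------------------|
| ETL           | Extract → Transform → Load   | Clean before saving     | Data needs heavy prep or cleaning first              |
| ELT           | Extract → Load → Transform   | Save raw, clean later   | Storage is cheap, transformation happens downstream  |
| E…            | Whatever → Whatever          | Totally custom pipelines | Welcome to real-world data engineering 😅           |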

Whether you’re scraping data or moving gigabytes between systems, the steps are often the same: extract, transform, load — just shuffled.

Resources sit at the center of this — they define how each part connects to the world.
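To see the "same steps, just shuffled" point in code, here is a small sketch using only the standard library. The rows, table names, and cleaning rule are invented for the example, and sqlite3 plays the role of the warehouse.

```python
import sqlite3

# Pretend this came out of the "extract" step.
RAW_ROWS = [(" Alice ", "42"), ("bob", "17"), ("  CAROL", "8")]


def clean(name: str, score: str) -> tuple[str, int]:
    # Transformation done in Python: trim, lowercase, cast.
    return name.strip().lower(), int(score)


def run_etl(conn: sqlite3.Connection) -> list[tuple[str, int]]:
    # ETL: transform first, then load only the cleaned rows.
    conn.execute("CREATE TABLE scores_etl (name TEXT, score INTEGER)")
    conn.executemany(
        "INSERT INTO scores_etl VALUES (?, ?)", [clean(n, s) for n, s in RAW_ROWS]
    )
    return conn.execute("SELECT name, score FROM scores_etl ORDER BY name").fetchall()


def run_elt(conn: sqlite3.Connection) -> list[tuple[str, int]]:
    # ELT: load the raw rows untouched, then transform inside the database.
    conn.execute("CREATE TABLE scores_raw (name TEXT, score TEXT)")
    conn.executemany("INSERT INTO scores_raw VALUES (?, ?)", RAW_ROWS)
    conn.execute(
        """
        CREATE TABLE scores_elt AS
        SELECT LOWER(TRIM(name)) AS name, CAST(score AS INTEGER) AS score
        FROM scores_raw
        """
    )
    return conn.execute("SELECT name, score FROM scores_elt ORDER BY name").fetchall()


conn = sqlite3.connect(":memory:")
etl_rows = run_etl(conn)
elt_rows = run_elt(conn)
```

Both routes end with the same clean table. The difference is where the cleaning happens and whether the raw data is kept around.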


🗃️ Setting Up a Database Resource

Previously, if you wanted to use DuckDB in an asset, you had to manually open a connection inside the asset function — including retry logic, config loading, and imports. It looked like this:

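import os
import duckdb
from dagster._utils.backoff import backoff

# Open the connection by hand, retrying on transient connection errors.
conn = backoff(
    fn=duckdb.connect,
    retry_on=(RuntimeError, duckdb.IOException),
    kwargs={"database": os.getenv("DUCKDB_DATABASE")},
    max_retries=10,
)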

✅ This works — but it’s messy, repetitive, and violates the DRY principle.

Let’s fix that.


🔧 Defining the DuckDB Resource

Head over to resources.py and define your reusable DuckDB connection like this:

from dagster_duckdb import DuckDBResource

database_resource = DuckDBResource(
    database="data/staging/data.duckdb"
)

Simple, clean, and centralized.


🧠 Mini Excursus: 🌱 Environment Variables & Dagster

Environment variables are the standard way to configure secrets, paths, and environment-specific values in a secure, flexible way.

Let’s break it down:

| Feature           | Why it matters                         |
|-------------------|----------------------------------------|
| Security          | Keep secrets out of code (and Git!)    |
| Portability       | Change environments without rewriting  |
| Standard practice | Works in CI/CD, cloud, Docker, etc.    |

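There are two common ways to access env vars in a Dagster project: os.getenv from Python’s standard library, and EnvVar from Dagster.

The difference matters in Dagster.
os.getenv is evaluated once, when your code is loaded, so if you change a variable and don’t restart the webserver, os.getenv won’t see the update. EnvVar is resolved when a run starts, so it will.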

So, update your resource definition like this:

from dagster_duckdb import DuckDBResource
import dagster as dg

database_resource = DuckDBResource(
    database=dg.EnvVar("DUCKDB_DATABASE")
)

This lets Dagster dynamically fetch the current value every time a job runs. Super handy for pipelines that need to point to different databases across environments.
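The timing difference is easy to reproduce with a toy example. This is only a sketch of the idea, not how Dagster implements EnvVar: DeferredEnv is a hypothetical helper, and the "prod" path is invented.

```python
import os
from typing import Optional


class DeferredEnv:
    """Hypothetical helper: stores the variable *name* and reads it on demand."""

    def __init__(self, name: str) -> None:
        self.name = name

    def resolve(self) -> Optional[str]:
        # The lookup happens here, at call time, not when the object is created.
        return os.getenv(self.name)


os.environ["DUCKDB_DATABASE"] = "data/staging/data.duckdb"

eager_value = os.getenv("DUCKDB_DATABASE")  # read once, right now
deferred = DeferredEnv("DUCKDB_DATABASE")   # nothing has been read yet

# Later, the environment changes (assumed path, for illustration only).
os.environ["DUCKDB_DATABASE"] = "data/prod/data.duckdb"

at_run_time = deferred.resolve()
```

eager_value still holds the staging path, while at_run_time reflects the new one. That is the behaviour you get from os.getenv at load time versus a value that is looked up when it is needed.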


🧩 Wiring It Up in Definitions

Now that we’ve defined a resource, we need to register it in our Definitions object so Dagster knows about it.

Update your definitions.py:

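import dagster as dg

from dagster_essentials.resources import database_resource

# trip_assets and metric_assets are assumed to be defined earlier in this file.
defs = dg.Definitions(
    assets=[*trip_assets, *metric_assets],
    resources={
        "database": database_resource,
    },
)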

Notice that "database" is the name you give the resource — and it must match the parameter name in your asset functions.

Then go into the Dagster UI, reload your code location, and open its Resources tab.

🧠 You’ll see Uses: 0 for now, but let’s fix that next.


♻️ Refactoring Asset Code to Use the Resource

Let’s update our asset taxi_trips to use the database resource instead of manual connection code.

Before (manual):

conn = backoff(
    fn=duckdb.connect,
    retry_on=(RuntimeError, duckdb.IOException),
    kwargs={"database": os.getenv("DUCKDB_DATABASE")},
    max_retries=10,
)
conn.execute(query)

After (resource-based):

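import dagster as dg
from dagster_duckdb import DuckDBResource

@dg.asset(deps=["taxi_trips_file"])
def taxi_trips(database: DuckDBResource) -> None:
    # query is the same SQL string the asset built before the refactor.
    with database.get_connection() as conn:
        conn.execute(query)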

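🔍 Key changes:

  1. The asset now takes a database parameter, type hinted as DuckDBResource
  2. The manual backoff(...) connection block is gone; the asset calls database.get_connection() instead
  3. The asset no longer reads DUCKDB_DATABASE itself; the database path lives in the resource definition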


🧠 Mini Excursus: Resource Type Hints (They Matter)

Why is database: DuckDBResource important?

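Dagster uses type hints to distinguish between two kinds of asset parameters: resources, which are annotated with a resource type such as DuckDBResource, and upstream assets, which Dagster matches by parameter name.

If you forget the type hint, Dagster treats database as an upstream asset instead of a resource, and you’ll get a confusing error, typically about an asset input that doesn’t exist.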

Type hints aren’t just for IDEs anymore. They’re part of Dagster’s dependency magic.
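If you are curious how a framework can tell the two apart at all, here is a simplified sketch using Python’s introspection tools. It is an illustration of the general technique, not Dagster’s actual implementation; Resource, FakeDatabase, and split_parameters are all hypothetical names.

```python
import inspect
from typing import get_type_hints


class Resource:
    """Hypothetical marker base class for anything that counts as a resource."""


class FakeDatabase(Resource):
    def query(self) -> str:
        return "rows"


def split_parameters(fn):
    """Sort a function's parameters into (resources, upstream inputs)."""
    hints = get_type_hints(fn)
    resources, upstream = [], []
    for name in inspect.signature(fn).parameters:
        hint = hints.get(name)
        # Only parameters annotated with a Resource subclass count as resources.
        if isinstance(hint, type) and issubclass(hint, Resource):
            resources.append(name)
        else:
            upstream.append(name)
    return resources, upstream


def typed_asset(taxi_trips, database: FakeDatabase): ...


def untyped_asset(taxi_trips, database): ...


typed_result = split_parameters(typed_asset)
untyped_result = split_parameters(untyped_asset)
```

With the hint, database lands in the resources bucket. Without it, the sketch has no way to know and files database next to taxi_trips as an upstream input, which mirrors the confusing error described above.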


🛠️ Refactor Other Assets

Now that you have a working resource, refactor any other assets that used duckdb.connect() manually.

Example: manhattan_stats in assets/metrics.py

Before:

conn = duckdb.connect(...)
trips_by_zone = conn.execute(query).fetch_df()

After:

# Keep the asset's existing @dg.asset(...) decorator above this function.
def manhattan_stats(database: DuckDBResource) -> None:
    with database.get_connection() as conn:
        trips_by_zone = conn.execute(query).fetch_df()

💡 Any asset that connects to DuckDB can now just ask for database — and Dagster takes care of the rest.
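To get a feel for what a resource like this bundles up, here is a rough stdlib-only sketch of the pattern: a small object that owns the connection settings, retries the connect, and hands out connections through a context manager. It is not how DuckDBResource is implemented; SqliteResource, its fields, and the trips table are assumptions for the example, with sqlite3 standing in for DuckDB.

```python
import sqlite3
import time
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class SqliteResource:
    """Hypothetical stand-in for a database resource (sqlite3 instead of DuckDB)."""

    database: str
    max_attempts: int = 3
    delay_seconds: float = 0.0

    def _connect_with_retry(self) -> sqlite3.Connection:
        for attempt in range(1, self.max_attempts + 1):
            try:
                return sqlite3.connect(self.database)
            except sqlite3.OperationalError:
                if attempt == self.max_attempts:
                    raise  # out of attempts, surface the error
                time.sleep(self.delay_seconds * attempt)
        raise RuntimeError("max_attempts must be at least 1")

    @contextmanager
    def get_connection(self):
        conn = self._connect_with_retry()
        try:
            yield conn
        finally:
            # Always close, even if the caller's block raised.
            conn.close()


def taxi_trip_count(database: SqliteResource) -> int:
    # The caller only asks for a connection; retry and cleanup stay hidden.
    with database.get_connection() as conn:
        conn.execute("CREATE TABLE trips (id INTEGER)")
        conn.executemany("INSERT INTO trips VALUES (?)", [(1,), (2,), (3,)])
        return conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]


database = SqliteResource(database=":memory:")
trip_count = taxi_trip_count(database)
```

The function body looks a lot like the refactored assets above: ask for a connection, run the query, done. Everything messy sits in one class.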


🔍 Inspecting Resource Usage in the UI

Once all your assets are refactored:

  1. Go to Deployment > Code Locations
  2. Hit Reload to pick up the changes
  3. Click into your code location (dagster_essentials)
  4. Open the Resources tab

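Here you’ll see the database resource listed, and its Uses count should now reflect the assets you refactored instead of 0.

This is incredibly useful for auditing your project and seeing how tightly coupled your assets are to external systems.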


📘 Lesson Recap

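Let’s wrap it up:

  1. Resources are the reusable connections between your assets and the outside world: APIs, storage, databases.
  2. Defining a resource once and sharing it keeps your pipeline DRY.
  3. EnvVar keeps configuration out of your code and reads it when a run starts.
  4. A resource is registered in Definitions under a key, and that key must match the parameter name in your assets.
  5. The type hint on that parameter is what tells Dagster to inject the resource.

And remember: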

You don’t need a Dagster integration to use resources —
Any Python object can be added as a resource in your project.

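If you want to pass your own custom class, type hint the asset parameter and include the object in resources={...}. You can either subclass dg.ConfigurableResource and hint with the class itself, or hint a plain Python object as dg.ResourceParam[YourClass]. Dagster will take care of the injection.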

Check out Dagster’s full list of integrations — you’ll find ready-to-use resources for Snowflake, BigQuery, Spark, Airbyte, and more.


Up next: loads of stuff! We’ll explore how to trigger assets automatically based on time, events, or upstream changes.
Spoiler: It gets really fun.

Until then: keep your tools clean, your pipelines DRY, and your resources smartly shared.