
Data Engineering With Dagster – Part Four: Resources, DRY Pipelines, and ETL in Practice
🧰 Resources in Dagster: Tools, Not Afterthoughts
Much like baking, making data pipelines also requires resources.
When you bake cookies, you don’t grab a new bowl for every step. You reuse the same tools — mixing spoon, oven, baking tray — across multiple steps.
And in data engineering, the same is true.
Imagine a simple ETL pipeline:
- Fetch data from an API
- Store it temporarily in S3
- Load it into Snowflake
- Visualize it in a dashboard
Each one of these tools — the API, the S3 bucket, the warehouse, the BI tool — is a resource.
So how do we stop ourselves from rewriting connection code everywhere?
💡 Best Practice Focus: DRY
Don’t Repeat Yourself.
Repetition breeds bugs, confusion, and maintenance hell.
DRY is a classic software principle. Instead of copying and pasting config or logic across your pipeline, you define it once — then reuse it wherever needed.
🧠 Mini Excursus: 🧼 What Does DRY Really Mean?
It’s not about being lazy. It’s about clarity and resilience.
- 🔁 Repeated logic becomes harder to update correctly
- 💣 Copy-pasted config means you’re one typo away from disaster
- 🧼 Clean code makes debugging and scaling easier
So in Dagster, instead of writing your S3 or database logic in every asset function, you define them once as resources.
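As a rough, framework-free illustration of the principle (this is not Dagster code), the sketch below keeps connection settings in one place and lets two steps reuse them. The names WarehouseSettings, SETTINGS, load_trips and load_zones are hypothetical, made up for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WarehouseSettings:
    """Hypothetical connection settings, defined exactly once."""

    database: str
    max_retries: int = 10


# Single source of truth: change it here and every step sees the change
SETTINGS = WarehouseSettings(database="data/staging/data.duckdb")


def load_trips(settings: WarehouseSettings = SETTINGS) -> str:
    # A real step would open a connection; here we only describe the target
    return f"loading trips into {settings.database}"


def load_zones(settings: WarehouseSettings = SETTINGS) -> str:
    return f"loading zones into {settings.database}"


print(load_trips())
print(load_zones())
```

Neither function spells out the database path itself, which is the same idea a Dagster resource applies to real connections.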
🧪 What Are Resources in Dagster?
In Dagster, a resource is:
- A Python object or function
- Defined once
- Injected into asset functions that need it
- Automatically shown in the Dagster UI (usage, configuration, etc.)
They’re your connection layer — anything that touches the outside world.
🍪 Cookie Metaphor Time
Let’s bake cookies again — but this time, DRY.
Ingredients: flour, sugar, chocolate chips
Tools (resources): bowl, spoon, oven
Instead of passing a new bowl to every step manually, you declare it once and reuse it throughout the baking process. Same with your spoon and oven.
In Dagster:
- Your bowl could be a shared requests.Session() for API calls
- Your oven might be a DuckDB connection from duckdb.connect() or a Snowflake client
- Your tray could be a file system path handler
Resources let you cleanly share those tools across multiple assets.
🔌 Example: A Real API Resource
import requests
from dagster import resource

@resource
def requests_session():
    # One shared HTTP session, configured once for every asset that needs it
    session = requests.Session()
    session.headers.update({"User-Agent": "Dagster-Pipeline"})
    return session
Then you inject it like this:
from dagster import asset

@asset(required_resource_keys={"requests_session"})
def fetch_data(context):
    # The session is looked up by its resource key
    session = context.resources.requests_session
    response = session.get("https://api.example.com/data")
    ...
Now your logic stays DRY.
Change the session config in one place, and it updates for all asset functions.
🧠 Mini Excursus: ETL vs ELT vs E🤯L?
The acronyms often get thrown around like seasoning in a hacker recipe, so here’s a clear take:
Pipeline Type | Stands For | Data Order | Used When… |
---|---|---|---|
ETL | Extract → Transform → Load | Clean before saving | Data needs heavy prep or cleaning first |
ELT | Extract → Load → Transform | Save raw, clean later | Storage is cheap, transformation happens downstream |
E… | Whatever → Whatever | Totally custom pipelines | Welcome to real-world data engineering 😅 |
Whether you’re scraping data or moving gigabytes between systems, the steps are often the same: extract, transform, load — just shuffled.
Resources sit at the center of this — they define how each part connects to the world.
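To make the ordering difference concrete, here is a toy sketch in plain Python. The "warehouse" is just an in-memory list or dict, and the names raw_rows, transform, run_etl and run_elt are hypothetical, invented for this illustration.

```python
raw_rows = [" Alice ", "BOB", " carol"]


def transform(rows):
    # "Cleaning" here just means trimming whitespace and normalising case
    return [row.strip().title() for row in rows]


def run_etl(rows):
    warehouse = []
    warehouse.extend(transform(rows))  # transform first, then load
    return warehouse


def run_elt(rows):
    warehouse = {"raw": [], "clean": []}
    warehouse["raw"].extend(rows)  # load the raw data untouched
    warehouse["clean"] = transform(warehouse["raw"])  # transform later, downstream
    return warehouse


etl_result = run_etl(raw_rows)
elt_result = run_elt(raw_rows)
print(etl_result)
print(elt_result)
```

Both variants end up with the same cleaned rows; the ELT variant additionally keeps the raw copy around, which is the trade-off the table describes.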
🗃️ Setting Up a Database Resource
Previously, if you wanted to use DuckDB in an asset, you had to manually open a connection inside the asset function — including retry logic, config loading, and imports. It looked like this:
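import os
import duckdb
from dagster._utils.backoff import backoff

# Retry the connection up to 10 times if DuckDB raises one of these errors
conn = backoff(
    fn=duckdb.connect,
    retry_on=(RuntimeError, duckdb.IOException),
    kwargs={"database": os.getenv("DUCKDB_DATABASE")},
    max_retries=10,
)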
✅ This works — but it’s messy, repetitive, and violates the DRY principle.
Let’s fix that.
🔧 Defining the DuckDB Resource
Head over to resources.py and define your reusable DuckDB connection like this:
from dagster_duckdb import DuckDBResource

database_resource = DuckDBResource(
    database="data/staging/data.duckdb"
)
Simple, clean, and centralized.
🧠 Mini Excursus: 🌱 Environment Variables & Dagster
Environment variables are the standard way to configure secrets, paths, and environment-specific values securely and flexibly.
Let’s break it down:
Feature | Why it matters |
---|---|
Security | Keep secrets out of code (and Git!) |
Portability | Change environments without rewriting |
Standard practice | Works in CI/CD, cloud, Docker, etc. |
There are two common ways to access env vars in Python:
- os.getenv("VAR_NAME"): grabs the variable when the code loads
- EnvVar("VAR_NAME") from Dagster: grabs it fresh for each run
The difference matters in Dagster.
If you change a variable and don’t restart the webserver, os.getenv won’t see the update. But EnvVar will.
So, update your resource definition like this:
from dagster_duckdb import DuckDBResource
import dagster as dg

database_resource = DuckDBResource(
    database=dg.EnvVar("DUCKDB_DATABASE")
)
This lets Dagster dynamically fetch the current value every time a job runs. Super handy for pipelines that need to point to different databases across environments.
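The load-time versus run-time distinction can be reproduced in plain Python. The sketch below is only an analogy: LazyEnv is a hypothetical stand-in for the idea of a deferred lookup and is not how Dagster's EnvVar is implemented, and the values dev.duckdb and prod.duckdb are made up.

```python
import os

os.environ["DUCKDB_DATABASE"] = "dev.duckdb"

# Eager: the value is read once, right now, and then frozen
eager_value = os.getenv("DUCKDB_DATABASE")


class LazyEnv:
    """Hypothetical stand-in for a deferred lookup; not Dagster's EnvVar."""

    def __init__(self, name: str) -> None:
        self.name = name

    def resolve(self) -> str:
        # The environment is only consulted when the value is actually needed
        value = os.getenv(self.name)
        if value is None:
            raise KeyError(f"environment variable {self.name} is not set")
        return value


lazy_value = LazyEnv("DUCKDB_DATABASE")

# The environment changes after the module-level code has already run
os.environ["DUCKDB_DATABASE"] = "prod.duckdb"

print(eager_value)           # still the old value
print(lazy_value.resolve())  # picks up the new value
```

The eager variable keeps whatever was in the environment when the module was imported, while the deferred object only looks when asked.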
🧩 Wiring It Up in Definitions
Now that we’ve defined a resource, we need to register it in our Definitions object so Dagster knows about it.
Update your definitions.py:
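import dagster as dg
from dagster_essentials.resources import database_resource

# trip_assets and metric_assets are the asset lists already loaded earlier in this file
defs = dg.Definitions(
    assets=[*trip_assets, *metric_assets],
    resources={
        "database": database_resource,  # key must match the asset parameter name
    },
)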
Notice that "database" is the name you give the resource, and it must match the parameter name in your asset functions.
Then go into the Dagster UI:
- Click Deployment > Code Locations
- Hit Reload next to dagster_essentials
- Navigate to the Definitions > Resources tab
- Click on database to inspect the config and where it’s used
🧠 You’ll see Uses: 0 for now, but let’s fix that next.
♻️ Refactoring Asset Code to Use the Resource
Let’s update our asset taxi_trips to use the database resource instead of manual connection code.
Before (manual):
conn = backoff(
    fn=duckdb.connect,
    retry_on=(RuntimeError, duckdb.IOException),
    kwargs={"database": os.getenv("DUCKDB_DATABASE")},
    max_retries=10,
)
conn.execute(query)
After (resource-based):
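import dagster as dg
from dagster_duckdb import DuckDBResource

@dg.asset(deps=["taxi_trips_file"])
def taxi_trips(database: DuckDBResource) -> None:
    # query is the same SQL string the asset used before the refactor
    with database.get_connection() as conn:
        conn.execute(query)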
🔍 Key changes:
- We imported DuckDBResource instead of duckdb
- We added database: DuckDBResource to the function signature
  → This tells Dagster that it’s a resource, not another asset
- We replaced the backoff logic, which is now handled by the resource
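To make concrete what kind of logic just moved out of the asset, here is a minimal retry-with-backoff sketch in plain Python. It is an illustration under stated assumptions, not Dagster's own backoff helper: the name retry_with_backoff, the base_delay and sleep parameters, the doubling delay schedule, and the flaky_connect function are all hypothetical.

```python
import time
from typing import Callable, Tuple, Type


def retry_with_backoff(
    fn: Callable,
    retry_on: Tuple[Type[BaseException], ...],
    kwargs: dict,
    max_retries: int = 10,
    base_delay: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
):
    """Call fn(**kwargs), retrying on the listed exceptions with growing delays."""
    attempt = 0
    while True:
        try:
            return fn(**kwargs)
        except retry_on:
            if attempt >= max_retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * (2 ** attempt))  # delay doubles on each retry
            attempt += 1


# Hypothetical flaky function: fails twice, then succeeds
calls = {"count": 0}


def flaky_connect(database: str) -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("database is locked")
    return f"connected to {database}"


delays = []
result = retry_with_backoff(
    fn=flaky_connect,
    retry_on=(RuntimeError,),
    kwargs={"database": "data.duckdb"},
    sleep=delays.append,  # record the delays instead of actually sleeping
)
print(result, delays)
```

Keeping this kind of loop inside a resource means each asset only has to ask for a connection.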
🧠 Mini Excursus: Resource Type Hints (They Matter)
Why is database: DuckDBResource important?
Dagster uses type hints to distinguish between:
- other assets (which you pass by name)
- and resources (which must be declared like this)
If you forget the type hint, Dagster treats the parameter as an upstream asset named database instead of a resource, so nothing gets injected and you’ll get a confusing error.
Type hints aren’t just for IDEs anymore. They’re part of Dagster’s dependency magic.
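For intuition about how a framework can tell the two kinds of parameters apart, here is a small sketch using only the standard library. It is a simplified model and not Dagster's actual implementation; Resource, FakeDatabase, classify_parameters, with_hint and without_hint are hypothetical names.

```python
import inspect
from typing import get_type_hints


class Resource:
    """Hypothetical marker base class for anything that counts as a resource."""


class FakeDatabase(Resource):
    pass


def classify_parameters(fn):
    """Split a function's parameters into resources and upstream assets."""
    hints = get_type_hints(fn)
    resources, assets = [], []
    for name in inspect.signature(fn).parameters:
        hint = hints.get(name)
        if inspect.isclass(hint) and issubclass(hint, Resource):
            resources.append(name)
        else:
            # No hint, or a non-resource hint: assume it names an upstream asset
            assets.append(name)
    return resources, assets


def with_hint(taxi_zones, database: FakeDatabase) -> None: ...


def without_hint(taxi_zones, database) -> None: ...


print(classify_parameters(with_hint))
print(classify_parameters(without_hint))
```

In this model, dropping the annotation silently moves database into the "upstream asset" bucket, which mirrors why a missing hint leads to a puzzling error rather than an injected resource.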
🛠️ Refactor Other Assets
Now that you have a working resource, refactor any other assets that used duckdb.connect() manually.
Example: manhattan_stats in assets/metrics.py
Before:
conn = duckdb.connect(...)
trips_by_zone = conn.execute(query).fetch_df()
After:
def manhattan_stats(database: DuckDBResource) -> None:
    with database.get_connection() as conn:
        trips_by_zone = conn.execute(query).fetch_df()
💡 Any asset that connects to DuckDB can now just ask for database, and Dagster takes care of the rest.
🔍 Inspecting Resource Usage in the UI
Once all your assets are refactored:
- Go to Deployment > Code Locations
- Hit Reload to pick up the changes
- Click into your code location (dagster_essentials)
- Open the Resources tab
Here you’ll see:
- A list of resources (e.g. database)
- A “Uses” tab showing which assets depend on each resource
- Config details, environment variable mappings, etc.
This is incredibly useful for auditing your project and seeing how tightly coupled your assets are to external systems.
📘 Lesson Recap
Let’s wrap it up:
- ✅ Resources let you define tools (like DB clients) once and reuse them cleanly
- ✅ Use EnvVar for dynamic, environment-aware config
- ✅ Register your resources in Definitions(resources={...})
- ✅ Inject them into assets with type hints like database: DuckDBResource
- ✅ You can view and analyze resource usage in the Dagster UI
And remember:
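You don’t need a Dagster integration to use resources.
Any Python object can be added as a resource in your project.
If you want to pass your own custom class, include an instance in resources={...} and type hint the asset parameter so Dagster recognizes it as a resource: either subclass ConfigurableResource, or annotate a plain object as ResourceParam[YourClass]. Dagster will take care of the injection.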
Check out Dagster’s full list of integrations — you’ll find ready-to-use resources for Snowflake, BigQuery, Spark, Airbyte, and more.
Up next: loads of stuff!
We’ll explore how to trigger assets automatically based on time, events, or upstream changes.
Spoiler: It gets really fun.
Until then: keep your tools clean, your pipelines DRY, and your resources smartly shared.