Data Engineering With Dagster – Part Two: Dependencies, DuckDB, and Geo Heatmaps
You didn’t think we were done, did you? Let’s jump right back in!
🔄 Asset Dependencies: Upstream? Downstream? Doughstream.
In Dagster, asset dependencies define the order of operations in your pipeline.
You’re not just running functions — you’re declaring a graph of relationships between data artifacts.
But what do we actually mean by “upstream” and “downstream”?
🍪 Let’s Bake Cookies
Imagine these three assets:
- flour
- cookie_dough (needs flour)
- cookies (needs dough)
The dependency graph looks like this:
flour ─▶ cookie_dough ─▶ cookies
So who’s upstream of whom?
- flour is upstream of cookie_dough
- cookie_dough is downstream of flour, and upstream of cookies
- cookies is the most downstream — it’s the final product
Upstream = what this asset depends on
Downstream = what depends on this asset
You can also say:
- cookie_dough is the child of flour
- cookies is the grandchild (or descendant) of flour
- flour is an ancestor of both
Dagster tracks this automatically and builds the execution flow accordingly.
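To make this concrete, here’s a minimal sketch of what that cookie graph could look like as Dagster assets. The names mirror the example above; the bodies are just placeholders, not real logic:

import dagster as dg

@dg.asset
def flour() -> None:
    # Most upstream asset: depends on nothing
    ...

@dg.asset(deps=["flour"])
def cookie_dough() -> None:
    # Downstream of flour, upstream of cookies
    ...

@dg.asset(deps=["cookie_dough"])
def cookies() -> None:
    # Most downstream: the final product
    ...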
Why This Matters
If cookie_dough fails to build, Dagster will not even attempt to materialize cookies.
This makes debugging safer and your DAGs easier to reason about — no manually wiring task dependencies.
🧠 Mini Excursus: What is DuckDB?
DuckDB is a fast, in-process SQL OLAP database designed for analytical workloads. Think of it as:
- SQLite, but optimized for columnar analytics
- Zero server needed — runs in Python, on your machine
- Understands Parquet, CSV, JSON, and more
💡 It’s the perfect fit for local pipelines and quick experimentation with real SQL power.
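To make that concrete, here’s a tiny standalone taste of DuckDB querying a Parquet file straight from Python. The path is the taxi file we downloaded earlier, so adjust it if yours lives elsewhere:

import duckdb

# An in-memory DuckDB connection: no server, no setup
conn = duckdb.connect()

# Plain SQL straight over a Parquet file on disk
df = conn.execute(
    "select count(*) as row_count from 'data/raw/taxi_trips_2023-03.parquet'"
).fetch_df()
print(df)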
🧱 Loading Raw Data into DuckDB
Let’s now build a new asset that loads the previously downloaded .parquet taxi data into our DuckDB instance.
First, we upgrade our imports:
import duckdb
import os
import dagster as dg
from dagster._utils.backoff import backoff
🧠 Mini Excursus: What’s backoff() Doing?
DuckDB doesn’t handle concurrency well — try opening it in two places at once and things get spicy.
Dagster’s backoff() utility adds retry logic with smart wait intervals. If the DB is locked, it tries again (with backoff) instead of crashing.
💾 Asset: taxi_trips
@dg.asset(deps=["taxi_trips_file"])
def taxi_trips() -> None:
    """
    The raw taxi trips dataset, loaded into a DuckDB database
    """
    query = """
        create or replace table trips as (
            select
                VendorID as vendor_id,
                PULocationID as pickup_zone_id,
                DOLocationID as dropoff_zone_id,
                RatecodeID as rate_code_id,
                payment_type,
                tpep_dropoff_datetime as dropoff_datetime,
                tpep_pickup_datetime as pickup_datetime,
                trip_distance,
                passenger_count,
                total_amount
            from 'data/raw/taxi_trips_2023-03.parquet'
        );
    """

    conn = backoff(
        fn=duckdb.connect,
        retry_on=(RuntimeError, duckdb.IOException),
        kwargs={"database": os.getenv("DUCKDB_DATABASE")},
        max_retries=10,
    )
    conn.execute(query)
taxi_trips_file is now the upstream dependency for this asset. Dagster will wait for it to materialize first.
♻️ Reloading Definitions (Again)
In the UI, click Reload Definitions. This is needed when you:
- Add new assets
- Add schedules/sensors
- Rename Dagster objects
🧠 If you installed Dagster with pip install -e ".[dev]", most changes are picked up automatically — except structural ones like new asset definitions.
You’ll now see a new arrow from taxi_trips_file to taxi_trips. That’s your dependency graph growing.
🚦 Materializing the Database Asset
Click Materialize again. Dagster executes taxi_trips_file, then moves on to taxi_trips.
✅ Verify It Worked
Open a Python terminal and run:
import duckdb
conn = duckdb.connect(database="data/staging/data.duckdb")
conn.execute("select count(*) from trips").fetchall()
You should see a row count — a sanity check that the table exists and was loaded correctly.
Exit the Python process (Ctrl+D or exit()) or close the connection with conn.close() when you’re done; otherwise you might run into a DuckDB file lock later.
🧠 Mini SQL Field Trip: Don’t Just Count Rows
Instead of just checking that the data exists, a slightly better habit is to run a small aggregation that summarizes the table, for example trip counts and average distance per payment type:
select payment_type, avg(trip_distance), count(*) as trips
from trips
group by payment_type
order by trips desc;
This helps you:
- Spot outliers or broken rows
- Understand the shape of the data
- Build intuition for transformations later
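If you’d rather run that same check from Python instead of a SQL shell, here’s a quick sketch; it assumes the DUCKDB_DATABASE environment variable from earlier, falling back to the staging path:

import os
import duckdb

conn = duckdb.connect(database=os.getenv("DUCKDB_DATABASE", "data/staging/data.duckdb"))

# Trips and average distance per payment type: a quick feel for the data's shape
print(
    conn.execute("""
        select payment_type, avg(trip_distance) as avg_distance, count(*) as trips
        from trips
        group by payment_type
        order by trips desc
    """).fetch_df()
)
conn.close()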
🗺️ Geo Heatmap Time (YES!)
We’re now joining the taxi trip data with NYC zone metadata — and visualizing hotspots using GeoJSON, GeoPandas, and matplotlib.
import geopandas as gpd  # needed for the GeoSeries/GeoDataFrame handling below

@dg.asset(deps=["taxi_trips", "taxi_zones"])
def manhattan_stats() -> None:
    query = """
        select
            zones.zone,
            zones.borough,
            zones.geometry,
            count(1) as num_trips
        from trips
        left join zones on trips.pickup_zone_id = zones.zone_id
        where borough = 'Manhattan' and geometry is not null
        group by zone, borough, geometry
    """

    conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
    trips_by_zone = conn.execute(query).fetch_df()

    # Convert the WKT geometry strings into real shapes
    trips_by_zone["geometry"] = gpd.GeoSeries.from_wkt(trips_by_zone["geometry"])
    trips_by_zone = gpd.GeoDataFrame(trips_by_zone)

    # Persist the result as GeoJSON for the downstream map asset
    with open(constants.MANHATTAN_STATS_FILE_PATH, 'w') as output_file:
        output_file.write(trips_by_zone.to_json())
The LEFT JOIN keeps every trip even when its pickup zone has no matching metadata; the WHERE clause then narrows the result to Manhattan zones that actually have geometry, which keeps the downstream map clean.
🧠 Mini Excursus: What is GeoJSON?
GeoJSON is a format for encoding geographic data structures (points, polygons, etc.) as JSON. It’s widely used in:
- Web maps (Leaflet, Mapbox)
- Python geodata pipelines
- Spatial analysis with GeoPandas
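To get a feel for the format, here’s a tiny hand-made feature collection as a Python dict. The zone name, trip count, and coordinates are invented purely for illustration:

# A minimal GeoJSON FeatureCollection with one illustrative polygon
example_geojson = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": {"zone": "Example Zone", "num_trips": 42},
            "geometry": {
                "type": "Polygon",
                "coordinates": [[
                    [-73.99, 40.75], [-73.98, 40.75],
                    [-73.98, 40.76], [-73.99, 40.76],
                    [-73.99, 40.75],
                ]],
            },
        }
    ],
}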
Dagster doesn’t care what you output — as long as it’s an asset, you can store anything: .parquet, .json, .png, .pkl, whatever.
🎨 Visual Asset: manhattan_map
import matplotlib.pyplot as plt  # needed for the plotting below

@dg.asset(deps=["manhattan_stats"])
def manhattan_map() -> None:
    trips_by_zone = gpd.read_file(constants.MANHATTAN_STATS_FILE_PATH)

    fig, ax = plt.subplots(figsize=(10, 10))
    trips_by_zone.plot(
        column="num_trips",
        cmap="plasma",
        legend=True,
        ax=ax,
        edgecolor="black",
    )
    ax.set_title("Number of Trips per Taxi Zone in Manhattan")

    # Crop the view to Manhattan's rough bounding box
    ax.set_xlim([-74.05, -73.90])
    ax.set_ylim([40.70, 40.82])

    plt.savefig(constants.MANHATTAN_MAP_FILE_PATH, format="png", bbox_inches="tight")
    plt.close(fig)
This produces a color-coded heatmap of taxi pickups across Manhattan zones.
Bright colors = high activity = taxi hotspots.
🧠 Mini Excursus: Data Visualization Is a First-Class Asset
Let’s pause and unpack something powerful:
🧩 Visualizations aren’t just side-effects. They are assets.
In traditional pipelines, plots often feel like an afterthought — tacked on at the end of a Jupyter notebook, manually regenerated, and exported to some /figures/ folder.
But in Dagster, that mindset doesn’t cut it. Visual outputs are just as derivable, dependable, and repeatable as any other data artifact.
📦 What Makes a “Data Asset”?
Dagster treats anything that can be:
- Generated deterministically from code
- Stored in some kind of persistent format
- Recreated automatically from inputs
…as a data asset.
This includes:
- Parquet files and SQL tables
- Cleaned CSV exports
- Trained ML models
- Dashboards or HTML reports
- 📊 Static plots or rendered maps
🧠 Why Should Plots Be Assets?
- ✅ Traceability: If the input data changes, Dagster knows the plot is now stale.
- ✅ Reproducibility: Same code, same data → same figure. Every time.
- ✅ Auditability: You can see when and why a plot was created.
- ✅ Scheduling: Automate daily or weekly reports without rebuilding your whole app.
- ✅ Backfills: Need to regenerate all historical plots? Easy — just rematerialize.
In short: Treating visualizations as assets gives you clarity and control — no more guessing which PNG goes with which dataset.
🔥 Real-World Use Cases
Use Case | Input Assets | Visualization Asset
---|---|---
Weekly sales dashboard | sales_data, regions | weekly_sales_plot (HTML or PNG)
Model performance over time | model_runs, metrics_table | performance_plot, confusion_matrix
Geospatial taxi heatmap (like ours) | manhattan_stats | manhattan_map (PNG asset)
💡 Bonus: Store as Markdown, PDF, HTML?
Dagster doesn’t care what format your asset takes — PNG, HTML, CSV, Markdown, whatever.
You could go wild and build a monthly_summary_report asset that:
- Queries a database
- Builds plots and tables
- Renders it as a PDF or HTML page
- Sends it via email (even that send step can be an asset!)
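A rough sketch of what that could look like; every asset name here is hypothetical, not something from the tutorial:

import dagster as dg

# Purely hypothetical asset names: a sketch of the idea, not tutorial code
@dg.asset(deps=["monthly_metrics", "monthly_plots"])
def monthly_summary_report() -> None:
    """Query the data, build plots and tables, and render everything into one HTML report."""
    ...

@dg.asset(deps=["monthly_summary_report"])
def report_email() -> None:
    """Even the send step can be its own asset, downstream of the report."""
    ...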
Think of Dagster as your data production line — and visualization as the packaging step. Still critical. Still part of the flow.
⚠️ Common Anti-Patterns
These are all signs your plots should be assets:
- “I run this notebook manually once a week.”
- “I save the plot but forget which data version it used.”
- “The plot broke because I updated the schema upstream.”
If you treat them like code outputs, not manual snapshots, you avoid all of this.
🛠 Example: manhattan_map Asset
In our project, we created this:
@dg.asset(deps=["manhattan_stats"])
def manhattan_map() -> None:
    ...
    plt.savefig(constants.MANHATTAN_MAP_FILE_PATH, format="png")
This is just as important as the database or parquet file before it.
- Input: a GeoDataFrame (manhattan_stats)
- Output: a .png heatmap
- Behavior: deterministic, reproducible, automated
That’s the spirit of asset-based pipelines.
✅ TL;DR
In Dagster (and in modern data engineering in general):
If it’s derived from data, and you care about it — it’s an asset.
Plots included.
Whether it’s for an exec deck, internal dashboard, or just your own validation — make your visualizations part of your graph. Your future self (and your stakeholders) will thank you.
This is the kind of practice challenge that actually builds real understanding. Below is a reworked version of the trips_by_week asset that follows the Dagster University reference solution, turning the earlier quick asset into a full, memory-aware, production-ish slice of pipeline.
🧪 Practice Asset: trips_by_week – The Real Deal
Let’s redo trips_by_week, but properly. The new goal:
- Aggregate weekly stats from the full trips table
- Write the results as a .csv to disk
- Handle the data as if it were too large to fit in memory (a good practice anyway)
Here’s the new asset:
from datetime import datetime, timedelta
from . import constants
import pandas as pd
import dagster as dg
from dagster._utils.backoff import backoff
import duckdb
import os
@dg.asset(deps=["taxi_trips"])
def trips_by_week() -> None:
    conn = backoff(
        fn=duckdb.connect,
        retry_on=(RuntimeError, duckdb.IOException),
        kwargs={"database": os.getenv("DUCKDB_DATABASE")},
        max_retries=10,
    )

    current_date = datetime.strptime("2023-03-01", constants.DATE_FORMAT)
    end_date = datetime.strptime("2023-04-01", constants.DATE_FORMAT)

    result = pd.DataFrame()

    while current_date < end_date:
        current_date_str = current_date.strftime(constants.DATE_FORMAT)
        query = f"""
            select
                vendor_id, total_amount, trip_distance, passenger_count
            from trips
            where date_trunc('week', pickup_datetime) = date_trunc('week', '{current_date_str}'::date)
        """

        data_for_week = conn.execute(query).fetch_df()

        aggregate = data_for_week.agg({
            "vendor_id": "count",
            "total_amount": "sum",
            "trip_distance": "sum",
            "passenger_count": "sum"
        }).rename({"vendor_id": "num_trips"}).to_frame().T  # type: ignore

        aggregate["period"] = current_date

        result = pd.concat([result, aggregate])

        current_date += timedelta(days=7)

    # format cleanup
    result['num_trips'] = result['num_trips'].astype(int)
    result['passenger_count'] = result['passenger_count'].astype(int)
    result['total_amount'] = result['total_amount'].round(2)
    result['trip_distance'] = result['trip_distance'].round(2)

    result = result[["period", "num_trips", "total_amount", "trip_distance", "passenger_count"]]
    result = result.sort_values(by="period")

    result.to_csv(constants.TRIPS_BY_WEEK_FILE_PATH, index=False)
🔍 What’s Actually Happening
Let’s break this down by intention:
Step | What’s Going On
---|---
current_date loop | Iterates over each week between March 1 and April 1
query | Pulls only rows for that week (using date_trunc)
agg() | Aggregates trip stats for the week
concat | Builds up a DataFrame week by week
to_csv() | Dumps the final result to a CSV file we can inspect or use later
🧠 You could do all this in SQL at once — but breaking it down gives us more control, and allows memory-safe iteration. Plus, it’s a good mental model for streaming/partitioned thinking.
🧠 Mini Excursus: Why Not Just GROUP BY?
You might ask:
“Why not just GROUP BY date_trunc('week', pickup_datetime) once and be done with it?”
Fair. But here’s the trick:
- If your table has millions of rows, you don’t want to .fetch_df() the whole thing at once
- This lets you handle chunked loading, per-week batching
- It’s also great for long-running pipelines with time windows, or future partitioned assets
This pattern makes your code future-proof and scalable.
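For comparison, here’s roughly what the all-at-once version could look like, reusing the same conn from the asset above. It’s simpler, but the whole aggregation comes back through a single fetch:

# All-at-once alternative: one GROUP BY instead of a per-week loop
query = """
    select
        date_trunc('week', pickup_datetime) as period,
        count(*) as num_trips,
        round(sum(total_amount), 2) as total_amount,
        round(sum(trip_distance), 2) as trip_distance,
        sum(passenger_count) as passenger_count
    from trips
    group by 1
    order by 1
"""
trips_by_week_df = conn.execute(query).fetch_df()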
🧪 Example Output
Your resulting CSV will look like:
period,num_trips,total_amount,trip_distance,passenger_count
2023-03-05,679681,18495110.72,2358944.42,886486
2023-03-12,686461,19151177.45,2664123.87,905296
2023-03-19,640158,17908993.09,2330611.91,838066
Perfect for loading into dashboards, reports, or another visualization asset. 👀
🧠 Mini Excursus: Asset Fanout Patterns
This type of design — aggregate then emit something like .csv, .json, .html, or .pdf — is perfect for fanout assets.
Input | Output | Pattern Type
---|---|---
Full DB table | Weekly summary CSV | Aggregation
JSON blob | HTML report | Formatting
GeoDataFrame | PNG heatmap | Visualization
ML model | Pickle file | Serialization
You’re not limited to “data-to-data” pipelines — data-to-output is just as valid.
✅ Wrapping Up Part Two
You now know how to:
- Wire assets with explicit dependencies
- Materialize outputs into DuckDB
- Summarize and analyze with SQL + Pandas
- Visualize using GeoJSON + matplotlib
- And finally, build iterative, memory-aware assets
Next up in Part 3:
📁 Understanding code locations and asset groups
🔍 How Dagster manages definitions under the hood
📦 Structuring your project for clarity and modularity
🧰 Exploring resources and how to inject them cleanly
We’re starting to build real pipelines now — not just functions.