Data Engineering With Dagster - Part Two: Dependencies, DuckDB, and Geo Heatmaps

Learning to wire assets together, load data into DuckDB, and build heatmaps with real NYC taxi data.

You didn't think we were done, did you? Let's jump right back in!

Asset Dependencies: Upstream? Downstream? Doughstream.

In Dagster, asset dependencies define the order of operations in your pipeline.
You’re not just running functions - you’re declaring a graph of relationships between data artifacts.

But what do we actually mean by "upstream" and "downstream"?

🍪 Let’s Bake Cookies

Imagine these three assets:

  • flour
  • cookie_dough (needs flour)
  • cookies (needs dough)

The dependency graph looks like this:

flour ─▶ cookie_dough ─▶ cookies

So who's upstream of whom?

  • flour is upstream of cookie_dough
  • cookie_dough is downstream of flour, and upstream of cookies
  • cookies is the most downstream - it's the final product
Upstream = what this asset depends on
Downstream = what depends on this asset

You can also say:

  • cookie_dough is the child of flour
  • cookies is the grandchild (or descendant) of flour
  • flour is an ancestor of both

Dagster tracks this automatically and builds the execution flow accordingly.
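
To make that concrete, here is a minimal sketch of how those three assets might be declared - the bodies are toys, the point is the deps wiring:

import dagster as dg


@dg.asset
def flour() -> None:
    """No upstream dependencies - this is a root asset."""
    ...


@dg.asset(deps=["flour"])
def cookie_dough() -> None:
    """Downstream of flour, upstream of cookies."""
    ...


@dg.asset(deps=["cookie_dough"])
def cookies() -> None:
    """The most downstream asset - the final product."""
    ...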

Why This Matters

If cookie_dough fails to build, Dagster will not even attempt to materialize cookies.
This makes debugging safer and your DAGs easier to reason about - no manually wiring task dependencies.


Mini Excursus: What is DuckDB?

DuckDB is a fast, in-process SQL OLAP database designed for analytical workflows. Think of it as:

  • SQLite, but column-oriented and built for analytical queries
  • Zero server needed - runs in Python, on your machine
  • Understands Parquet, CSV, JSON, and more
💡
It’s the perfect fit for local pipelines and quick experimentation with real SQL power.
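
If you've never touched DuckDB, here's a minimal taste - it queries the same parquet file we download in this project, so adjust the path if yours differs:

import duckdb

# In-memory database - no file, no server.
conn = duckdb.connect()

# DuckDB can query a Parquet file directly by path.
row_count = conn.execute(
    "select count(*) from 'data/raw/taxi_trips_2023-03.parquet'"
).fetchone()
print(row_count)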

Loading Raw Data into DuckDB

Let’s now build a new asset that loads the previously downloaded .parquet taxi data into our DuckDB instance.

First, we upgrade our imports:

import duckdb
import os
import dagster as dg
from dagster._utils.backoff import backoff

Mini Excursus: What’s backoff() Doing?

DuckDB doesn’t handle concurrency well - try opening it in two places at once and things get spicy.

Dagster’s backoff() utility adds retry logic with smart wait intervals. If the DB is locked, it tries again (with backoff) instead of crashing.


Asset: taxi_trips

@dg.asset(deps=["taxi_trips_file"])
def taxi_trips() -> None:
    """
    The raw taxi trips dataset, loaded into a DuckDB database
    """
    query = """
        create or replace table trips as (
          select
            VendorID as vendor_id,
            PULocationID as pickup_zone_id,
            DOLocationID as dropoff_zone_id,
            RatecodeID as rate_code_id,
            payment_type,
            tpep_dropoff_datetime as dropoff_datetime,
            tpep_pickup_datetime as pickup_datetime,
            trip_distance,
            passenger_count,
            total_amount
          from 'data/raw/taxi_trips_2023-03.parquet'
        );
    """

    conn = backoff(
        fn=duckdb.connect,
        retry_on=(RuntimeError, duckdb.IOException),
        kwargs={"database": os.getenv("DUCKDB_DATABASE")},
        max_retries=10,
    )
    conn.execute(query)
taxi_trips_file is now the upstream dependency for this asset. Dagster will wait for it to materialize first.

Reloading Definitions (Again)

In the UI, click Reload Definitions. This is needed when you:

  • Add new assets
  • Add schedules/sensors
  • Rename Dagster objects
If you installed the project with pip install -e ".[dev]", most code changes are picked up automatically - except structural ones like new asset definitions.

You’ll now see a new arrow from taxi_trips_file to taxi_trips. That’s your dependency graph growing.


Materializing the Database Asset

Click Materialize again. Dagster executes taxi_trips_file, then moves on to taxi_trips.

Verify It Worked

Open a Python terminal and run:

import duckdb
conn = duckdb.connect(database="data/staging/data.duckdb")
conn.execute("select count(*) from trips").fetchall()

You should see a row count - a sanity check that the table exists and was loaded correctly.

Ctrl+C to stop the Python process when you're done, or you might run into a file lock later.

Mini SQL Field Trip: Don't Just Count Rows

Instead of just checking if the data exists, here’s a slightly better practice:

select payment_type, avg(trip_distance), count(*) as trips
from trips
group by payment_type
order by trips desc;

This helps you:

  • Spot outliers or broken rows
  • Understand the shape of the data
  • Build intuition for transformations later
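
If you'd rather stay in Python, the same check works through the DuckDB connection - a quick sketch using the staging database path from above:

import duckdb

conn = duckdb.connect(database="data/staging/data.duckdb")
summary = conn.execute("""
    select payment_type, avg(trip_distance) as avg_distance, count(*) as trips
    from trips
    group by payment_type
    order by trips desc
""").fetch_df()
print(summary)
conn.close()  # release the file lock when you're done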



Geo Heatmap Time (YES!)

We're now joining the taxi trip data with NYC zone metadata - and visualizing hotspots using GeoJSON, GeoPandas, and matplotlib.

@dg.asset(deps=["taxi_trips", "taxi_zones"])
def manhattan_stats() -> None:
    query = """
        select
            zones.zone,
            zones.borough,
            zones.geometry,
            count(1) as num_trips
        from trips
        left join zones on trips.pickup_zone_id = zones.zone_id 
        where borough = 'Manhattan' and geometry is not null
        group by zone, borough, geometry
    """
    conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
    trips_by_zone = conn.execute(query).fetch_df()

    trips_by_zone["geometry"] = gpd.GeoSeries.from_wkt(trips_by_zone["geometry"])
    trips_by_zone = gpd.GeoDataFrame(trips_by_zone)

    with open(constants.MANHATTAN_STATS_FILE_PATH, 'w') as output_file:
        output_file.write(trips_by_zone.to_json())
The LEFT JOIN keeps every trip even when its pickup zone has no matching zone metadata; the geometry is not null filter then drops those unmatched rows before plotting.

Mini Excursus: What is GeoJSON?

GeoJSON is a format for encoding geographic data structures (points, polygons, etc.) as JSON. It’s widely used in:

  • Web maps (Leaflet, Mapbox)
  • Python geodata pipelines
  • Spatial analysis with GeoPandas
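
To make the format concrete, here's a single toy GeoJSON feature written as a Python dict - the coordinates are made up, not a real taxi zone:

# A minimal GeoJSON FeatureCollection with one illustrative polygon.
feature_collection = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": {"zone": "Example Zone", "num_trips": 42},
            "geometry": {
                "type": "Polygon",
                "coordinates": [[
                    [-74.00, 40.70], [-73.99, 40.70],
                    [-73.99, 40.71], [-74.00, 40.71],
                    [-74.00, 40.70],
                ]],
            },
        },
    ],
}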

Dagster doesn’t care what you output - as long as it’s an asset, you can store anything: .parquet, .json, .png, .pkl, whatever.

Visual Asset: manhattan_map

@dg.asset(deps=["manhattan_stats"])
def manhattan_map() -> None:
    trips_by_zone = gpd.read_file(constants.MANHATTAN_STATS_FILE_PATH)

    fig, ax = plt.subplots(figsize=(10, 10))
    trips_by_zone.plot(
        column="num_trips",
        cmap="plasma",
        legend=True,
        ax=ax,
        edgecolor="black"
    )
    ax.set_title("Number of Trips per Taxi Zone in Manhattan")
    ax.set_xlim([-74.05, -73.90])
    ax.set_ylim([40.70, 40.82])

    plt.savefig(constants.MANHATTAN_MAP_FILE_PATH, format="png", bbox_inches="tight")
    plt.close(fig)

This produces a color-coded heatmap of taxi pickups across Manhattan zones.
Bright colors = high activity = taxi hotspots.


Mini Excursus: Data Visualization Is a First-Class Asset

Let’s pause and unpack something powerful:

🧩 Visualizations aren't just side-effects. They are assets.

In traditional pipelines, plots often feel like an afterthought - tacked on at the end of a Jupyter notebook, manually regenerated, and exported to some /figures/ folder.

But in Dagster, that mindset doesn’t cut it. Visual outputs are just as derivable, dependable, and repeatable as any other data artifact.

What Makes a "Data Asset"?

Dagster treats anything that can be:

  • Generated deterministically from code
  • Stored in some kind of persistent format
  • Recreated automatically from inputs

…as a data asset.

This includes:

  • Parquet files and SQL tables
  • Cleaned CSV exports
  • Trained ML models
  • Dashboards or HTML reports
  • 📊 Static plots or rendered maps

Why Should Plots Be Assets?

  • Traceability: If the input data changes, Dagster knows the plot is now stale.
  • Reproducibility: Same code, same data → same figure. Every time.
  • Auditability: You can see when and why a plot was created.
  • Scheduling: Automate daily or weekly reports without rebuilding your whole app.
  • Backfills: Need to regenerate all historical plots? Easy - just rematerialize.
In short: Treating visualizations as assets gives you clarity and control - no more guessing which PNG goes with which dataset.

Real-World Use Cases

Use Case                            | Input Assets               | Visualization Asset
Weekly sales dashboard              | sales_data, regions        | weekly_sales_plot (HTML or PNG)
Model performance over time         | model_runs, metrics_table  | performance_plot, confusion_matrix
Geospatial taxi heatmap (like ours) | manhattan_stats            | manhattan_map (PNG asset)

Store as Markdown, PDF, HTML?

Dagster doesn’t care what format your asset takes - PNG, HTML, CSV, Markdown, whatever.

You could go wild and build a monthly_summary_report asset that:

  • Queries a database
  • Builds plots and tables
  • Renders it as a PDF or HTML page
  • Sends it via email (even that send step can be an asset!)
Think of Dagster as your data production line - and visualization as the packaging step. Still critical. Still part of the flow.
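
Just to sketch the shape of that idea - everything below is hypothetical: the output paths and report contents are placeholders, and the dependency wiring is only illustrative:

import os

import duckdb
import dagster as dg


@dg.asset(deps=["taxi_trips"])  # illustrative wiring - depend on whatever feeds your report
def monthly_summary_report() -> None:
    """Hypothetical sketch: query the staging DB and render a tiny HTML report."""
    conn = duckdb.connect(database=os.getenv("DUCKDB_DATABASE"))
    monthly = conn.execute("""
        select date_trunc('month', pickup_datetime) as month, count(*) as num_trips
        from trips
        group by 1
        order by 1
    """).fetch_df()

    html = "<h1>Monthly Trip Summary</h1>" + monthly.to_html(index=False)

    os.makedirs("data/outputs", exist_ok=True)  # placeholder output folder
    with open("data/outputs/monthly_summary_report.html", "w") as f:  # placeholder path
        f.write(html)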

Common Anti-Patterns

These are all signs your plots should be assets:

  • “I run this notebook manually once a week.”
  • “I save the plot but forget which data version it used.”
  • “The plot broke because I updated the schema upstream.”

If you treat them like code outputs, not manual snapshots, you avoid all of this.

Example: manhattan_map Asset

In our project, we created this:

@dg.asset(deps=["manhattan_stats"])
def manhattan_map() -> None:
    ...
    plt.savefig(constants.MANHATTAN_MAP_FILE_PATH, format="png")

This is just as important as the database or parquet file before it.

  • Input: a GeoDataFrame (manhattan_stats)
  • Output: a .png heatmap
  • Behavior: deterministic, reproducible, automated

That’s the spirit of asset-based pipelines.


TL;DR

In Dagster (and in modern data engineering in general):

If it’s derived from data, and you care about it - it’s an asset.

Plots included.

Whether it’s for an exec deck, internal dashboard, or just your own validation - make your visualizations part of your graph. Your future self (and your stakeholders) will thank you.


This practice challenge is the kind that actually builds real understanding. Below is a reworked version of the trips_by_week asset that mirrors the Dagster University model solution, with explanations, excursus blocks, and a clean breakdown of each step. It turns the earlier "quick asset" into a full, memory-aware, production-ish slice of pipeline.


Practice Asset: trips_by_week - The Real Deal

Let’s redo trips_by_week, but properly. The new goal:

  • Aggregate weekly stats from the full trips table
  • Write the results as a .csv to disk
  • Handle the data as if it were too large to fit in memory (a good practice anyway)

Here's the new asset:

from datetime import datetime, timedelta
from . import constants

import pandas as pd
import dagster as dg
from dagster._utils.backoff import backoff
import duckdb
import os


@dg.asset(deps=["taxi_trips"])
def trips_by_week() -> None:
    conn = backoff(
        fn=duckdb.connect,
        retry_on=(RuntimeError, duckdb.IOException),
        kwargs={"database": os.getenv("DUCKDB_DATABASE")},
        max_retries=10,
    )

    current_date = datetime.strptime("2023-03-01", constants.DATE_FORMAT)
    end_date = datetime.strptime("2023-04-01", constants.DATE_FORMAT)

    result = pd.DataFrame()

    while current_date < end_date:
        current_date_str = current_date.strftime(constants.DATE_FORMAT)
        query = f"""
            select
                vendor_id, total_amount, trip_distance, passenger_count
            from trips
            where date_trunc('week', pickup_datetime) = date_trunc('week', '{current_date_str}'::date)
        """

        data_for_week = conn.execute(query).fetch_df()

        aggregate = data_for_week.agg({
            "vendor_id": "count",
            "total_amount": "sum",
            "trip_distance": "sum",
            "passenger_count": "sum"
        }).rename({"vendor_id": "num_trips"}).to_frame().T  # type: ignore

        aggregate["period"] = current_date
        result = pd.concat([result, aggregate])

        current_date += timedelta(days=7)

    # format cleanup
    result['num_trips'] = result['num_trips'].astype(int)
    result['passenger_count'] = result['passenger_count'].astype(int)
    result['total_amount'] = result['total_amount'].round(2)
    result['trip_distance'] = result['trip_distance'].round(2)
    result = result[["period", "num_trips", "total_amount", "trip_distance", "passenger_count"]]
    result = result.sort_values(by="period")

    result.to_csv(constants.TRIPS_BY_WEEK_FILE_PATH, index=False)

What’s Actually Happening

Let’s break this down by intention:

Step              | What’s Going On
current_date loop | Iterates over each week between March 1 and April 1
query             | Pulls only rows for that week (using date_trunc)
agg()             | Aggregates trip stats for the week
concat            | Builds up a DataFrame week by week
to_csv()          | Dumps the final result to a CSV file we can inspect or use later
You could do all this in SQL at once - but breaking it down gives us more control, and allows memory-safe iteration. Plus, it’s a good mental model for streaming/partitioned thinking.

Mini Excursus: Why Not Just GROUP BY?

You might ask:

“Why not just GROUP BY date_trunc('week', pickup_datetime) once and be done with it?”

Fair. But here’s the trick:

  • If your table has millions of rows, you don’t want to .fetch_df() the whole thing at once
  • This lets you handle chunked loading, per-week batching
  • It’s also great for long-running pipelines with time windows, or future partitioned assets

This pattern makes your code future-proof and scalable.
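
For comparison, here's roughly what the all-at-once version would look like - fine for one month of data, riskier once the table no longer fits comfortably in memory:

import os

import duckdb

conn = duckdb.connect(database=os.getenv("DUCKDB_DATABASE"))
all_weeks = conn.execute("""
    select
        date_trunc('week', pickup_datetime) as period,
        count(*) as num_trips,
        round(sum(total_amount), 2) as total_amount,
        round(sum(trip_distance), 2) as trip_distance,
        sum(passenger_count) as passenger_count
    from trips
    group by 1
    order by 1
""").fetch_df()
print(all_weeks)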


Example Output

Your resulting CSV will look like:

period,num_trips,total_amount,trip_distance,passenger_count
2023-03-05,679681,18495110.72,2358944.42,886486
2023-03-12,686461,19151177.45,2664123.87,905296
2023-03-19,640158,17908993.09,2330611.91,838066

Perfect for loading into dashboards, reports, or another visualization asset.

Mini Excursus: Asset Fanout Patterns

This type of design - aggregate, then emit something like .csv, .json, .html, or .pdf - is perfect for fanout assets.

Input         | Output             | Pattern Type
Full DB table | Weekly summary CSV | Aggregation
JSON blob     | HTML report        | Formatting
GeoDataFrame  | PNG heatmap        | Visualization
ML model      | Pickle file        | Serialization

You're not limited to "data-to-data" pipelines - data-to-output is just as valid.


Wrapping Up Part Two

You now know how to:

  • Wire assets with explicit dependencies
  • Materialize outputs into DuckDB
  • Summarize and analyze with SQL + Pandas
  • Visualize using GeoJSON + matplotlib
  • And finally, build iterative, memory-aware assets

Next up in Part 3:

  • Understanding code locations and asset groups
  • How Dagster manages definitions under the hood
  • Structuring your project for clarity and modularity
  • Exploring resources and how to inject them cleanly
We’re starting to build real pipelines now - not just functions.
