Niklas Heringer - Cybersecurity Blog

Data Engineering With Dagster – Part Two: Dependencies, DuckDB, and Geo Heatmaps

You didn’t think we were done, did you? Let’s jump right back in!


🔄 Asset Dependencies: Upstream? Downstream? Doughstream.

In Dagster, asset dependencies define the order of operations in your pipeline.
You’re not just running functions — you’re declaring a graph of relationships between data artifacts.

But what do we actually mean by “upstream” and “downstream”?

🍪 Let’s Bake Cookies

Imagine these three assets:

- flour: the raw ingredient we start with
- cookie_dough: made from the flour
- cookies: baked from the dough

The dependency graph looks like this:

flour ─▶ cookie_dough ─▶ cookies

So who’s upstream of whom?

Upstream = what this asset depends on
Downstream = what depends on this asset

You can also say:

- flour is upstream of cookie_dough (and, transitively, of cookies)
- cookies are downstream of cookie_dough (and of flour)

Dagster tracks this automatically and builds the execution flow accordingly.
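Here’s what that graph looks like as actual Dagster code (a minimal sketch; the function bodies are stand-ins):

import dagster as dg

@dg.asset
def flour() -> None:
    ...  # acquire the raw ingredient

@dg.asset(deps=["flour"])
def cookie_dough() -> None:
    ...  # mix the flour into dough

@dg.asset(deps=["cookie_dough"])
def cookies() -> None:
    ...  # bake the dough

The deps list is all it takes: Dagster reads those strings and wires up the graph for you.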

Why This Matters

If cookie_dough fails to build, Dagster will not even attempt to materialize cookies.
This makes debugging safer and your DAGs easier to reason about: no manual wiring of task dependencies.


🧠 Mini Excursus: What is DuckDB?

DuckDB is a fast, in-process SQL OLAP database designed for analytical workloads. Think of it as SQLite for analytics: zero setup and embedded in your process, but column-oriented and built to crunch large tables with full SQL support.

💡 It’s the perfect fit for local pipelines and quick experimentation with real SQL power.
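If you’ve never touched it, here’s the whole experience in a few lines (reusing the parquet file we downloaded earlier):

import duckdb

# Connect in-memory (pass a file path instead to persist the database)
conn = duckdb.connect()

# DuckDB can query parquet files directly, no import step required
row_count = conn.execute(
    "select count(*) from 'data/raw/taxi_trips_2023-03.parquet'"
).fetchone()
print(row_count)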


🧱 Loading Raw Data into DuckDB

Let’s now build a new asset that loads the previously downloaded .parquet taxi data into our DuckDB instance.

First, we upgrade our imports:

import duckdb
import os
import dagster as dg
from dagster._utils.backoff import backoff

🧠 Mini Excursus: What’s backoff() Doing?

DuckDB doesn’t handle concurrency well — try opening it in two places at once and things get spicy.

Dagster’s backoff() utility adds retry logic with smart wait intervals. If the DB is locked, it tries again (with backoff) instead of crashing.
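Conceptually, it behaves something like this (a simplified sketch, not Dagster’s actual implementation):

import time

def backoff_sketch(fn, retry_on, kwargs, max_retries=10):
    """Keep calling fn(**kwargs), sleeping a bit longer after each failure."""
    for attempt in range(max_retries + 1):
        try:
            return fn(**kwargs)
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries, let the error bubble up
            time.sleep(0.1 * 2 ** attempt)  # exponential backoff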


💾 Asset: taxi_trips

@dg.asset(deps=["taxi_trips_file"])
def taxi_trips() -> None:
    """
    The raw taxi trips dataset, loaded into a DuckDB database
    """
    query = """
        create or replace table trips as (
          select
            VendorID as vendor_id,
            PULocationID as pickup_zone_id,
            DOLocationID as dropoff_zone_id,
            RatecodeID as rate_code_id,
            payment_type,
            tpep_dropoff_datetime as dropoff_datetime,
            tpep_pickup_datetime as pickup_datetime,
            trip_distance,
            passenger_count,
            total_amount
          from 'data/raw/taxi_trips_2023-03.parquet'
        );
    """

    conn = backoff(
        fn=duckdb.connect,
        retry_on=(RuntimeError, duckdb.IOException),
        kwargs={"database": os.getenv("DUCKDB_DATABASE")},
        max_retries=10,
    )
    conn.execute(query)

taxi_trips_file is now the upstream dependency for this asset. Dagster will wait for it to materialize first.


♻️ Reloading Definitions (Again)

In the UI, click Reload Definitions. This is needed when you:

- add a new asset
- add or rename a resource
- change anything structural in your definitions

🧠 If you installed Dagster with pip install -e ".[dev]", most changes are picked up automatically — except structural ones like new asset definitions.

You’ll now see a new arrow from taxi_trips_file to taxi_trips. That’s your dependency graph growing.


🚦 Materializing the Database Asset

Click Materialize again. Dagster executes taxi_trips_file, then moves on to taxi_trips.

✅ Verify It Worked

Open a Python terminal and run:

import duckdb

conn = duckdb.connect(database="data/staging/data.duckdb")
print(conn.execute("select count(*) from trips").fetchall())
conn.close()  # releases the file lock on the database

You should see a row count — a sanity check that the table exists and was loaded correctly.

Exit the Python session (Ctrl+D or exit()) when you’re done, or you might run into a file lock later.


🧠 Mini SQL Field Trip: Don’t Just Count Rows

Instead of just checking if the data exists, here’s a slightly better practice:

select payment_type, avg(trip_distance) as avg_distance, count(*) as trips
from trips
group by payment_type
order by trips desc;

This helps you:

- spot suspicious values early (negative distances, unexpected payment types)
- sanity-check the distribution instead of just the row count
- build intuition about the data before you aggregate it downstream



🗺️ Geo Heatmap Time (YES!)

We’re now joining the taxi trip data with NYC zone metadata and visualizing hotspots using GeoJSON, GeoPandas, and matplotlib. Two new imports come into play here: geopandas and our project’s constants module.

@dg.asset(deps=["taxi_trips", "taxi_zones"])
def manhattan_stats() -> None:
    query = """
        select
            zones.zone,
            zones.borough,
            zones.geometry,
            count(1) as num_trips
        from trips
        left join zones on trips.pickup_zone_id = zones.zone_id 
        where borough = 'Manhattan' and geometry is not null
        group by zone, borough, geometry
    """
    conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
    trips_by_zone = conn.execute(query).fetch_df()

    trips_by_zone["geometry"] = gpd.GeoSeries.from_wkt(trips_by_zone["geometry"])
    trips_by_zone = gpd.GeoDataFrame(trips_by_zone)

    with open(constants.MANHATTAN_STATS_FILE_PATH, 'w') as output_file:
        output_file.write(trips_by_zone.to_json())

A note on that LEFT JOIN: it keeps every trip even if its pickup zone has no match in the zones table. It does not add zones with zero trips, though; for that you’d have to flip the join (from zones left join trips).


🧠 Mini Excursus: What is GeoJSON?

GeoJSON is a format for encoding geographic data structures (points, polygons, etc.) as JSON. It’s widely used in:

- web mapping libraries like Leaflet and Mapbox
- desktop GIS tools such as QGIS
- geospatial Python libraries like GeoPandas (which we use below)
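A single GeoJSON Feature looks roughly like this (a hand-written toy example, not data from our pipeline):

import json

feature = {
    "type": "Feature",
    "geometry": {
        "type": "Polygon",
        # A ring of (longitude, latitude) pairs; first and last must match
        "coordinates": [[[-74.0, 40.7], [-73.9, 40.7], [-73.9, 40.8], [-74.0, 40.7]]],
    },
    "properties": {"zone": "Example Zone", "num_trips": 42},
}
print(json.dumps(feature, indent=2))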

Dagster doesn’t care what you output — as long as it’s an asset, you can store anything: .parquet, .json, .png, .pkl, whatever.


🎨 Visual Asset: manhattan_map

@dg.asset(deps=["manhattan_stats"])
def manhattan_map() -> None:
    trips_by_zone = gpd.read_file(constants.MANHATTAN_STATS_FILE_PATH)

    fig, ax = plt.subplots(figsize=(10, 10))
    trips_by_zone.plot(
        column="num_trips",
        cmap="plasma",
        legend=True,
        ax=ax,
        edgecolor="black"
    )
    ax.set_title("Number of Trips per Taxi Zone in Manhattan")
    ax.set_xlim([-74.05, -73.90])
    ax.set_ylim([40.70, 40.82])

    plt.savefig(constants.MANHATTAN_MAP_FILE_PATH, format="png", bbox_inches="tight")
    plt.close(fig)

This produces a color-coded heatmap of taxi pickups across Manhattan zones.
Bright colors = high activity = taxi hotspots.


🧠 Mini Excursus: Data Visualization Is a First-Class Asset

Let’s pause and unpack something powerful:

🧩 Visualizations aren’t just side-effects. They are assets.

In traditional pipelines, plots often feel like an afterthought — tacked on at the end of a Jupyter notebook, manually regenerated, and exported to some /figures/ folder.

But in Dagster, that mindset doesn’t cut it. Visual outputs are just as derivable, dependable, and repeatable as any other data artifact.

📦 What Makes a “Data Asset”?

Dagster treats anything that can be:

- produced by your pipeline
- persisted somewhere
- re-derived from its upstream inputs

…as a data asset.

This includes:

- database tables
- files (.parquet, .csv, .json)
- ML models
- plots, maps, and reports

🧠 Why Should Plots Be Assets?

When a plot is an asset, it gets everything other assets get: lineage (you can see exactly which data produced it), staleness tracking (Dagster knows when it needs regenerating), and reproducibility (one click rebuilds it from source). In short: treating visualizations as assets gives you clarity and control, with no more guessing which PNG goes with which dataset.


🔥 Real-World Use Cases

Use Case | Input Assets | Visualization Asset
Weekly sales dashboard | sales_data, regions | weekly_sales_plot (HTML or PNG)
Model performance over time | model_runs, metrics_table | performance_plot, confusion_matrix
Geospatial taxi heatmap (like ours) | manhattan_stats | manhattan_map (PNG asset)

💡 Bonus: Store as Markdown, PDF, HTML?

Dagster doesn’t care what format your asset takes — PNG, HTML, CSV, Markdown, whatever.

You could go wild and build a monthly_summary_report asset (sketched below) that:

- queries a few upstream assets for the month’s numbers
- renders a chart or two
- writes it all into a single Markdown or PDF report

Think of Dagster as your data production line — and visualization as the packaging step. Still critical. Still part of the flow.
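Here’s a rough sketch of that idea (the asset name, report content, and output path are all made up for illustration; the imports are the ones from earlier):

@dg.asset(deps=["taxi_trips"])
def monthly_summary_report() -> None:
    """Hypothetical asset: query the database and write a Markdown report."""
    conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
    stats = conn.execute(
        "select count(*) as trips, round(sum(total_amount), 2) as revenue from trips"
    ).fetch_df()
    report = (
        "# Monthly Taxi Summary\n\n"
        f"- Total trips: {int(stats['trips'][0]):,}\n"
        f"- Total revenue: ${stats['revenue'][0]:,}\n"
    )
    # Hypothetical output path; point it wherever your project keeps reports
    with open("data/outputs/monthly_summary.md", "w") as f:
        f.write(report)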


⚠️ Common Anti-Patterns

These are all signs your plots should be assets:

- figures regenerated by hand in a notebook whenever someone asks
- a /figures/ folder full of stale PNGs nobody dares to delete
- charts that nobody can trace back to the data (or code version) that produced them

If you treat them like code outputs, not manual snapshots, you avoid all of this.


🛠 Example: manhattan_map Asset

In our project, we created this:

@dg.asset(deps=["manhattan_stats"])
def manhattan_map() -> None:
    ...
    plt.savefig(constants.MANHATTAN_MAP_FILE_PATH, format="png")

This is just as important as the database or parquet file before it.

That’s the spirit of asset-based pipelines.


✅ TL;DR

In Dagster (and in modern data engineering in general):

If it’s derived from data, and you care about it — it’s an asset.

Plots included.

Whether it’s for an exec deck, internal dashboard, or just your own validation — make your visualizations part of your graph. Your future self (and your stakeholders) will thank you.




🧪 Practice Asset: trips_by_week – The Real Deal

Let’s redo trips_by_week, but properly. The new goals:

- aggregate trips per week (trip count, revenue, distance, passengers)
- process one week at a time so memory use stays bounded
- write the result to a CSV we can inspect or feed into other assets

Here’s the new asset:

from datetime import datetime, timedelta
from . import constants

import pandas as pd
import dagster as dg
from dagster._utils.backoff import backoff
import duckdb
import os


@dg.asset(deps=["taxi_trips"])
def trips_by_week() -> None:
    conn = backoff(
        fn=duckdb.connect,
        retry_on=(RuntimeError, duckdb.IOException),
        kwargs={"database": os.getenv("DUCKDB_DATABASE")},
        max_retries=10,
    )

    current_date = datetime.strptime("2023-03-01", constants.DATE_FORMAT)
    end_date = datetime.strptime("2023-04-01", constants.DATE_FORMAT)

    result = pd.DataFrame()

    while current_date < end_date:
        current_date_str = current_date.strftime(constants.DATE_FORMAT)
        query = f"""
            select
                vendor_id, total_amount, trip_distance, passenger_count
            from trips
            where date_trunc('week', pickup_datetime) = date_trunc('week', '{current_date_str}'::date)
        """

        data_for_week = conn.execute(query).fetch_df()

        aggregate = data_for_week.agg({
            "vendor_id": "count",
            "total_amount": "sum",
            "trip_distance": "sum",
            "passenger_count": "sum"
        }).rename({"vendor_id": "num_trips"}).to_frame().T  # type: ignore

        aggregate["period"] = current_date
        result = pd.concat([result, aggregate])

        current_date += timedelta(days=7)

    # format cleanup
    result['num_trips'] = result['num_trips'].astype(int)
    result['passenger_count'] = result['passenger_count'].astype(int)
    result['total_amount'] = result['total_amount'].round(2)
    result['trip_distance'] = result['trip_distance'].round(2)
    result = result[["period", "num_trips", "total_amount", "trip_distance", "passenger_count"]]
    result = result.sort_values(by="period")

    result.to_csv(constants.TRIPS_BY_WEEK_FILE_PATH, index=False)

🔍 What’s Actually Happening

Let’s break this down by intention:

Step | What’s Going On
current_date loop | iterates over each week between March 1 and April 1
query | pulls only the rows for that week (using date_trunc)
agg() | aggregates the trip stats for the week
concat | builds up the result DataFrame week by week
to_csv() | dumps the final result to a CSV file we can inspect or use later

🧠 You could do all this in SQL at once — but breaking it down gives us more control, and allows memory-safe iteration. Plus, it’s a good mental model for streaming/partitioned thinking.


🧠 Mini Excursus: Why Not Just GROUP BY?

You might ask:

“Why not just GROUP BY date_trunc('week', pickup_datetime) once and be done with it?”

Fair. But here’s the trick:

- the weekly loop only ever holds one week of rows in memory, while a single GROUP BY materializes the whole result in one shot
- if something breaks, you know exactly which week it broke on
- the week-at-a-time structure maps naturally onto partitioned assets, a pattern Dagster supports natively

This pattern makes your code future-proof and scalable.
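For comparison, the single-query version would look something like this (a sketch; same shape of output, but everything comes back in one pass):

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
weekly = conn.execute("""
    select
        date_trunc('week', pickup_datetime) as period,
        count(*) as num_trips,
        round(sum(total_amount), 2) as total_amount,
        round(sum(trip_distance), 2) as trip_distance,
        sum(passenger_count) as passenger_count
    from trips
    group by 1
    order by 1
""").fetch_df()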


🧪 Example Output

Your resulting CSV will look like:

period,num_trips,total_amount,trip_distance,passenger_count
2023-03-05,679681,18495110.72,2358944.42,886486
2023-03-12,686461,19151177.45,2664123.87,905296
2023-03-19,640158,17908993.09,2330611.91,838066

Perfect for loading into dashboards, reports, or another visualization asset. 👀
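For instance, a follow-up visualization asset could look like this (the asset name and output path are hypothetical):

@dg.asset(deps=["trips_by_week"])
def trips_by_week_chart() -> None:
    """Hypothetical asset: plot weekly trip counts from the CSV."""
    weekly = pd.read_csv(constants.TRIPS_BY_WEEK_FILE_PATH, parse_dates=["period"])

    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(weekly["period"], weekly["num_trips"], marker="o")
    ax.set_title("Taxi Trips per Week")
    ax.set_xlabel("Week")
    ax.set_ylabel("Trips")

    # Hypothetical output path
    plt.savefig("data/outputs/trips_by_week.png", bbox_inches="tight")
    plt.close(fig)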


🧠 Mini Excursus: Asset Fanout Patterns

This type of design — aggregate then emit something like .csv, .json, .html, or .pdf — is perfect for fanout assets.

Input | Output | Pattern Type
Full DB table | weekly summary CSV | aggregation
JSON blob | HTML report | formatting
GeoDataFrame | PNG heatmap | visualization
ML model | pickle file | serialization
You’re not limited to “data-to-data” pipelines — data-to-output is just as valid.
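As a tiny example of the formatting row from the table above, here’s a sketch that repackages our weekly CSV as JSON (the asset name and output path are made up):

@dg.asset(deps=["trips_by_week"])
def trips_by_week_json() -> None:
    """Hypothetical fanout asset: same data, different packaging."""
    df = pd.read_csv(constants.TRIPS_BY_WEEK_FILE_PATH)
    # Hypothetical output path; a web dashboard could consume this directly
    df.to_json("data/outputs/trips_by_week.json", orient="records")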


✅ Wrapping Up Part Two

You now know how to:

- declare dependencies between assets and let Dagster order the work
- load raw parquet data into DuckDB (with retry-safe connections via backoff)
- join trips with zone metadata and render a geo heatmap as an asset
- build a memory-aware weekly aggregation that emits a CSV

Next up in Part 3:

📁 Understanding code locations and asset groups

🔍 How Dagster manages definitions under the hood

📦 Structuring your project for clarity and modularity

🧰 Exploring resources and how to inject them cleanly

We’re starting to build real pipelines now — not just functions.


📚 Resources