Niklas Heringer - Cybersecurity Blog

Data Engineering With Dagster Part Six – Partitioning & Backfills

Reading time: 8 mins

Tags: Dagster, Partitions, Backfill, Pipeline-Design, DuckDB

🍪 Partitioning: Baking Data One Batch at a Time

Let’s return to our running bakery analogy. Imagine you’re getting cookie orders every day, and demand varies from day to day.

Now, do you mix one massive bowl of dough every week, or prep daily batches based on demand?

Partitioning in Dagster is just that: breaking work into slices based on time or other meaningful dimensions.

A partition is a unit of data (or computation) scoped to a range — like a day, week, or month — that you can reason about, run, retry, and debug independently.


🧠 Why Partition?

Let’s say your asset processes NYC taxi data.

Benefits:

  • Scalability: break work into chunks for distributed execution
  • Efficiency: skip what’s already done
  • Resilience: retry only failed pieces
  • Debuggability: zero in on the broken slice
  • Storage optimization: avoid redundant reads/writes

🧩 Defining a Partition in Dagster

In Dagster, you declare your partitions using a PartitionsDefinition.

You’ll typically store these in partitions.py. Here’s how we define a monthly partition:

# partitions.py
import dagster as dg
from dagster_essentials.assets import constants

monthly_partition = dg.MonthlyPartitionsDefinition(
    start_date=constants.START_DATE,
    end_date=constants.END_DATE
)

This sets up a list of partition keys — each one representing a month like "2023-01-01", "2023-02-01", and so on.

Think of it like telling Dagster: “My data comes in monthly crates. Here’s where to start, and here’s where to stop.”
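If you want to sanity-check which keys a definition produces, you can ask it directly. A minimal sketch (the dates below are placeholders, not the real values in constants.py):

# inspect_partitions.py -- illustrative only
import dagster as dg

# Placeholder dates; your constants.START_DATE / END_DATE may differ.
monthly_partition = dg.MonthlyPartitionsDefinition(
    start_date="2023-01-01",
    end_date="2023-04-01",
)

print(monthly_partition.get_partition_keys())
# ['2023-01-01', '2023-02-01', '2023-03-01']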


🛠 Partitioning an Existing Asset

Let’s take the existing taxi_trips_file asset — which fetches and saves monthly NYC trip data — and make it partition-aware.

Original asset (simplified):

@dg.asset
def taxi_trips_file() -> None:
    month_to_fetch = '2023-03'
    ...

Hardcoded month = bad. Let’s fix it.

Step-by-step upgrade:

  1. Use the monthly partition definition
    Add it to the asset via the partitions_def argument.

  2. Introduce the context object
    It gives you the partition key for the current materialization run.

  3. Use the partition key to derive the right month
    context.partition_key gives a string like "2023-03-01", which we trim to "2023-03".

Here’s the result:

import requests

from dagster import asset, AssetExecutionContext
from dagster_essentials.assets import constants
from dagster_essentials.partitions import monthly_partition

@asset(partitions_def=monthly_partition)
def taxi_trips_file(context: AssetExecutionContext) -> None:
    """
    Fetches a month of NYC taxi data as a parquet file.
    """
    # Get the partition’s date (e.g. "2023-03-01") and slice to "2023-03"
    month_to_fetch = context.partition_key[:-3]

    raw_trips = requests.get(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month_to_fetch}.parquet"
    )

    with open(constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch), "wb") as f:
        f.write(raw_trips.content)

🧠 Mini Excursus: What Is context.partition_key?

Every partitioned run comes with a partition key: a string that tells the asset which slice of data it’s working on right now.

Examples:

  • March 2023 → partition_key "2023-03-01"
  • June 2024 → partition_key "2024-06-01"
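The taxi_trips_file asset above (and taxi_trips later on) turns that key into the year-month string the source files use, simply by slicing off the day:

key = "2023-03-01"         # a monthly partition key
month_to_fetch = key[:-3]  # drop the trailing "-01"
print(month_to_fetch)      # -> 2023-03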

You can use this key to:

  • work out which month of data to fetch
  • build the download URL and file path for that slice
  • tag database rows with the partition they came from


🔄 And What’s a Backfill?

A backfill is when you go back in time and re-run your asset for a range of partition keys.

When you’d backfill:

  • you’ve just added a partitioned asset and need to load its history
  • the asset’s logic changed and old partitions should be reprocessed with it
  • some partitions failed or were missed and need another run

It’s like running git rebase for your data: reprocess old history with current logic.
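Conceptually, a backfill is just “materialize the asset once per partition key in a range.” Here’s a rough local sketch of that idea using dagster’s materialize helper; the import path assumes the course’s dagster_essentials.assets.trips module, so adjust it to wherever your asset lives:

# backfill_sketch.py -- illustrative only
import dagster as dg

from dagster_essentials.assets.trips import taxi_trips_file  # assumed module path
from dagster_essentials.partitions import monthly_partition

# One run per partition key: exactly what a backfill does for you.
for key in monthly_partition.get_partition_keys():
    if "2023-01-01" <= key <= "2023-03-01":
        dg.materialize([taxi_trips_file], partition_key=key)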


🧪 Practice Exercise

Task: Partition the taxi_trips asset to load a month’s data from the parquet file into a DuckDB table, while tracking which partition each row came from.

Your goal:

  • add partitions_def=monthly_partition to the taxi_trips asset
  • read the partition key from context and trim it to the month
  • create the trips table with a partition_date column if it doesn’t already exist
  • delete any existing rows for that partition, then insert the month’s rows tagged with it


✅ Example Solution

from dagster import asset, AssetExecutionContext
from dagster_duckdb import DuckDBResource

from dagster_essentials.assets import constants
from dagster_essentials.partitions import monthly_partition

@asset(
    deps=["taxi_trips_file"],
    partitions_def=monthly_partition
)
def taxi_trips(context: AssetExecutionContext, database: DuckDBResource) -> None:
    """
    Loads a month of taxi trips into DuckDB, adding the partition date.
    """
    month_to_fetch = context.partition_key[:-3]

    query = f"""
      create table if not exists trips (
        vendor_id integer, pickup_zone_id integer, dropoff_zone_id integer,
        rate_code_id double, payment_type integer, dropoff_datetime timestamp,
        pickup_datetime timestamp, trip_distance double, passenger_count double,
        total_amount double, partition_date varchar
      );

      delete from trips where partition_date = '{month_to_fetch}';

      insert into trips
      select
        VendorID, PULocationID, DOLocationID, RatecodeID, payment_type, tpep_dropoff_datetime,
        tpep_pickup_datetime, trip_distance, passenger_count, total_amount, '{month_to_fetch}' as partition_date
      from '{constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch)}';
    """

    with database.get_connection() as conn:
        conn.execute(query)
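
To try one slice end to end without the UI, you can materialize both assets for a single partition key. A quick sketch, again assuming the course’s module layout and a DuckDB file at data/staging/data.duckdb (both are assumptions; swap in your own paths):

# test_partition.py -- illustrative only
import dagster as dg
from dagster_duckdb import DuckDBResource

from dagster_essentials.assets.trips import taxi_trips, taxi_trips_file  # assumed module path

result = dg.materialize(
    [taxi_trips_file, taxi_trips],
    partition_key="2023-03-01",  # one month, one run
    resources={"database": DuckDBResource(database="data/staging/data.duckdb")},  # assumed path
)
assert result.success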

🧠 Key Concept Recap

  • partition_key → controls what data to fetch or insert
  • partitions_def → makes your asset partitionable
  • context → gives you metadata for scoped execution
  • Backfill → re-run a specific set of partitions to “catch up” or reprocess

📅 From Time-Based Partitions to Time-Based Schedules

So far, we’ve:

  • defined a monthly_partition with MonthlyPartitionsDefinition
  • made the taxi_trips_file and taxi_trips assets partition-aware
  • seen how backfills re-run a range of partitions

Now we’ll update our job and schedule to match this new logic.


🧠 Why You Should Partition Your Job (Not Just Assets)

Here’s the thing: you can partition your assets, but unless your job also understands and respects those partitions, you’ll just be triggering the whole graph again.

Instead, by giving your job a partition definition, Dagster knows to:

  • target a single partition key per run instead of the whole history
  • pass that key down to every partitioned asset in the selection
  • surface per-partition status in the UI, so you can see what’s done and what’s missing

It’s like telling the chef to bake only today’s batch — not the entire backlog.


🛠 Updating the Job with monthly_partition

Here’s how trip_update_job looks before adding partitions:

trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=dg.AssetSelection.all() - dg.AssetSelection.assets(["trips_by_week"]),
)

Now let’s add monthly_partition to scope this job to one month per run:

import dagster as dg

from dagster_essentials.partitions import monthly_partition

trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    partitions_def=monthly_partition,
    selection=dg.AssetSelection.all() - dg.AssetSelection.assets(["trips_by_week"]),
)

✅ Why This Works

Because the job has a partitions_def, every time the schedule triggers it, Dagster automatically injects a partition key (like "2023-04-01"). The assets know how to respond, thanks to the partition-aware logic we built earlier in this part.


📆 Partitioned Schedules: Smarter Automation

You might remember we scheduled trip_update_job to run on the 5th of every month:

trip_update_schedule = dg.ScheduleDefinition(
    job=trip_update_job,
    cron_schedule="0 0 5 * *",
)

Now that the job is partitioned, Dagster does something special:
It maps each schedule tick to a specific partition key.

That means:

  • every tick launches a run for exactly one partition key (one month of data)
  • that key flows into the partitioned assets, which fetch and load just that slice
  • you never have to hand-pick which month to run

This is huge: you now get fine-grained control without manually specifying what slice to run.
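As a side note, Dagster also ships build_schedule_from_partitioned_job, which derives the schedule’s cadence from the job’s partitions definition instead of a hand-written cron string. A hedged sketch using the helper’s defaults (the jobs module path is assumed):

import dagster as dg

from dagster_essentials.jobs import trip_update_job  # assumed module path

# The cadence is derived from monthly_partition, so the schedule
# and the partition grid can't drift apart.
trip_update_schedule = dg.build_schedule_from_partitioned_job(trip_update_job)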


🧪 The Dagster UI: Partitioned Assets in Action

With dagster dev running, reload your definitions and head to the UI.

🔍 Visualizing Assets with Partitions

  1. Go to Assets > Asset Lineage

  2. You’ll see partitioned assets like taxi_trips_file and taxi_trips marked with ⚪ or ⚫

  3. Hover over each node to see partition status:

    • 0 = no partitions materialized yet
    • ⚠️ = failed partitions (if any)
    • All = how many total partitions exist

assets UI


🚀 Materializing with Partitions

Click Materialize all on an asset with partitions, and a dialog appears:

partition selection


🔁 Running a Backfill

Backfilling = reprocessing history. Here’s how:

  1. Click Launch backfill from the materialization modal
  2. It’ll suggest a full range — tweak as needed
  3. Head to Overview > Backfills to track progress

Each backfill run shows:

backfill page


🔎 Drilling into a Partition

Click an asset → Asset Catalog
You’ll see each month listed as a separate line item.

Click "2023-03-01" for example, and Dagster shows:

partition detail


🧠 Recap: What You’ve Achieved

So far, you’ve:

  • defined a monthly PartitionsDefinition and wired it into the taxi assets
  • partitioned the trip_update_job and its schedule
  • materialized individual partitions and launched backfills from the UI

You now have precise, replayable, scalable data logic.

And you can confidently answer questions like: which months are loaded, which partition failed, and what it would take to reprocess just that slice.


📚 Knowledge Recap

✅ Which of the following are true?


✅ True or False?

Partitions can be used in jobs and assets
→ ✅ True

Together, they allow for fine-grained, deterministic pipeline execution.


🧠 Mini Excursus: Not Just Time

So far, we partitioned by month — but Dagster’s partitioning isn’t limited to timestamps.

You can partition by:

  • region or country
  • customer or tenant
  • file or dataset name
  • any other category you can enumerate

If you can list it, you can partition on it.
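For instance, a static partitions definition over a hand-written list behaves exactly like the monthly one, just keyed by borough instead of month (the borough list and asset name here are purely illustrative):

import dagster as dg

# Any list you can enumerate can become a partition dimension.
borough_partition = dg.StaticPartitionsDefinition(
    ["manhattan", "brooklyn", "queens", "bronx", "staten_island"]
)

@dg.asset(partitions_def=borough_partition)
def trips_by_borough(context: dg.AssetExecutionContext) -> None:
    borough = context.partition_key  # e.g. "brooklyn"
    ...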


✅ Wrapping Up

Partitioning isn’t just a feature — it’s a way of thinking about scalable, maintainable data engineering.

With:

  • partitioned assets and a partitioned job
  • a schedule that maps each tick to a partition key
  • backfills to reprocess history on demand

…you’ve unlocked real orchestration power.

Onward to sensors and dynamic triggers — but only when you’re ready. 💥