Niklas Heringer - Cybersecurity Blog

Data Engineering With Dagster Part Seven – Event-Driven Pipelines with Sensors

Posted on 9 mins

Dagster Sensors Configuration Automation Adhoc-Analysis

🧠 From Passive Pipelines to Reactive Workflows

So far, we’ve scheduled jobs based on time: “Run this every Monday” or “materialize monthly.”

But what if we want to react to real-world events?

Like:

That’s where sensors come in.

Sensors are how Dagster watches the world — and reacts with runs.


⚡ What Is a Sensor?

A sensor is a Python function that runs on a fixed interval (default: every 30s) and returns instructions like:

“Hey, start this job — here’s the config for it.”

Think of sensors as cron’s smarter cousin. Instead of firing at time X, they wait for something meaningful to happen.


🧠 Mini Excursus: Sensor vs. Schedule

Feature Schedule Sensor
Trigger type Time-based Event-driven
Custom logic? Some (but limited) Full Python logic
Run conditions Fixed (cron) Dynamic (file exists, etc.)
Use case Recurring jobs Ad hoc or real-time runs

🧑‍🍳 Our Use Case: Ad Hoc Report Requests

Imagine a stakeholder drops a .json file into a data/requests/ folder that says:

“Hey, give me the number of taxi trips in Queens between Jan 1–15.”

We want:


🧾 Step 1: Define the Asset Config

We’ll start by defining how a request should be described.

Create assets/requests.py and add:

from dagster import Config

class AdhocRequestConfig(Config):
    filename: str
    borough: str
    start_date: str
    end_date: str

This is the config schema our asset will expect. It’s clean, typed, and ready to be passed by sensors or manual runs.


📊 Step 2: Create the Ad Hoc Report Asset

This asset:

Here’s the code (with explanation baked in):

from dagster import asset
from dagster_duckdb import DuckDBResource
from dagster_essentials.assets import constants
import matplotlib.pyplot as plt

@asset(deps=["taxi_zones", "taxi_trips"])
def adhoc_request(config: AdhocRequestConfig, database: DuckDBResource) -> None:
    # Build the output file path
    file_path = constants.REQUEST_DESTINATION_TEMPLATE_FILE_PATH.format(config.filename.split('.')[0])

    # SQL to filter by borough and date range
    query = f"""
        SELECT
          date_part('hour', pickup_datetime) AS hour_of_day,
          date_part('dayofweek', pickup_datetime) AS day_num,
          CASE date_part('dayofweek', pickup_datetime)
            WHEN 0 THEN 'Sunday' WHEN 1 THEN 'Monday' WHEN 2 THEN 'Tuesday'
            WHEN 3 THEN 'Wednesday' WHEN 4 THEN 'Thursday' WHEN 5 THEN 'Friday'
            WHEN 6 THEN 'Saturday'
          END AS day_of_week,
          COUNT(*) AS num_trips
        FROM trips
        LEFT JOIN zones ON trips.pickup_zone_id = zones.zone_id
        WHERE pickup_datetime >= '{config.start_date}'
          AND pickup_datetime < '{config.end_date}'
          AND pickup_zone_id IN (
            SELECT zone_id FROM zones WHERE borough = '{config.borough}'
          )
        GROUP BY 1, 2
        ORDER BY 1, 2
    """

    # Run the query
    with database.get_connection() as conn:
        results = conn.execute(query).fetch_df()

    # Plot it
    fig, ax = plt.subplots(figsize=(10, 6))
    pivot = results.pivot(index="hour_of_day", columns="day_of_week", values="num_trips")
    pivot.plot(kind="bar", stacked=True, ax=ax, colormap="viridis")

    ax.set_title(f"Taxi Trips in {config.borough}, {config.start_date}{config.end_date}")
    ax.set_xlabel("Hour of Day")
    ax.set_ylabel("Trips")
    ax.legend(title="Day of Week")
    plt.tight_layout()

    plt.savefig(file_path)
    plt.close(fig)

🛠 Step 3: Create a Job for the Asset

In jobs.py:

adhoc_request = dg.AssetSelection.assets(["adhoc_request"])

adhoc_request_job = dg.define_asset_job(
    name="adhoc_request_job",
    selection=adhoc_request,
)

Exclude It from Other Jobs

Update trip_update_job to not include this:

trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    partitions_def=monthly_partition,
    selection=dg.AssetSelection.all() - trips_by_week - adhoc_request
)

This keeps ad hoc reports separate from scheduled bulk updates.


👁 Step 4: Sensors Need Memory → Enter Cursors

Sensors don’t just observe — they remember what they saw before.

That’s what a cursor is for: it tracks state between ticks. In our case:


🔁 Step 5: Build the Sensor

In sensors.py:

import dagster as dg
import os, json
from dagster_essentials.jobs import adhoc_request_job

@dg.sensor(job=adhoc_request_job)
def adhoc_request_sensor(context: dg.SensorEvaluationContext):
    PATH = os.path.join(os.path.dirname(__file__), "../../", "data/requests")

    previous = json.loads(context.cursor) if context.cursor else {}
    current = {}
    runs = []

    for fname in os.listdir(PATH):
        full_path = os.path.join(PATH, fname)
        if fname.endswith(".json") and os.path.isfile(full_path):
            mtime = os.path.getmtime(full_path)
            current[fname] = mtime

            # Check if new or changed
            if fname not in previous or previous[fname] != mtime:
                with open(full_path, "r") as f:
                    config = json.load(f)

                runs.append(dg.RunRequest(
                    run_key=f"adhoc_{fname}_{mtime}",
                    run_config={
                        "ops": {
                            "adhoc_request": {
                                "config": {
                                    "filename": fname,
                                    **config
                                }
                            }
                        }
                    }
                ))

    return dg.SensorResult(run_requests=runs, cursor=json.dumps(current))

🧠 Recap Concepts

✅ Why Sensors Rock


🧠 Concept Checkpoints

What are sensors good for?

What’s a sensor cursor?

A dictionary or simple value that stores what the sensor saw last time — so it only reacts to new things.

What is a RunRequest?

A packaged instruction to Dagster:
“Run this job, with this config, for this reason.”


👉 You now have a fully working sensor that reacts to ad hoc requests dropped as JSON files and materializes visual reports using DuckDB and Matplotlib.


🔗 Hooking the Sensor into Dagster

You’ve built the sensor, the asset, and the job. Nice. But for Dagster to actually see and use them, we need to plug them into the Definitions object — the central configuration Dagster reads when you run your project.

Here’s how to complete the wiring.


🔧 Step 1: Update Imports in definitions.py

At the top, make sure your file includes:

from dagster_essentials.assets import trips, metrics, requests
from dagster_essentials.jobs import trip_update_job, weekly_update_job, adhoc_request_job
from dagster_essentials.schedules import trip_update_schedule, weekly_update_schedule
from dagster_essentials.sensors import adhoc_request_sensor

This brings in everything Dagster needs to register — including your new reactive logic.


🧱 Step 2: Register the New Assets

Add this to your asset loading section:

request_assets = dg.load_assets_from_modules([requests])

Then include them in the Definitions call:

assets=[*trip_assets, *metric_assets, *request_assets],

⚙️ Step 3: Register Jobs and Sensors

Extend your jobs list:

all_jobs = [trip_update_job, weekly_update_job, adhoc_request_job]

And register your sensor:

all_sensors = [adhoc_request_sensor]

Finally, include the sensors in your Definitions block:

sensors=all_sensors,

✅ Your Final definitions.py Should Now Look Like:

import dagster as dg

from dagster_essentials.assets import trips, metrics, requests
from dagster_essentials.resources import database_resource
from dagster_essentials.jobs import trip_update_job, weekly_update_job, adhoc_request_job
from dagster_essentials.schedules import trip_update_schedule, weekly_update_schedule
from dagster_essentials.sensors import adhoc_request_sensor

trip_assets = dg.load_assets_from_modules([trips])
metric_assets = dg.load_assets_from_modules([metrics])
request_assets = dg.load_assets_from_modules([requests])

all_jobs = [trip_update_job, weekly_update_job, adhoc_request_job]
all_schedules = [trip_update_schedule, weekly_update_schedule]
all_sensors = [adhoc_request_sensor]

defs = dg.Definitions(
    assets=[*trip_assets, *metric_assets, *request_assets],
    resources={
        "database": database_resource,
    },
    jobs=all_jobs,
    schedules=all_schedules,
    sensors=all_sensors,
)

🖥 Sensors in the Dagster UI

You’ve registered everything. Let’s check out what Dagster shows us.


🗺 Step 1 – Visualize the Asset

Head to Assets > Global Asset Lineage.

You should see the new adhoc_request asset in the graph. If not, hit Reload definitions — this refreshes your project’s metadata.


🔔 Step 2 – Find the Sensor

Navigate to Overview > Sensors.

Here you’ll see adhoc_request_sensor listed.
By default, it’s off — Dagster doesn’t auto-run new sensors until you say so.


###🔍 Step 3 – Inspect the Sensor

Click the sensor name to open its detail page.

This page gives you everything:

This is your mission control for event-driven logic.


✅ Enabling the Sensor

Dagster sensors are paused when created. Let’s activate it properly.


📂 Step 1 – Create a Request File

Navigate to data/requests/. Use the README as a reference, and create a new file:

📄 january-staten-island.json

{
  "start_date": "2023-01-10",
  "end_date": "2023-01-25",
  "borough": "Staten Island"
}

This is the exact format your sensor and asset are expecting.


🧪 Step 2 – Test the Sensor

Instead of turning it on right away, use the Test Sensor button at the top right of the sensor detail page.

This runs a single tick — like poking the sensor to see what it would do.

Click Evaluate, and give it a second.


🚀 Step 3 – Launch the Run

Once the test completes, you’ll see a result:

Click Open in Launchpad — this drops you into the run screen for the job, pre-filled with the config from your .json file.

Just click Launch Run.

Boom — ad hoc report generated.


📁 Step 4 – View the Output

After the run finishes, go to data/outputs/.

You’ll see a .png file created — the visual report from your ad hoc config.

It charts hourly taxi trip volume across weekdays for the selected borough and date range.


Absolutely — here’s the new and reworked closing section of Dagster Lesson 8 (Part 2), focused on reinforcing the concepts clearly, without quiz formatting. It ends with a clean, purposeful wrap-up.


🧠 Core Concepts You Should Own Now

Config and sensors are more than just add-ons — they’re core to making your pipelines intelligent and context-aware.

Let’s distill what we really unlocked here:

🔄 Config is Runtime-Driven Logic

Dagster’s Config lets you define reusable, structured parameters that assets can use when they’re run. This means:

In our case, we passed values like borough and date range via config to generate dynamic visual reports — this is powerful pattern-matching for all sorts of analytics tasks.

🧠 Sensors Are Always On — But Always Smart

Sensors don’t blindly run things. They:

This level of nuance allows your pipelines to respond to the world in real time, rather than just ticking along like a fixed clock.

🧠 Cursors Make Sensors Stateful

The reason your sensor doesn’t spam runs is because it remembers. That’s what the cursor is:

Without a cursor, you’d have to write way more manual logic (or worse — duplicate runs).

🧰 UI Tools That Accelerate Feedback

We also explored some killer Dagster UI features:

These aren’t just UI candy — they’re a practical way to build confidence fast and debug smarter.


✅ Wrap Up

You now have a fully functional, event-triggered pipeline that reacts to requests, reads structured config, and materializes custom reports — all using Dagster’s sensor + config + asset system.

More importantly, you’ve seen how Dagster shifts from scheduled orchestration to responsive automation.

That means you’re not just building pipelines — you’re building event-aware systems that scale with your data and users.