Data Engineering With Dagster Part Seven – Event-Driven Pipelines with Sensors
🧠 From Passive Pipelines to Reactive Workflows
So far, we’ve scheduled jobs based on time: “Run this every Monday” or “materialize monthly.”
But what if we want to react to real-world events?
Like:
- A new file landing in a directory
- A table update in a database
- A Slack message or API webhook
That’s where sensors come in.
Sensors are how Dagster watches the world — and reacts with runs.
⚡ What Is a Sensor?
A sensor is a Python function that runs on a fixed interval (default: every 30s) and returns instructions like:
“Hey, start this job — here’s the config for it.”
Think of sensors as cron’s smarter cousin. Instead of firing at time X, they wait for something meaningful to happen.
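In code, the smallest useful sensor is just a decorated function. Here’s a minimal sketch to show the shape (the `hello_report` asset, `my_job`, and the flag-file path are placeholders for illustration, not part of our project):
```python
import os

import dagster as dg


@dg.asset
def hello_report() -> None:
    ...  # placeholder asset so this sketch is self-contained


my_job = dg.define_asset_job(name="my_job", selection=["hello_report"])


@dg.sensor(job=my_job)
def my_file_sensor(context: dg.SensorEvaluationContext):
    # Dagster calls this function on an interval (every ~30s by default).
    if os.path.exists("data/trigger.flag"):
        # Something meaningful happened: request a run.
        # run_key de-duplicates: the same key never launches twice.
        yield dg.RunRequest(run_key="trigger-flag")
    else:
        # Shows up in the sensor's tick history in the UI.
        yield dg.SkipReason("no trigger file found")
```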
🧠 Mini Excursus: Sensor vs. Schedule
| Feature | Schedule | Sensor |
| --- | --- | --- |
| Trigger type | Time-based | Event-driven |
| Custom logic? | Some (but limited) | Full Python logic |
| Run conditions | Fixed (cron) | Dynamic (file exists, etc.) |
| Use case | Recurring jobs | Ad hoc or real-time runs |
🧑‍🍳 Our Use Case: Ad Hoc Report Requests
Imagine a stakeholder drops a `.json` file into a `data/requests/` folder that says:
“Hey, give me the number of taxi trips in Queens between Jan 1–15.”
We want:
- A sensor to pick up the new request
- A job to materialize a report
- An asset to read config and generate the output
🧾 Step 1: Define the Asset Config
We’ll start by defining how a request should be described.
Create `assets/requests.py` and add:
```python
from dagster import Config


class AdhocRequestConfig(Config):
    filename: str
    borough: str
    start_date: str
    end_date: str
```
This is the config schema our asset will expect. It’s clean, typed, and ready to be passed by sensors or manual runs.
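To preview that hand-off, here’s a sketch of supplying the config by hand, e.g. from a test or a one-off script (assumes the `adhoc_request` asset we define in Step 2; the database path and values are illustrative):
```python
from dagster import materialize
from dagster_duckdb import DuckDBResource

result = materialize(
    [adhoc_request],  # the asset from Step 2 below
    resources={"database": DuckDBResource(database="data/staging/data.duckdb")},
    run_config={
        "ops": {
            "adhoc_request": {
                "config": {
                    "filename": "example.json",
                    "borough": "Queens",
                    "start_date": "2023-01-01",
                    "end_date": "2023-01-15",
                }
            }
        }
    },
)
assert result.success
```
This is exactly the shape of `run_config` our sensor will build automatically later.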
📊 Step 2: Create the Ad Hoc Report Asset
This asset:
- Depends on the existing `taxi_trips` and `taxi_zones` assets
- Uses DuckDB to query borough-filtered taxi data
- Creates a Matplotlib report as a stacked bar chart
Here’s the code (with explanation baked in):
```python
from dagster import asset
from dagster_duckdb import DuckDBResource
from dagster_essentials.assets import constants
import matplotlib.pyplot as plt


@asset(deps=["taxi_zones", "taxi_trips"])
def adhoc_request(config: AdhocRequestConfig, database: DuckDBResource) -> None:
    # Build the output file path
    file_path = constants.REQUEST_DESTINATION_TEMPLATE_FILE_PATH.format(
        config.filename.split(".")[0]
    )

    # SQL to filter by borough and date range
    query = f"""
        SELECT
            date_part('hour', pickup_datetime) AS hour_of_day,
            date_part('dayofweek', pickup_datetime) AS day_num,
            CASE date_part('dayofweek', pickup_datetime)
                WHEN 0 THEN 'Sunday' WHEN 1 THEN 'Monday' WHEN 2 THEN 'Tuesday'
                WHEN 3 THEN 'Wednesday' WHEN 4 THEN 'Thursday' WHEN 5 THEN 'Friday'
                WHEN 6 THEN 'Saturday'
            END AS day_of_week,
            COUNT(*) AS num_trips
        FROM trips
        LEFT JOIN zones ON trips.pickup_zone_id = zones.zone_id
        WHERE pickup_datetime >= '{config.start_date}'
        AND pickup_datetime < '{config.end_date}'
        AND pickup_zone_id IN (
            SELECT zone_id FROM zones WHERE borough = '{config.borough}'
        )
        GROUP BY 1, 2
        ORDER BY 1, 2
    """

    # Run the query
    with database.get_connection() as conn:
        results = conn.execute(query).fetch_df()

    # Plot it
    fig, ax = plt.subplots(figsize=(10, 6))
    pivot = results.pivot(index="hour_of_day", columns="day_of_week", values="num_trips")
    pivot.plot(kind="bar", stacked=True, ax=ax, colormap="viridis")
    ax.set_title(f"Taxi Trips in {config.borough}, {config.start_date}–{config.end_date}")
    ax.set_xlabel("Hour of Day")
    ax.set_ylabel("Trips")
    ax.legend(title="Day of Week")
    plt.tight_layout()
    plt.savefig(file_path)
    plt.close(fig)
```
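The asset leans on `constants.REQUEST_DESTINATION_TEMPLATE_FILE_PATH` from the course scaffold. If your project doesn’t define it yet, it’s just a template string that maps a request name to an output image; a plausible version (the path here is an assumption, adjust to your layout):
```python
# dagster_essentials/assets/constants.py
REQUEST_DESTINATION_TEMPLATE_FILE_PATH = "data/outputs/{}.png"
```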
🛠 Step 3: Create a Job for the Asset
In `jobs.py`:
```python
adhoc_request = dg.AssetSelection.assets(["adhoc_request"])

adhoc_request_job = dg.define_asset_job(
    name="adhoc_request_job",
    selection=adhoc_request,
)
```
Exclude It from Other Jobs
Update `trip_update_job` to not include this:
```python
trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    partitions_def=monthly_partition,
    selection=dg.AssetSelection.all() - trips_by_week - adhoc_request,
)
```
This keeps ad hoc reports separate from scheduled bulk updates.
👁 Step 4: Sensors Need Memory → Enter Cursors
Sensors don’t just observe — they remember what they saw before.
That’s what a cursor is for: it tracks state between ticks. In our case:
- The cursor holds a map of filenames and their last modified times
- If a file is new or changed, we run the job
- After that, we update the cursor
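Concretely, a cursor is just a string that Dagster persists between ticks; serializing a dict to JSON is a common way to use it. A tiny illustration (values made up):
```python
import json

# What our cursor string might look like after one tick:
# a map of filename -> last modified time.
stored = '{"january-staten-island.json": 1714406400.0}'

previous = json.loads(stored)                 # parse last tick's snapshot
previous["new-request.json"] = 1714500000.0   # record what we see this tick
cursor = json.dumps(previous)                 # hand the new snapshot back to Dagster
```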
🔁 Step 5: Build the Sensor
In `sensors.py`:
```python
import json
import os

import dagster as dg

from dagster_essentials.jobs import adhoc_request_job


@dg.sensor(job=adhoc_request_job)
def adhoc_request_sensor(context: dg.SensorEvaluationContext):
    PATH = os.path.join(os.path.dirname(__file__), "../../", "data/requests")

    previous = json.loads(context.cursor) if context.cursor else {}
    current = {}
    runs = []

    for fname in os.listdir(PATH):
        full_path = os.path.join(PATH, fname)
        if fname.endswith(".json") and os.path.isfile(full_path):
            mtime = os.path.getmtime(full_path)
            current[fname] = mtime

            # Check if new or changed
            if fname not in previous or previous[fname] != mtime:
                with open(full_path, "r") as f:
                    config = json.load(f)

                runs.append(
                    dg.RunRequest(
                        run_key=f"adhoc_{fname}_{mtime}",
                        run_config={
                            "ops": {
                                "adhoc_request": {
                                    "config": {
                                        "filename": fname,
                                        **config,
                                    }
                                }
                            }
                        },
                    )
                )

    return dg.SensorResult(run_requests=runs, cursor=json.dumps(current))
```
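One optional refinement (verify against your Dagster version): `SensorResult` can also carry a `skip_reason`, so idle ticks explain themselves in the UI instead of showing nothing. A sketch of how the final return could look:
```python
# Inside adhoc_request_sensor, in place of the single return above:
if not runs:
    # Still advance the cursor so stale entries don't linger.
    return dg.SensorResult(
        skip_reason="no new or modified request files",
        cursor=json.dumps(current),
    )
return dg.SensorResult(run_requests=runs, cursor=json.dumps(current))
```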
🧠 Recap Concepts
✅ Why Sensors Rock
- **They’re event-driven.** No wasted runs. They respond to actual changes.
- **They’re Python.** Full logic. Not just “if modified,” but “if file is new, valid, and from Brooklyn…”
- **They’re stateful.** Thanks to cursors, you don’t rerun old stuff.
🧠 Concept Checkpoints
What are sensors good for?
- Watching a folder
- Listening to a table update
- Triggering model retrains after uploads
- Scheduling jobs from Slack messages or APIs
What’s a sensor cursor?
A dictionary or simple value that stores what the sensor saw last time — so it only reacts to new things.
What is a `RunRequest`?
A packaged instruction to Dagster:
“Run this job, with this config, for this reason.”
👉 You now have a fully working sensor that reacts to ad hoc requests dropped as JSON files and materializes visual reports using DuckDB and Matplotlib.
🔗 Hooking the Sensor into Dagster
You’ve built the sensor, the asset, and the job. Nice. But for Dagster to actually see and use them, we need to plug them into the `Definitions` object — the central configuration Dagster reads when you run your project.
Here’s how to complete the wiring.
🔧 Step 1: Update Imports in `definitions.py`
At the top, make sure your file includes:
```python
from dagster_essentials.assets import trips, metrics, requests
from dagster_essentials.jobs import trip_update_job, weekly_update_job, adhoc_request_job
from dagster_essentials.schedules import trip_update_schedule, weekly_update_schedule
from dagster_essentials.sensors import adhoc_request_sensor
```
This brings in everything Dagster needs to register — including your new reactive logic.
🧱 Step 2: Register the New Assets
Add this to your asset loading section:
```python
request_assets = dg.load_assets_from_modules([requests])
```
Then include them in the `Definitions` call:
```python
assets=[*trip_assets, *metric_assets, *request_assets],
```
⚙️ Step 3: Register Jobs and Sensors
Extend your jobs list:
```python
all_jobs = [trip_update_job, weekly_update_job, adhoc_request_job]
```
And register your sensor:
```python
all_sensors = [adhoc_request_sensor]
```
Finally, include the sensors in your Definitions block:
```python
sensors=all_sensors,
```
✅ Your Final `definitions.py` Should Now Look Like:
```python
import dagster as dg

from dagster_essentials.assets import trips, metrics, requests
from dagster_essentials.resources import database_resource
from dagster_essentials.jobs import trip_update_job, weekly_update_job, adhoc_request_job
from dagster_essentials.schedules import trip_update_schedule, weekly_update_schedule
from dagster_essentials.sensors import adhoc_request_sensor

trip_assets = dg.load_assets_from_modules([trips])
metric_assets = dg.load_assets_from_modules([metrics])
request_assets = dg.load_assets_from_modules([requests])

all_jobs = [trip_update_job, weekly_update_job, adhoc_request_job]
all_schedules = [trip_update_schedule, weekly_update_schedule]
all_sensors = [adhoc_request_sensor]

defs = dg.Definitions(
    assets=[*trip_assets, *metric_assets, *request_assets],
    resources={
        "database": database_resource,
    },
    jobs=all_jobs,
    schedules=all_schedules,
    sensors=all_sensors,
)
```
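Optionally, you can smoke-test the wiring with a couple of assertions (the module path `dagster_essentials.definitions` is an assumption based on this series’ layout):
```python
from dagster_essentials.definitions import defs


def test_definitions_load():
    # Each lookup raises if the name was never registered in Definitions.
    assert defs.get_job_def("adhoc_request_job")
    assert defs.get_sensor_def("adhoc_request_sensor")
```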
🖥 Sensors in the Dagster UI
You’ve registered everything. Let’s check out what Dagster shows us.
🗺 Step 1 – Visualize the Asset
Head to Assets > Global Asset Lineage.
You should see the new `adhoc_request` asset in the graph. If not, hit Reload definitions — this refreshes your project’s metadata.
🔔 Step 2 – Find the Sensor
Navigate to Overview > Sensors.
Here you’ll see `adhoc_request_sensor` listed.
By default, it’s off — Dagster doesn’t auto-run new sensors until you say so.
🔍 Step 3 – Inspect the Sensor
Click the sensor name to open its detail page.
This page gives you everything:
- The job(s) it will trigger
- Past ticks (evaluations) and results
- Any runs the sensor initiated
This is your mission control for event-driven logic.
✅ Enabling the Sensor
Dagster sensors are paused when created. Let’s activate it properly.
📂 Step 1 – Create a Request File
Navigate to `data/requests/`. Use the README as a reference, and create a new file:
📄 january-staten-island.json
```json
{
  "start_date": "2023-01-10",
  "end_date": "2023-01-25",
  "borough": "Staten Island"
}
```
This is the exact format your sensor and asset are expecting.
🧪 Step 2 – Test the Sensor
Instead of turning it on right away, use the Test Sensor button at the top right of the sensor detail page.
This runs a single tick — like poking the sensor to see what it would do.
Click Evaluate, and give it a second.
🚀 Step 3 – Launch the Run
Once the test completes, you’ll see the result of the tick.
Click Open in Launchpad — this drops you into the run screen for the job, pre-filled with the config from your `.json` file.
Just click Launch Run.
Boom — ad hoc report generated.
📁 Step 4 – View the Output
After the run finishes, go to `data/outputs/`. You’ll see a `.png` file created — the visual report from your ad hoc config.
It charts hourly taxi trip volume across weekdays for the selected borough and date range.
🧠 Core Concepts You Should Own Now
Config and sensors are more than just add-ons — they’re core to making your pipelines intelligent and context-aware.
Let’s distill what we really unlocked here:
🔄 Config is Runtime-Driven Logic
Dagster’s `Config` lets you define reusable, structured parameters that assets can use when they’re run. This means:
- No hardcoding logic into asset functions
- Easy reuse for different stakeholders or time windows
- You can test the same asset with different inputs without touching the code
In our case, we passed values like borough and date range via config to generate dynamic visual reports — a pattern that generalizes to all sorts of parameterized analytics tasks.
🧠 Sensors Are Always On — But Always Smart
Sensors don’t blindly run things. They:
- Observe specific events (like a file drop or DB update)
- Compare to a cursor (what they’ve already seen)
- Trigger new work only when it matters
This level of nuance allows your pipelines to respond to the world in real time, rather than just ticking along like a fixed clock.
🧠 Cursors Make Sensors Stateful
Your sensor doesn’t spam runs because it remembers. That’s what the cursor is:
- A snapshot of the last known state
- Stored automatically by Dagster
- Used to figure out “what’s new” the next time it ticks
Without a cursor, you’d have to write way more manual logic (or worse — duplicate runs).
🧰 UI Tools That Accelerate Feedback
We also explored some killer Dagster UI features:
- Test Sensor lets you simulate ticks before enabling anything
- Launchpad pre-fills config for a one-click test run
- Asset Graph and Sensor Tabs give real-time status and traceability
These aren’t just UI candy — they’re a practical way to build confidence fast and debug smarter.
✅ Wrap Up
You now have a fully functional, event-triggered pipeline that reacts to requests, reads structured config, and materializes custom reports — all using Dagster’s sensor + config + asset system.
More importantly, you’ve seen how Dagster shifts from scheduled orchestration to responsive automation.
That means you’re not just building pipelines — you’re building event-aware systems that scale with your data and users.