Dagster+Stompy

Data assets with persistent metadata

Assets materialize, knowledge persists

The Problem

Dagster reimagined data engineering. Instead of thinking in tasks and DAGs, you think in assets—the actual data products your organization needs. Materialization, lineage, freshness, partitions—all first-class concepts. Your data pipelines are finally designed around what matters: the data itself.

But asset metadata isn't asset memory.

Dagster knows everything about your asset graph. When each asset was last materialized. What upstream assets it depends on. Whether it's fresh. But it doesn't know what actually happened during those materializations. It can't tell you that the "daily_revenue" asset always has quality issues on the first business day of each month. It doesn't remember that "customer_segments" required manual intervention last quarter due to a schema change.

You've built a beautiful asset graph. Revenue flows through transformations, joins with customer data, materializes into dashboards. Dagster orchestrates perfectly. But when a data quality issue appears, you're searching Slack and memory for "didn't we see this before?" Your assets have lineage—they don't have history.

The patterns in your data assets repeat. Seasonal variations. Upstream quirks. Schema evolution. Quality regressions. Each materialization encounters these patterns, handles them, then the knowledge evaporates. Asset metadata tracks the "what" and "when." Nothing tracks the "what we learned."

Asset-based orchestration needs asset-based memory.

How Stompy Helps

Stompy adds a memory layer to Dagster's asset-centric worldview.

Every asset gains historical context. Before materialization begins, query for relevant insights about this specific asset. What patterns have emerged? What quality issues recur? What optimizations were discovered? The asset materializes with institutional knowledge.

After materialization, capture what happened—not just logs, but semantic insights about the data. Row counts, data shapes, quality metrics, anomalies, discoveries. This builds a searchable corpus of asset history that Dagster's metadata system wasn't designed to provide.

Your assets become truly intelligent:

  • Pre-materialization context: Know an asset's history before rebuilding it
  • Quality pattern tracking: Remember what quality issues affect which assets
  • Cross-asset learning: Insights from one asset inform related assets
  • Materialization archaeology: Understand how your assets have evolved over time

Dagster manages the asset graph. Stompy remembers what the assets have learned.

Integration Walkthrough

1

Create memory-aware asset helpers

Build utilities that connect your assets to Stompy's persistent memory.

from dagster import AssetExecutionContext
import httpx
import os
from typing import Any

STOMPY_URL = "https://mcp.stompy.ai/sse"
STOMPY_TOKEN = os.environ["STOMPY_TOKEN"]


async def get_asset_history(asset_key: str, context: AssetExecutionContext) -> dict:
    """Retrieve historical context for an asset before materialization."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "context_search",
                "params": {
                    "query": f"asset:{asset_key} materialization patterns quality",
                    "limit": 10,
                },
            },
        )
        response.raise_for_status()
        history = response.json().get("contexts", [])
    context.log.info(f"Retrieved {len(history)} historical insights for {asset_key}")
    # Tags are stored as a comma-separated string, so substring checks suffice here.
    return {
        "known_patterns": history,
        "quality_issues": [h for h in history if "quality" in h.get("tags", "")],
        "optimizations": [h for h in history if "optimization" in h.get("tags", "")],
    }


async def save_asset_insights(
    asset_key: str,
    context: AssetExecutionContext,
    insights: dict[str, Any],
    tags: list[str],
) -> None:
    """Save materialization insights for future runs."""
    content = f"""Asset: {asset_key}
Run ID: {context.run_id}
Partition: {context.partition_key if context.has_partition_key else 'N/A'}
Insights: {insights}"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "lock_context",
                "params": {
                    "topic": f"asset_{asset_key}_{context.run_id}",
                    "content": content,
                    "tags": ",".join(["dagster", asset_key] + tags),
                },
            },
        )
        response.raise_for_status()
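
These helpers assume Stompy's MCP endpoint accepts tool calls as plain JSON POSTs and returns matches under a "contexts" key; the exact wire format may differ once the API ships. As a rough illustration, the dict returned by get_asset_history might look like this (hypothetical values):

# Hypothetical return value of get_asset_history("daily_revenue", context).
# Field names mirror the assumptions baked into the helpers above.
{
    "known_patterns": [
        {
            "topic": "asset_daily_revenue_run123",
            "content": "Asset: daily_revenue\nRun ID: run123\nInsights: {'row_count': 31}",
            "tags": "dagster,daily_revenue,materialization,quality",
        }
    ],
    "quality_issues": [
        # subset of known_patterns whose tags contain "quality"
    ],
    "optimizations": [],  # subset whose tags contain "optimization"
}
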
2

Build learning asset definitions

Integrate memory into your asset materialization for context-aware processing.

from dagster import asset, AssetExecutionContext, Output, MetadataValue
import pandas as pd


@asset(
    description="Daily revenue with historical pattern awareness",
    metadata={"memory_enabled": True},
)
async def daily_revenue(
    context: AssetExecutionContext, raw_transactions: pd.DataFrame
) -> Output[pd.DataFrame]:
    # Step 1: Get asset history before processing
    history = await get_asset_history("daily_revenue", context)

    # Step 2: Check for known quality issues
    if history["quality_issues"]:
        context.log.info(f"Known quality issues: {len(history['quality_issues'])}")
        # Apply extra validation based on historical issues
        # (a hedged sketch of one such check follows this block)

    # Step 3: Process with awareness of historical patterns
    revenue = raw_transactions.groupby("date").agg({
        "amount": "sum",
        "transaction_id": "count",
    }).rename(columns={"amount": "revenue", "transaction_id": "transaction_count"})

    # Step 4: Summarize this materialization for the memory layer
    insights = {
        "row_count": len(revenue),
        "total_revenue": float(revenue["revenue"].sum()),
        "date_range": f"{revenue.index.min()} to {revenue.index.max()}",
    }

    # Step 5: Save insights for future materializations
    await save_asset_insights(
        asset_key="daily_revenue",
        context=context,
        insights=insights,
        tags=["materialization", "revenue"],
    )

    return Output(
        revenue,
        metadata={
            "row_count": MetadataValue.int(len(revenue)),
            "memory_insights_saved": MetadataValue.bool(True),
        },
    )
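
The placeholder at step 2 of the asset body ("apply extra validation") is where the historical context pays off. One possibility, shown here as a minimal sketch: compare the current run's row count against counts recorded in earlier insights. The parsing logic assumes the text format used by save_asset_insights above and is illustrative, not a Stompy API:

import re
import pandas as pd
from dagster import AssetExecutionContext


def validate_against_history(
    revenue: pd.DataFrame, history: dict, context: AssetExecutionContext
) -> None:
    """Hypothetical check: warn when row count falls far below the historical norm."""
    past_counts = []
    for item in history["known_patterns"]:
        # Insights were stored as text like "Insights: {'row_count': 31, ...}";
        # a production version would store structured fields instead.
        match = re.search(r"'row_count': (\d+)", item.get("content", ""))
        if match:
            past_counts.append(int(match.group(1)))
    if past_counts:
        median = sorted(past_counts)[len(past_counts) // 2]
        if len(revenue) < 0.5 * median:
            context.log.warning(
                f"Row count {len(revenue)} is under half the historical median ({median})"
            )

A call to validate_against_history(revenue, history, context) would slot in between steps 2 and 3 of daily_revenue.
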
3

Track asset quality over time

Build sensors that monitor asset memory for patterns and regressions.

from dagster import sensor, RunRequest, SensorEvaluationContext
import httpx

# STOMPY_URL and STOMPY_TOKEN come from the step 1 helpers;
# asset_quality_alert_job is sketched just after this block.


@sensor(job=asset_quality_alert_job)
def asset_quality_sensor(context: SensorEvaluationContext):
    """Monitor asset memory for quality regressions."""
    critical_assets = ["daily_revenue", "customer_segments", "product_catalog"]
    for asset_key in critical_assets:
        with httpx.Client() as client:
            response = client.post(
                STOMPY_URL,
                headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
                json={
                    "tool": "context_search",
                    "params": {
                        "query": f"asset:{asset_key} quality issues last 7 days",
                        "tags": "quality,issue",
                        "limit": 10,
                    },
                },
            )
            response.raise_for_status()
            issues = response.json().get("contexts", [])

        if len(issues) >= 3:
            # Pattern detected: multiple recent quality issues for one asset
            yield RunRequest(
                run_key=f"quality_alert_{asset_key}_{context.cursor}",
                run_config={
                    "ops": {
                        "send_alert": {
                            "config": {
                                "asset": asset_key,
                                "issue_count": len(issues),
                                "recent_issues": [i["content"][:100] for i in issues[:3]],
                            }
                        }
                    }
                },
            )
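
The sensor targets asset_quality_alert_job, which this walkthrough doesn't define. A minimal sketch of what it could look like, assuming the alert is simply logged (swap in your Slack or PagerDuty integration); the config schema matches the run_config the sensor yields:

from dagster import Config, OpExecutionContext, job, op


class AlertConfig(Config):
    asset: str
    issue_count: int
    recent_issues: list[str]


@op
def send_alert(context: OpExecutionContext, config: AlertConfig) -> None:
    """Surface the quality pattern; replace logging with a real notification."""
    context.log.warning(
        f"{config.issue_count} recent quality issues on {config.asset}: "
        + "; ".join(config.recent_issues)
    )


@job
def asset_quality_alert_job():
    send_alert()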

What You Get

  • Assets that understand their materialization history before rebuilding
  • Quality pattern detection across the entire asset graph
  • Semantic search for asset-specific insights and learnings
  • Memory that complements Dagster's asset metadata with actual operational knowledge
  • Lineage-aware context that tracks how assets evolved over time

Ready to give Dagster a memory?

Join the waitlist and be the first to know when Stompy is ready. Your Dagster projects will never forget again.