Data assets with persistent memory
Assets materialize, knowledge persists
The Problem
Dagster reimagined data engineering. Instead of thinking in tasks and DAGs, you think in assets—the actual data products your organization needs. Materialization, lineage, freshness, partitions—all first-class concepts. Your data pipelines are finally designed around what matters: the data itself.
But asset metadata isn't asset memory.
Dagster knows everything about your asset graph. When each asset was last materialized. What upstream assets it depends on. Whether it's fresh. But it doesn't know what actually happened during those materializations. It can't tell you that the "daily_revenue" asset always has quality issues on the first business day of each month. It doesn't remember that "customer_segments" required manual intervention last quarter due to a schema change.
You've built a beautiful asset graph. Revenue flows through transformations, joins with customer data, materializes into dashboards. Dagster orchestrates perfectly. But when a data quality issue appears, you're searching Slack and memory for "didn't we see this before?" Your assets have lineage—they don't have history.
The patterns in your data assets repeat. Seasonal variations. Upstream quirks. Schema evolution. Quality regressions. Each materialization encounters these patterns, handles them, then the knowledge evaporates. Asset metadata tracks the "what" and "when." Nothing tracks the "what we learned."
Asset-based orchestration needs asset-based memory.
How Stompy Helps
Stompy adds a memory layer to Dagster's asset-centric worldview.
Every asset gains historical context. Before materialization begins, query for relevant insights about this specific asset. What patterns have emerged? What quality issues recur? What optimizations were discovered? The asset materializes with institutional knowledge.
After materialization, capture what happened—not just logs, but semantic insights about the data. Row counts, data shapes, quality metrics, anomalies, discoveries. This builds a searchable corpus of asset history that Dagster's metadata system wasn't designed to provide.
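To make that concrete, here is a hypothetical shape for one captured insight. The field names are illustrative, not a documented Stompy schema:

```python
# Hypothetical example of one captured insight; field names are
# illustrative, not a documented Stompy schema.
insight = {
    "asset": "daily_revenue",
    "run_id": "a1b2c3",
    "row_count": 1482,
    "quality": {"null_amounts": 3, "duplicate_transactions": 0},
    "anomaly": "Refund spike on the first business day of the month",
}
```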
Your assets become truly intelligent:

- **Pre-materialization context**: Know an asset's history before rebuilding it
- **Quality pattern tracking**: Remember what quality issues affect which assets
- **Cross-asset learning**: Insights from one asset inform related assets
- **Materialization archaeology**: Understand how your assets have evolved over time
Dagster manages the asset graph. Stompy remembers what the assets have learned.
Integration Walkthrough
Create memory-aware asset helpers
Build utilities that connect your assets to Stompy's persistent memory.
```python
import os
from typing import Any

import httpx
from dagster import AssetExecutionContext

STOMPY_URL = "https://mcp.stompy.ai/sse"
STOMPY_TOKEN = os.environ["STOMPY_TOKEN"]


async def get_asset_history(asset_key: str, context: AssetExecutionContext) -> dict:
    """Retrieve historical context for an asset before materialization."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "context_search",
                "params": {
                    "query": f"asset:{asset_key} materialization patterns quality",
                    "limit": 10,
                },
            },
        )

    history = response.json().get("contexts", [])
    context.log.info(f"Retrieved {len(history)} historical insights for {asset_key}")

    return {
        "known_patterns": history,
        "quality_issues": [h for h in history if "quality" in h.get("tags", "")],
        "optimizations": [h for h in history if "optimization" in h.get("tags", "")],
    }


async def save_asset_insights(
    asset_key: str,
    context: AssetExecutionContext,
    insights: dict[str, Any],
    tags: list[str],
):
    """Save materialization insights for future runs."""
    content = f"""Asset: {asset_key}
Run ID: {context.run_id}
Partition: {context.partition_key if context.has_partition_key else 'N/A'}
Insights: {insights}"""

    async with httpx.AsyncClient() as client:
        await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "lock_context",
                "params": {
                    "topic": f"asset_{asset_key}_{context.run_id}",
                    "content": content,
                    "tags": ",".join(["dagster", asset_key] + tags),
                },
            },
        )
```
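These helpers read `STOMPY_TOKEN` from the environment at import time. If you prefer to let Dagster manage the endpoint and token, the same call can hang off a resource instead of module-level globals. A minimal sketch, assuming Dagster's `ConfigurableResource` base class; `StompyResource` and its `search` method are illustrative names, not part of Stompy:

```python
import httpx
from dagster import ConfigurableResource, Definitions, EnvVar


class StompyResource(ConfigurableResource):
    """Illustrative wrapper around the Stompy MCP endpoint."""

    url: str = "https://mcp.stompy.ai/sse"
    token: str

    async def search(self, query: str, limit: int = 10) -> list[dict]:
        # The same context_search call as get_asset_history, behind a resource.
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.url,
                headers={"Authorization": f"Bearer {self.token}"},
                json={"tool": "context_search", "params": {"query": query, "limit": limit}},
            )
        return response.json().get("contexts", [])


# Wire the resource up once; the token is resolved from the environment at runtime.
defs = Definitions(resources={"stompy": StompyResource(token=EnvVar("STOMPY_TOKEN"))})
```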
Build learning asset definitions
Integrate memory into your asset materialization for context-aware processing.
```python
import pandas as pd
from dagster import AssetExecutionContext, MetadataValue, Output, asset


@asset(
    description="Daily revenue with historical pattern awareness",
    metadata={"memory_enabled": True},
)
async def daily_revenue(
    context: AssetExecutionContext, raw_transactions: pd.DataFrame
) -> Output[pd.DataFrame]:
    # Step 1: Get asset history before processing
    history = await get_asset_history("daily_revenue", context)

    # Step 2: Check for known quality issues
    if history["quality_issues"]:
        context.log.info(f"Known quality issues: {len(history['quality_issues'])}")
        # Apply extra validation based on historical issues

    # Step 3: Process with awareness of historical patterns
    revenue = (
        raw_transactions.groupby("date")
        .agg({"amount": "sum", "transaction_id": "count"})
        .rename(columns={"amount": "revenue", "transaction_id": "transaction_count"})
    )

    # Step 4: Summarize this run so future materializations can compare against it
    insights = {
        "row_count": len(revenue),
        "total_revenue": revenue["revenue"].sum(),
        "date_range": f"{revenue.index.min()} to {revenue.index.max()}",
    }

    # Step 5: Save insights for future materializations
    await save_asset_insights(
        asset_key="daily_revenue",
        context=context,
        insights=insights,
        tags=["materialization", "revenue"],
    )

    return Output(
        revenue,
        metadata={
            "row_count": MetadataValue.int(len(revenue)),
            "memory_insights_saved": MetadataValue.bool(True),
        },
    )
```
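The same pattern carries over to partitioned assets: scoping the history query to the partition key lets date-specific patterns, like the month-start quality dips mentioned above, surface for exactly the slice being rebuilt. A sketch, assuming a daily partitioning scheme; the asset name and start date are illustrative:

```python
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
async def daily_revenue_by_day(context: AssetExecutionContext) -> None:
    # Scope the history lookup to this partition so recurring,
    # date-specific patterns surface for the slice being rebuilt.
    history = await get_asset_history(
        f"daily_revenue_by_day {context.partition_key}", context
    )
    context.log.info(
        f"{len(history['known_patterns'])} prior insights for {context.partition_key}"
    )
    # ...materialize the partition as in daily_revenue above...
```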
Track asset quality over time
Build sensors that monitor asset memory for patterns and regressions.
```python
import httpx
from dagster import RunRequest, SensorEvaluationContext, sensor


@sensor(job=asset_quality_alert_job)  # alerting job defined elsewhere; see sketch below
async def asset_quality_sensor(context: SensorEvaluationContext):
    """Monitor asset memory for quality regressions."""
    critical_assets = ["daily_revenue", "customer_segments", "product_catalog"]

    for asset_key in critical_assets:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                STOMPY_URL,
                headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
                json={
                    "tool": "context_search",
                    "params": {
                        "query": f"asset:{asset_key} quality issues last 7 days",
                        "tags": "quality,issue",
                        "limit": 10,
                    },
                },
            )

        issues = response.json().get("contexts", [])

        if len(issues) >= 3:
            # Pattern detected: multiple recent quality issues for this asset
            yield RunRequest(
                run_key=f"quality_alert_{asset_key}_{context.cursor}",
                run_config={
                    "ops": {
                        "send_alert": {
                            "config": {
                                "asset": asset_key,
                                "issue_count": len(issues),
                                "recent_issues": [i["content"][:100] for i in issues[:3]],
                            }
                        }
                    }
                },
            )
```
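The sensor targets an alerting job that isn't shown above; in a real module it would be defined before the `@sensor` decorator references it. A minimal sketch, assuming a single `send_alert` op that only logs a warning (swap in Slack, PagerDuty, or email delivery as needed):

```python
from dagster import Config, OpExecutionContext, job, op


class AlertConfig(Config):
    asset: str
    issue_count: int
    recent_issues: list[str]


@op
def send_alert(context: OpExecutionContext, config: AlertConfig):
    # Placeholder alerting: log a warning describing the detected pattern.
    context.log.warning(
        f"Quality regression on {config.asset}: {config.issue_count} "
        f"recent issues, most recent: {config.recent_issues[:1]}"
    )


@job
def asset_quality_alert_job():
    send_alert()
```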
What You Get
- Assets that understand their materialization history before rebuilding
- Quality pattern detection across the entire asset graph
- Semantic search for asset-specific insights and learnings
- Memory that complements Dagster's asset metadata with actual operational knowledge
- Lineage-aware context that tracks how assets evolved over time
Ready to give Dagster a memory?
Join the waitlist and be the first to know when Stompy is ready. Your Dagster projects will never forget again.