Modern data pipelines with modern memory
Data flows, knowledge stays
The Problem
Prefect is what modern data orchestration should be. Clean Python, no DAGs required, cloud-native from the ground up. Your flows are observable, your deployments are flexible, your data pipelines finally feel like real software.
But each flow run exists in isolation.
You've built the perfect data pipeline. Extract from S3, transform with pandas, load to Snowflake. It runs beautifully. When the same data quality issue you fixed last month appears again, your flow has no idea. When a data source starts returning slightly different schemas—something you've handled before—you're back to debugging from scratch.
Prefect's observability is exceptional. You can see every task, every log, every metric. But visibility into the past isn't memory of the past. Your ETL flow can't query "what anomalies have we seen in this data source?" before it starts processing. It can't know that this particular vendor's files always have encoding issues on the first of the month.
The data world is full of patterns that repeat across runs. Seasonal spikes. Vendor quirks. Schema drift. Upstream delays. Your pipelines encounter these patterns, handle them (sometimes painfully), then forget. The institutional knowledge lives in your team's heads, not in your automation.
Data pipelines need data memory.
How Stompy Helps
Stompy transforms Prefect flows from stateless pipelines into learning systems.
Before processing begins, query for relevant context. What patterns have emerged from this data source? What anomalies required intervention? What optimizations were discovered? Your flow starts with historical intelligence.
As your flow runs, capture insights. Not just logs—searchable, semantic knowledge about what the data looked like, what transformations worked, what unexpected situations arose. This becomes institutional memory that future flows can leverage.
Your data pipelines gain continuous learning:

- **Pre-processing intelligence**: Recall relevant patterns before tasks execute
- **Anomaly pattern recognition**: Detect issues faster because you've seen them before
- **Data drift tracking**: Monitor how your data sources evolve over time (sketched below)
- **Optimization discovery**: Find performance improvements and remember what worked
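To make the drift-tracking idea concrete, here is a minimal sketch of a subflow that compares a run's incoming columns against whatever schema was last recorded in memory. It reuses the `recall_source_context` and `save_run_insights` tasks defined in the walkthrough below; the `schema:` content convention and the `schema-drift` tag are assumptions for this example, not part of Stompy's API.

```python
from prefect import flow

@flow(name="check-schema-drift")
async def check_schema_drift(source_name: str, current_columns: list[str]) -> list[str]:
    """Compare the current column set against the last schema saved in memory."""
    context = await recall_source_context(source_name)

    # Find the most recent insight that recorded a schema
    # (assumed convention: content contains "schema:col1,col2,...").
    previous: set[str] = set()
    for entry in context["known_patterns"]:
        content = entry.get("content", "")
        if "schema:" in content:
            previous = set(content.split("schema:", 1)[1].split(","))
            break

    # Columns that appeared or disappeared since the last recorded schema.
    drifted = sorted(set(current_columns) ^ previous) if previous else []

    # Record the current schema (and any drift) so the next run can compare against it.
    insight = f"schema:{','.join(current_columns)}"
    if drifted:
        insight += f" | drifted columns: {drifted}"
    await save_run_insights(
        source_name=source_name,
        insights=insight,
        tags=["schema-drift"] if drifted else ["schema"],
    )
    return drifted
```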
Prefect makes orchestration easy. Stompy makes it smart.
Integration Walkthrough
Create memory-aware tasks
Build reusable tasks that can read from and write to Stompy's persistent memory.
```python
from prefect import task
from datetime import datetime
import httpx
import os

STOMPY_URL = "https://mcp.stompy.ai/sse"
STOMPY_TOKEN = os.environ['STOMPY_TOKEN']

@task(name="recall-data-source-context")
async def recall_source_context(source_name: str) -> dict:
    """Retrieve historical patterns for a data source before processing."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "context_search",
                "params": {
                    "query": f"data source:{source_name} patterns anomalies",
                    "limit": 5
                }
            }
        )
    return {
        "known_patterns": response.json().get("contexts", []),
        "source": source_name
    }

@task(name="save-run-insights")
async def save_run_insights(source_name: str, insights: str, tags: list[str]):
    """Save discoveries from this run for future flows."""
    async with httpx.AsyncClient() as client:
        await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "lock_context",
                "params": {
                    "topic": f"source_{source_name}_{datetime.now().isoformat()}",
                    "content": insights,
                    "tags": ",".join(["prefect", source_name] + tags)
                }
            }
        )
```
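Because both tasks call an external endpoint over HTTP, it may be worth letting Prefect retry them automatically. Here is a small variation of the recall task using Prefect's standard retry options; the retry count, delay, and timeout values are arbitrary choices, not requirements.

```python
from prefect import task
import httpx

# Same recall task as above, with Prefect's built-in retries added so a
# transient network or endpoint error doesn't fail the whole flow run.
@task(name="recall-data-source-context", retries=3, retry_delay_seconds=10)
async def recall_source_context(source_name: str) -> dict:
    """Retrieve historical patterns, retrying on transient HTTP failures."""
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "context_search",
                "params": {"query": f"data source:{source_name} patterns anomalies", "limit": 5},
            },
        )
        response.raise_for_status()  # surface HTTP errors so Prefect's retries kick in
    return {"known_patterns": response.json().get("contexts", []), "source": source_name}
```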
Build learning data flows
Integrate memory into your ETL pipeline for context-aware processing.
```python
from prefect import flow, get_run_logger

@flow(name="learning-etl-pipeline")
async def etl_pipeline(source_name: str, destination: str):
    logger = get_run_logger()

    # Step 1: Get historical context before processing
    context = await recall_source_context(source_name)
    known_anomalies = [p["content"] for p in context["known_patterns"]]
    if known_anomalies:
        logger.info(f"Known patterns for {source_name}: {len(known_anomalies)} items")

    # Step 2: Extract with pattern awareness
    raw_data = await extract_data(source_name)

    # Step 3: Transform with historical context
    transformed = await transform_data(raw_data, known_patterns=known_anomalies)

    # Step 4: Track any new discoveries
    if transformed.new_patterns:
        await save_run_insights(
            source_name=source_name,
            insights=f"New patterns discovered: {transformed.new_patterns}",
            tags=["pattern", "discovery"]
        )

    # Step 5: Load to destination
    result = await load_data(transformed.data, destination)

    # Step 6: Save run summary
    await save_run_insights(
        source_name=source_name,
        insights=f"Run complete. Rows: {result.row_count}, Duration: {result.duration_ms}ms",
        tags=["run-summary"]
    )

    return result
```
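The flow above assumes `extract_data`, `transform_data`, and `load_data` tasks already exist in your project; their internals are entirely up to you. For completeness, here is a stub sketch of the shape the flow expects. The `TransformResult` container and the attributes read off the load result are illustrative assumptions, not a Stompy or Prefect requirement.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace
from prefect import task

@dataclass
class TransformResult:
    """Illustrative container matching what etl_pipeline reads off the transform step."""
    data: list[dict]
    new_patterns: list[str] = field(default_factory=list)

@task(name="extract-data")
async def extract_data(source_name: str) -> list[dict]:
    # Replace with your real extraction (S3, API, database, ...).
    return [{"id": 1, "value": 42}]

@task(name="transform-data")
async def transform_data(raw_data: list[dict], known_patterns: list[str]) -> TransformResult:
    # Replace with your real transformation; known_patterns carries the recalled context.
    return TransformResult(data=raw_data, new_patterns=[])

@task(name="load-data")
async def load_data(rows: list[dict], destination: str) -> SimpleNamespace:
    # Replace with your real load step (Snowflake, warehouse, ...); the flow reads
    # .row_count and .duration_ms off the result for its run summary.
    return SimpleNamespace(row_count=len(rows), duration_ms=0)
```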
Build data quality dashboards
Query accumulated insights to understand data source behavior over time.
```python
from prefect import flow
from datetime import datetime
import httpx
import json

@flow(name="weekly-data-quality-report")
async def data_quality_report(sources: list[str]):
    """Generate insights from accumulated pipeline memory."""
    report = {"generated_at": datetime.now().isoformat(), "sources": {}}

    for source in sources:
        # Query all insights for this source
        async with httpx.AsyncClient() as client:
            response = await client.post(
                STOMPY_URL,
                headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
                json={
                    "tool": "context_search",
                    "params": {
                        "query": f"source:{source} last 7 days patterns anomalies",
                        "limit": 50
                    }
                }
            )
        contexts = response.json().get("contexts", [])

        report["sources"][source] = {
            "total_runs": len([c for c in contexts if "run-summary" in c.get("tags", "")]),
            "anomalies_detected": len([c for c in contexts if "anomaly" in c.get("tags", "")]),
            "patterns_discovered": len([c for c in contexts if "pattern" in c.get("tags", "")]),
            "recent_insights": [c["content"][:100] for c in contexts[:5]]
        }

    # Save the report itself as context
    await save_run_insights(
        source_name="quality_report",
        insights=json.dumps(report, indent=2),
        tags=["weekly-report", "data-quality"]
    )

    return report
```
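To run the report on a schedule, you can serve the flow with a cron expression like any other Prefect flow. A minimal sketch follows; the source names and schedule are placeholders, and `.serve()` assumes a recent Prefect release that supports it.

```python
if __name__ == "__main__":
    # Serve the report flow on a weekly schedule (every Monday at 06:00).
    data_quality_report.serve(
        name="weekly-data-quality-report",
        cron="0 6 * * 1",
        parameters={"sources": ["vendor_feed", "events_s3"]},
    )
```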
What You Get
- Flows that understand data source history before processing begins
- Anomaly detection that gets smarter with every run
- Data quality insights that accumulate across pipeline executions
- Pythonic integration that feels native to Prefect's task model
- Version history that tracks how your data evolves over time
Ready to give Prefect a memory?
Join the waitlist and be the first to know when Stompy is ready. Your Prefect projects will never forget again.