🌊
Prefect+Stompy

Modern data pipelines with modern memory

Data flows, knowledge stays

The Problem

Prefect is what modern data orchestration should be. Clean Python, no DAGs required, cloud-native from the ground up. Your flows are observable, your deployments are flexible, your data pipelines finally feel like real software.

But each flow run exists in isolation.

You've built the perfect data pipeline: extract from S3, transform with pandas, load to Snowflake. It runs beautifully. But when the same data quality issue you fixed last month appears again, your flow has no idea. When a data source starts returning a slightly different schema, something you've handled before, you're back to debugging from scratch.

Prefect's observability is exceptional. You can see every task, every log, every metric. But visibility into the past isn't memory of the past. Your ETL flow can't query "what anomalies have we seen in this data source?" before it starts processing. It can't know that this particular vendor's files always have encoding issues on the first of the month.

The data world is full of patterns that repeat across runs. Seasonal spikes. Vendor quirks. Schema drift. Upstream delays. Your pipelines encounter these patterns, handle them (sometimes painfully), then forget. The institutional knowledge lives in your team's heads, not in your automation.

Data pipelines need data memory.

How Stompy Helps

Stompy transforms Prefect flows from stateless pipelines into learning systems.

Before processing begins, query for relevant context. What patterns have emerged from this data source? What anomalies required intervention? What optimizations were discovered? Your flow starts with historical intelligence.

As your flow runs, capture insights. Not just logs—searchable, semantic knowledge about what the data looked like, what transformations worked, what unexpected situations arose. This becomes institutional memory that future flows can leverage.

Your data pipelines gain continuous learning:

- **Pre-processing intelligence**: Recall relevant patterns before tasks execute
- **Anomaly pattern recognition**: Detect issues faster because you've seen them before
- **Data drift tracking**: Monitor how your data sources evolve over time
- **Optimization discovery**: Find performance improvements and remember what worked
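
To make "data drift tracking" concrete, here is a minimal sketch that compares the columns seen in the current run against a schema remembered from an earlier one. It is plain Python and makes no Stompy calls; `detect_schema_drift` and both example schemas are hypothetical names invented for illustration, not part of Prefect or Stompy.

```python
def detect_schema_drift(
    remembered: dict[str, str], current: dict[str, str]
) -> dict[str, list[str]]:
    """Compare two column-name -> dtype mappings and describe what changed."""
    added = sorted(set(current) - set(remembered))
    removed = sorted(set(remembered) - set(current))
    retyped = sorted(
        col
        for col in set(remembered) & set(current)
        if remembered[col] != current[col]
    )
    return {"added": added, "removed": removed, "retyped": retyped}


# Hypothetical schemas: one recalled from memory, one observed in this run.
remembered_schema = {"order_id": "int64", "amount": "float64", "region": "object"}
current_schema = {"order_id": "int64", "amount": "object", "currency": "object"}

drift = detect_schema_drift(remembered_schema, current_schema)
```

A flow could store the resulting summary as an insight, so the next run knows which changes are already known and which are new.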

Prefect makes orchestration easy. Stompy makes it smart.

Integration Walkthrough

1. Create memory-aware tasks

Build reusable tasks that can read from and write to Stompy's persistent memory.

import os
from datetime import datetime

import httpx
from prefect import flow, task

STOMPY_URL = "https://mcp.stompy.ai/sse"
STOMPY_TOKEN = os.environ["STOMPY_TOKEN"]  # raises KeyError early if the token is unset


@task(name="recall-data-source-context")
async def recall_source_context(source_name: str) -> dict:
    """Retrieve historical patterns for a data source before processing."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "context_search",
                "params": {
                    "query": f"data source:{source_name} patterns anomalies",
                    "limit": 5,
                },
            },
        )
        response.raise_for_status()  # surface HTTP errors instead of parsing an error body
    return {
        "known_patterns": response.json().get("contexts", []),
        "source": source_name,
    }


@task(name="save-run-insights")
async def save_run_insights(source_name: str, insights: str, tags: list[str]):
    """Save discoveries from this run for future flows."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            STOMPY_URL,
            headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
            json={
                "tool": "lock_context",
                "params": {
                    # A timestamped topic keeps each saved insight distinct
                    "topic": f"source_{source_name}_{datetime.now().isoformat()}",
                    "content": insights,
                    "tags": ",".join(["prefect", source_name] + tags),
                },
            },
        )
        response.raise_for_status()
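
Both tasks build a JSON payload inline, which makes the payload hard to check without a live Stompy endpoint. One option is to move payload construction into a pure function. The sketch below copies the `lock_context` payload shape from the task above; `build_lock_context_payload` and its `now` parameter are hypothetical additions, and whether Stompy accepts this shape is an assumption carried over from the walkthrough.

```python
from datetime import datetime, timezone
from typing import Optional


def build_lock_context_payload(
    source_name: str,
    insights: str,
    tags: list[str],
    now: Optional[datetime] = None,
) -> dict:
    """Build the request body for saving an insight (shape assumed from the task above)."""
    # Accepting `now` as a parameter lets tests pin the timestamp.
    timestamp = now or datetime.now(timezone.utc)
    return {
        "tool": "lock_context",
        "params": {
            "topic": f"source_{source_name}_{timestamp.isoformat()}",
            "content": insights,
            "tags": ",".join(["prefect", source_name] + tags),
        },
    }


# Example with a fixed timestamp so the topic is predictable.
payload = build_lock_context_payload(
    "orders",
    "Encoding issues on first-of-month files",
    ["anomaly"],
    now=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
```

The task body can then pass `json=build_lock_context_payload(...)` to `client.post`, keeping the network call thin and the payload logic unit-testable.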

2. Build learning data flows

Integrate memory into your ETL pipeline for context-aware processing.

from datetime import datetime

from prefect import flow, get_run_logger

# recall_source_context and save_run_insights are the tasks from step 1.
# extract_data, transform_data, and load_data are your own tasks; they are
# not defined in this walkthrough.


@flow(name="learning-etl-pipeline")
async def etl_pipeline(source_name: str, destination: str):
    logger = get_run_logger()

    # Step 1: Get historical context before processing
    context = await recall_source_context(source_name)
    known_anomalies = [p["content"] for p in context["known_patterns"]]
    if known_anomalies:
        logger.info(f"Known patterns for {source_name}: {len(known_anomalies)} items")

    # Step 2: Extract with pattern awareness
    raw_data = await extract_data(source_name)

    # Step 3: Transform with historical context
    transformed = await transform_data(raw_data, known_patterns=known_anomalies)

    # Step 4: Track any new discoveries
    if transformed.new_patterns:
        await save_run_insights(
            source_name=source_name,
            insights=f"New patterns discovered: {transformed.new_patterns}",
            tags=["pattern", "discovery"],
        )

    # Step 5: Load to destination
    result = await load_data(transformed.data, destination)

    # Step 6: Save run summary
    await save_run_insights(
        source_name=source_name,
        insights=f"Run complete. Rows: {result.row_count}, Duration: {result.duration_ms}ms",
        tags=["run-summary"],
    )
    return result
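
The flow above reads `transformed.data`, `transformed.new_patterns`, `result.row_count`, and `result.duration_ms`, but the walkthrough does not define the objects that carry them. A minimal sketch of what they might look like follows. `TransformResult`, `LoadResult`, and `transform_rows` are hypothetical names, and the "missing amount" rule is only a stand-in for real transformation logic.

```python
from dataclasses import dataclass, field


@dataclass
class TransformResult:
    """Output of the transform step: cleaned rows plus anything newly observed."""
    data: list[dict]
    new_patterns: list[str] = field(default_factory=list)


@dataclass
class LoadResult:
    """Output of the load step, matching the fields the flow logs."""
    row_count: int
    duration_ms: int


def transform_rows(rows: list[dict], known_patterns: list[str]) -> TransformResult:
    """Drop rows with a missing amount; report the issue only if it is not already known."""
    clean = [row for row in rows if row.get("amount") is not None]
    new_patterns: list[str] = []
    if len(clean) < len(rows):
        pattern = "missing amount values"
        # Only flag the pattern when no remembered insight mentions it.
        if not any(pattern in known for known in known_patterns):
            new_patterns.append(pattern)
    return TransformResult(data=clean, new_patterns=new_patterns)


rows = [{"amount": 10}, {"amount": None}]
first_run = transform_rows(rows, known_patterns=[])
later_run = transform_rows(rows, known_patterns=["seen before: missing amount values"])
```

In a real flow, `transform_rows` would sit inside a Prefect `@task`; it is kept as a plain function here so the behavior is easy to test.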

3. Build data quality dashboards

Query accumulated insights to understand data source behavior over time.

import json
from datetime import datetime

import httpx
from prefect import flow

# STOMPY_URL, STOMPY_TOKEN, and save_run_insights come from step 1.


@flow(name="weekly-data-quality-report")
async def data_quality_report(sources: list[str]):
    """Generate insights from accumulated pipeline memory."""
    report = {"generated_at": datetime.now().isoformat(), "sources": {}}

    async with httpx.AsyncClient() as client:  # one client reused for every source
        for source in sources:
            # Query all insights for this source
            response = await client.post(
                STOMPY_URL,
                headers={"Authorization": f"Bearer {STOMPY_TOKEN}"},
                json={
                    "tool": "context_search",
                    "params": {
                        "query": f"source:{source} last 7 days patterns anomalies",
                        "limit": 50,
                    },
                },
            )
            response.raise_for_status()
            contexts = response.json().get("contexts", [])

            # Tags are stored as a comma-separated string, so these are substring checks
            report["sources"][source] = {
                "total_runs": len([c for c in contexts if "run-summary" in c.get("tags", "")]),
                "anomalies_detected": len([c for c in contexts if "anomaly" in c.get("tags", "")]),
                "patterns_discovered": len([c for c in contexts if "pattern" in c.get("tags", "")]),
                "recent_insights": [c["content"][:100] for c in contexts[:5]],
            }

    # Save the report itself as context
    await save_run_insights(
        source_name="quality_report",
        insights=json.dumps(report, indent=2),
        tags=["weekly-report", "data-quality"],
    )
    return report
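
The report counts tags with substring checks such as `"pattern" in c.get("tags", "")`, which would also match a tag like `pattern-review`. If exact matches matter, one option is to split the tag string first. The sketch below assumes tags come back as the same comma-separated string that `save_run_insights` writes; that is an assumption about Stompy's response format, and `count_tags` is a hypothetical helper.

```python
from collections import Counter


def count_tags(contexts: list[dict]) -> Counter:
    """Count exact tag occurrences across contexts with comma-separated tag strings."""
    counts: Counter = Counter()
    for ctx in contexts:
        raw = ctx.get("tags", "")
        # A set ensures a tag repeated within one context is counted once.
        tags = {tag.strip() for tag in raw.split(",") if tag.strip()}
        counts.update(tags)
    return counts


# Hypothetical search results for one source.
sample_contexts = [
    {"tags": "prefect,orders,run-summary"},
    {"tags": "prefect,orders,pattern,discovery"},
    {"tags": "prefect,orders,pattern-review"},
]

counts = count_tags(sample_contexts)
```

With this helper, `counts["pattern"]` reflects only contexts tagged exactly `pattern`, and a tag that never appears simply counts as zero.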

What You Get

  • Flows that understand data source history before processing begins
  • Anomaly detection that gets smarter with every run
  • Data quality insights that accumulate across pipeline executions
  • Pythonic integration that feels native to Prefect's task model
  • Version history that tracks how your data evolves over time

Ready to give Prefect a memory?

Join the waitlist and be the first to know when Stompy is ready. Your Prefect projects will never forget again.