💨
Apache Airflow + Stompy

Battle-tested DAGs with battle-hardened memory

DAGs that remember every run

The Problem

Apache Airflow is the backbone of modern data infrastructure. Battle-tested at Airbnb, then released to power thousands of production environments worldwide. Your DAGs run reliably at scale. Retries, dependencies, SLAs, pools, queues—all the machinery of production orchestration.

But battle-tested doesn't mean battle-smart.

Your ETL DAG has run 5,000 times. It's seen every possible failure mode. Connection timeouts during peak hours. Rate limits from APIs on Mondays. That one S3 bucket that randomly returns 503s at 2 AM. Your DAG has handled all of these—often repeatedly—but it hasn't learned from any of them.

You've added retry logic based on experience. But it's hardcoded experience: "retry 3 times with exponential backoff because we've seen transient failures." The DAG can't distinguish between a failure that will resolve with retries and one that needs different handling. It can't know that this specific vendor's API always fails on the 1st of the month for maintenance.
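That hardcoded experience typically lives in operator arguments like the following (a representative snippet; the exact values are illustrative), and the same policy applies no matter what actually failed:

from datetime import timedelta

# One retry policy for every failure, regardless of cause.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
    "retry_exponential_backoff": True,  # "because we've seen transient failures"
    "max_retry_delay": timedelta(minutes=30),
}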

Airflow's logs are comprehensive. You can grep through thousands of task instances to find patterns. But that's archaeology, not intelligence. Your DAG can't access that pattern knowledge at runtime. It can't say "this looks like the same issue we saw 47 times last quarter."

Production DAGs deserve production intelligence.

How Stompy Helps

Stompy brings institutional memory to Airflow's proven orchestration.

Before tasks execute, pull relevant context. What failure patterns have we seen with this operator? What does historical data suggest about this time of day, this day of the week, this data source? Your DAG runs with the accumulated wisdom of all previous runs.

After tasks complete, capture insights. Not just success/failure—the shape of the data, the duration patterns, the anomalies encountered. This builds a knowledge base that future DAG runs can query.

Your production pipelines become learning systems (the basic shape is sketched after this list):

- **Pre-execution intelligence**: Query historical patterns before sensitive operators run
- **Failure pattern recognition**: Distinguish between retryable and non-retryable issues based on history
- **Operational wisdom**: Know that Tuesday mornings have higher API latency, and adjust timeouts accordingly
- **Cross-DAG learning**: Insights from one pipeline inform related pipelines
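At its core the pattern is two calls wrapped around the work you already do; the walkthrough below builds this into a reusable hook, but the rough shape (function and parameter names here are illustrative) is:

def run_with_memory(stompy, source, do_work):
    """Illustrative shape only: consult memory, do the work, record the outcome."""
    history = stompy.search_context(f"{source} failure patterns")  # before: pull context
    result = do_work(source, history)                              # your existing task logic
    stompy.save_context(f"{source} run", f"outcome: {result}")     # after: capture insight
    return result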

Airflow handles orchestration. Stompy handles institutional knowledge.

Integration Walkthrough

1. Create a reusable Stompy hook

Build a custom Airflow hook for Stompy integration across all your DAGs.

from airflow.hooks.base import BaseHook
import httpx
import os


class StompyHook(BaseHook):
    """Airflow hook for Stompy persistent memory."""

    conn_name_attr = 'stompy_conn_id'
    default_conn_name = 'stompy_default'
    conn_type = 'http'
    hook_name = 'Stompy Memory'

    def __init__(self, stompy_conn_id: str = default_conn_name):
        super().__init__()
        self.stompy_conn_id = stompy_conn_id
        self.base_url = "https://mcp.stompy.ai/sse"
        self.token = os.environ.get('STOMPY_TOKEN')

    def search_context(self, query: str, limit: int = 5) -> list[dict]:
        """Search for relevant historical context."""
        # Synchronous client: Airflow task callables run synchronously,
        # so the hook exposes blocking methods rather than coroutines.
        with httpx.Client() as client:
            response = client.post(
                self.base_url,
                headers={"Authorization": f"Bearer {self.token}"},
                json={
                    "tool": "context_search",
                    "params": {"query": query, "limit": limit},
                },
            )
            response.raise_for_status()
            return response.json().get("contexts", [])

    def save_context(self, topic: str, content: str, tags: list[str] | None = None):
        """Save insights for future DAG runs."""
        with httpx.Client() as client:
            client.post(
                self.base_url,
                headers={"Authorization": f"Bearer {self.token}"},
                json={
                    "tool": "lock_context",
                    "params": {
                        "topic": topic,
                        "content": content,
                        "tags": ",".join(tags or ["airflow"]),
                    },
                },
            )
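With the module saved as stompy_hook.py somewhere Airflow can import it (your dags/ folder works) and STOMPY_TOKEN set in the worker environment, any task can use it directly. A minimal sketch; the query string is illustrative:

from airflow.decorators import task
from stompy_hook import StompyHook


@task
def warm_up_check() -> list[dict]:
    """Pull the most relevant historical notes before the pipeline does real work."""
    hook = StompyHook()
    contexts = hook.search_context("vendor_api nightly load", limit=5)
    for ctx in contexts:
        print(ctx.get("content", "")[:120])
    return contexts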
2. Build memory-aware DAGs

Integrate historical context into your DAG execution for smarter orchestration.

from airflow.decorators import dag, task
from datetime import datetime, timedelta


@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
)
def memory_aware_etl():
    """ETL DAG with historical pattern awareness."""

    @task
    def check_historical_patterns(source: str) -> dict:
        """Query for known issues before processing."""
        from stompy_hook import StompyHook

        hook = StompyHook()
        patterns = hook.search_context(
            query=f"source:{source} failures patterns issues",
            limit=10,
        )
        return {
            "known_issues": len(patterns),
            "recent_failures": [
                p["content"] for p in patterns if "failure" in p.get("tags", "")
            ],
        }

    @task
    def extract_with_context(source: str, patterns: dict):
        """Extract data with awareness of historical issues."""
        if patterns["known_issues"] > 5:
            # High issue count - use more conservative timeouts
            timeout = timedelta(minutes=10)
        else:
            timeout = timedelta(minutes=5)
        # extract_from_source is your own extraction logic,
        # now driven by pattern-informed configuration.
        data = extract_from_source(source, timeout=timeout)
        return data

    @task
    def save_run_outcome(source: str, success: bool, details: str):
        """Record run outcome for future pattern detection."""
        from stompy_hook import StompyHook

        hook = StompyHook()
        hook.save_context(
            topic=f"etl_{source}_{datetime.now().isoformat()}",
            content=f"Source: {source}\nSuccess: {success}\nDetails: {details}",
            tags=["airflow", "etl", source, "success" if success else "failure"],
        )

    # DAG flow
    patterns = check_historical_patterns("vendor_api")
    data = extract_with_context("vendor_api", patterns)
    outcome = save_run_outcome("vendor_api", True, "Processed successfully")
    data >> outcome  # record the outcome only after extraction finishes


memory_aware_etl()
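The flow above only records a successful outcome. To make sure failures land in memory too, so check_historical_patterns has something to find, one option is an Airflow on_failure_callback that writes to Stompy whenever a task fails. A sketch, reusing the same hook; the topic and tag choices are illustrative:

from datetime import datetime


def record_failure_to_stompy(context):
    """Failure callback: persist what broke so future runs can query it."""
    from stompy_hook import StompyHook

    ti = context["task_instance"]
    hook = StompyHook()
    hook.save_context(
        topic=f"etl_failure_{ti.dag_id}_{ti.task_id}_{datetime.now().isoformat()}",
        content=(
            f"DAG: {ti.dag_id}\nTask: {ti.task_id}\n"
            f"Try: {ti.try_number}\nError: {context.get('exception')}"
        ),
        tags=["airflow", "etl", "failure", ti.dag_id],
    )

# Wire it in via default_args so every task in the DAG reports failures:
# default_args={'retries': 3, 'retry_delay': timedelta(minutes=5),
#               'on_failure_callback': record_failure_to_stompy}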
3. Build operational intelligence dashboards

Create DAGs that analyze accumulated operational knowledge.

from airflow.decorators import dag, task
from datetime import datetime


@dag(schedule='@weekly', start_date=datetime(2024, 1, 1), catchup=False)
def operational_intelligence_report():
    """Weekly analysis of DAG operational patterns."""

    @task
    def analyze_failure_patterns() -> dict:
        """Query accumulated failure patterns across all DAGs."""
        from stompy_hook import StompyHook

        hook = StompyHook()
        # Get all failures from last week
        failures = hook.search_context(
            query="airflow failure last 7 days",
            limit=100,
        )
        # Analyze patterns
        by_source = {}
        by_hour = {}
        by_day = {}
        for f in failures:
            content = f.get("content", "")
            # Extract and aggregate patterns from each saved context...
        return {
            "total_failures": len(failures),
            "by_source": by_source,
            "by_hour": by_hour,
            "by_day": by_day,
            # generate_recommendations is your own summarization helper
            "recommendations": generate_recommendations(by_source, by_hour, by_day),
        }

    @task
    def generate_report(analysis: dict):
        """Generate and save intelligence report."""
        from stompy_hook import StompyHook

        hook = StompyHook()
        report = f"""Weekly Operational Intelligence Report
Generated: {datetime.now().isoformat()}
Total Failures: {analysis['total_failures']}
Top Failure Sources: {analysis['by_source']}
Peak Failure Hours: {analysis['by_hour']}
Recommendations:
{analysis['recommendations']}"""
        hook.save_context(
            topic=f"weekly_ops_report_{datetime.now().strftime('%Y%W')}",
            content=report,
            tags=["airflow", "weekly-report", "operational-intelligence"],
        )
        return report

    analysis = analyze_failure_patterns()
    generate_report(analysis)


operational_intelligence_report()
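Because the report is saved back into Stompy, it becomes queryable context for every other pipeline, which is the cross-DAG learning described above. A small sketch of a task in another DAG reading the most recent report; the topic prefix matches what generate_report saves:

from airflow.decorators import task


@task
def load_latest_ops_report() -> str:
    """Fetch this week's operational report from Stompy for use in another pipeline."""
    from stompy_hook import StompyHook

    hook = StompyHook()
    reports = hook.search_context(query="weekly_ops_report", limit=1)
    return reports[0]["content"] if reports else ""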

What You Get

  • DAGs that understand historical failure patterns before execution
  • Operational wisdom accumulated across thousands of runs
  • Semantic search for relevant historical context in any task
  • Integration that leverages Airflow's mature provider ecosystem
  • Production-grade memory for the production-grade orchestrator

Ready to give Apache Airflow a memory?

Join the waitlist and be the first to know when Stompy is ready. Your Apache Airflow projects will never forget again.