Battle-tested DAGs with battle-hardened memory
DAGs that remember every run
The Problem
Apache Airflow is the backbone of modern data infrastructure. Battle-tested at Airbnb, then released to power thousands of production environments worldwide. Your DAGs run reliably at scale. Retries, dependencies, SLAs, pools, queues—all the machinery of production orchestration.
But battle-tested doesn't mean battle-smart.
Your ETL DAG has run 5,000 times. It's seen every possible failure mode. Connection timeouts during peak hours. Rate limits from APIs on Mondays. That one S3 bucket that randomly returns 503s at 2 AM. Your DAG has handled all of these—often repeatedly—but it hasn't learned from any of them.
You've added retry logic based on experience. But it's hardcoded experience: "retry 3 times with exponential backoff because we've seen transient failures." The DAG can't distinguish between a failure that will resolve with retries and one that needs different handling. It can't know that this specific vendor's API always fails on the 1st of the month for maintenance.
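In Airflow terms, that experience usually gets frozen into operator arguments. A minimal sketch of the familiar pattern (the DAG id, task id, and stub callable here are illustrative):

```python
# The usual hardcoded approach: one retry policy for every failure mode.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract_from_vendor_api():
    ...  # your extraction logic


with DAG(
    dag_id="vendor_api_etl",            # illustrative DAG
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    extract = PythonOperator(
        task_id="extract_vendor_api",
        python_callable=extract_from_vendor_api,
        retries=3,                              # "we've seen transient failures"
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,         # back off, but the same way every time
        max_retry_delay=timedelta(minutes=30),
    )
```

Every failure gets the same treatment, whether it's a transient blip or scheduled vendor maintenance.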
Airflow's logs are comprehensive. You can grep through thousands of task instances to find patterns. But that's archaeology, not intelligence. Your DAG can't access that pattern knowledge at runtime. It can't say "this looks like the same issue we saw 47 times last quarter."
Production DAGs deserve production intelligence.
How Stompy Helps
Stompy brings institutional memory to Airflow's proven orchestration.
Before tasks execute, pull relevant context. What failure patterns have we seen with this operator? What does historical data suggest about this time of day, this day of week, this data source? Your DAG runs with the accumulated wisdom of all previous runs.
After tasks complete, capture insights. Not just success/failure—the shape of the data, the duration patterns, the anomalies encountered. This builds a knowledge base that future DAG runs can query.
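One low-friction way to capture those insights is an Airflow task callback that writes straight to Stompy. A sketch, assuming the StompyHook built in the walkthrough below is importable as `stompy_hook`:

```python
# Sketch: record every task outcome for future runs via Airflow callbacks.
# Assumes the StompyHook from the walkthrough below, importable as stompy_hook.
import asyncio


def record_outcome_to_stompy(context):
    """Task-level callback: persist outcome and duration to Stompy."""
    from stompy_hook import StompyHook

    ti = context["task_instance"]
    outcome = "failure" if context.get("exception") else "success"
    hook = StompyHook()
    asyncio.run(hook.save_context(
        topic=f"{ti.dag_id}.{ti.task_id}.{context['ds']}",
        content=(
            f"State: {ti.state}\n"
            f"Duration: {ti.duration}s\n"
            f"Exception: {context.get('exception')}"
        ),
        tags=["airflow", ti.dag_id, outcome],
    ))


# Attach to every task in a DAG through default_args:
default_args = {
    "on_success_callback": record_outcome_to_stompy,
    "on_failure_callback": record_outcome_to_stompy,
}
```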
Your production pipelines become learning systems:

- **Pre-execution intelligence**: Query historical patterns before sensitive operators run
- **Failure pattern recognition**: Distinguish between retryable and non-retryable issues based on history
- **Operational wisdom**: Know that Tuesday mornings have higher API latency and adjust timeouts accordingly (see the sketch after this list)
- **Cross-DAG learning**: Insights from one pipeline inform related pipelines
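For the operational-wisdom case, the timeout decision can be a small helper that consults history at task time. A sketch, assuming latency notes were previously saved to Stompy and mention the slow window in their content (an illustrative convention, not a Stompy requirement):

```python
# Sketch: choose a task timeout from historical latency notes in Stompy.
# Assumes observations were saved with the day of week and the word "slow"
# somewhere in their content -- purely an illustrative format.
import asyncio
from datetime import timedelta

from stompy_hook import StompyHook  # hook built in the walkthrough below


def timeout_for(source: str, weekday: str, default_minutes: int = 5) -> timedelta:
    """Give known-slow windows extra headroom instead of a one-size-fits-all timeout."""
    hook = StompyHook()
    notes = asyncio.run(hook.search_context(f"source:{source} latency {weekday}", limit=10))
    slow_window = [n for n in notes if "slow" in n.get("content", "").lower()]
    if slow_window:
        return timedelta(minutes=default_minutes * 2)
    return timedelta(minutes=default_minutes)
```

A task can then pass `timeout_for("vendor_api", "tuesday")` as its `execution_timeout` instead of a constant.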
Airflow handles orchestration. Stompy handles institutional knowledge.
Integration Walkthrough
Create a reusable Stompy hook
Build a custom Airflow hook for Stompy integration across all your DAGs.
```python
from airflow.hooks.base import BaseHook
import httpx
import os


class StompyHook(BaseHook):
    """Airflow hook for Stompy persistent memory."""

    conn_name_attr = 'stompy_conn_id'
    default_conn_name = 'stompy_default'
    conn_type = 'http'
    hook_name = 'Stompy Memory'

    def __init__(self, stompy_conn_id: str = default_conn_name):
        super().__init__()
        self.stompy_conn_id = stompy_conn_id
        self.base_url = "https://mcp.stompy.ai/sse"
        self.token = os.environ.get('STOMPY_TOKEN')

    async def search_context(self, query: str, limit: int = 5) -> list[dict]:
        """Search for relevant historical context."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.base_url,
                headers={"Authorization": f"Bearer {self.token}"},
                json={"tool": "context_search", "params": {"query": query, "limit": limit}},
            )
            return response.json().get("contexts", [])

    async def save_context(self, topic: str, content: str, tags: list[str] = None):
        """Save insights for future DAG runs."""
        async with httpx.AsyncClient() as client:
            await client.post(
                self.base_url,
                headers={"Authorization": f"Bearer {self.token}"},
                json={
                    "tool": "lock_context",
                    "params": {
                        "topic": topic,
                        "content": content,
                        "tags": ",".join(tags or ["airflow"]),
                    },
                },
            )
```
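To sanity-check the hook outside of Airflow, a quick ad-hoc script works. This assumes STOMPY_TOKEN is exported and the endpoint returns a `contexts` list with `content` and `tags` fields, as the hook above expects:

```python
# Quick ad-hoc check of the hook outside Airflow.
# Assumes STOMPY_TOKEN is exported and the endpoint responds with {"contexts": [...]}.
import asyncio

from stompy_hook import StompyHook  # the module where the hook above lives


async def main():
    hook = StompyHook()
    contexts = await hook.search_context("vendor_api timeout failures", limit=3)
    for ctx in contexts:
        print(ctx.get("tags"), "->", ctx.get("content", "")[:80])


if __name__ == "__main__":
    asyncio.run(main())
```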
Build memory-aware DAGs
Integrate historical context into your DAG execution for smarter orchestration.
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
)
def memory_aware_etl():
    """ETL DAG with historical pattern awareness."""

    @task
    def check_historical_patterns(source: str) -> dict:
        """Query for known issues before processing."""
        # Airflow runs task callables synchronously, so drive the async hook with asyncio.run().
        import asyncio
        from stompy_hook import StompyHook

        hook = StompyHook()
        patterns = asyncio.run(hook.search_context(
            query=f"source:{source} failures patterns issues",
            limit=10,
        ))
        return {
            "known_issues": len(patterns),
            "recent_failures": [p["content"] for p in patterns if "failure" in p.get("tags", "")],
        }

    @task
    def extract_with_context(source: str, patterns: dict):
        """Extract data with awareness of historical issues."""
        if patterns["known_issues"] > 5:
            # High issue count - use more conservative timeouts
            timeout = timedelta(minutes=10)
        else:
            timeout = timedelta(minutes=5)
        # Extract with pattern-informed configuration
        # (extract_from_source is your own extraction logic)
        data = extract_from_source(source, timeout=timeout)
        return data

    @task
    def save_run_outcome(source: str, success: bool, details: str):
        """Record run outcome for future pattern detection."""
        import asyncio
        from stompy_hook import StompyHook

        hook = StompyHook()
        asyncio.run(hook.save_context(
            topic=f"etl_{source}_{datetime.now().isoformat()}",
            content=f"Source: {source}\nSuccess: {success}\nDetails: {details}",
            tags=["airflow", "etl", source, "success" if success else "failure"],
        ))

    # DAG flow
    patterns = check_historical_patterns("vendor_api")
    data = extract_with_context("vendor_api", patterns)
    save_run_outcome("vendor_api", True, "Processed successfully")


memory_aware_etl()
```
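For a quick local run without a scheduler, Airflow 2.5+ can execute the DAG in-process. The module name here is whatever file the DAG above lives in (illustrative):

```python
# Quick local run without a scheduler (Airflow 2.5+).
# Assumes the DAG above lives in a file importable as memory_aware_etl_dag.py.
from memory_aware_etl_dag import memory_aware_etl

if __name__ == "__main__":
    dag = memory_aware_etl()  # build the DAG object
    dag.test()                # execute one run in-process, task by task
```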
Build operational intelligence dashboards
Create DAGs that analyze accumulated operational knowledge.
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule='@weekly', start_date=datetime(2024, 1, 1), catchup=False)
def operational_intelligence_report():
    """Weekly analysis of DAG operational patterns."""

    @task
    def analyze_failure_patterns() -> dict:
        """Query accumulated failure patterns across all DAGs."""
        import asyncio
        from stompy_hook import StompyHook

        hook = StompyHook()
        # Get all failures from last week
        failures = asyncio.run(hook.search_context(
            query="airflow failure last 7 days",
            limit=100,
        ))

        # Analyze patterns
        by_source = {}
        by_hour = {}
        by_day = {}
        for f in failures:
            content = f.get("content", "")
            # Extract and aggregate patterns...

        return {
            "total_failures": len(failures),
            "by_source": by_source,
            "by_hour": by_hour,
            "by_day": by_day,
            # generate_recommendations: your own aggregation heuristics (see the sketch below)
            "recommendations": generate_recommendations(by_source, by_hour, by_day),
        }

    @task
    def generate_report(analysis: dict):
        """Generate and save intelligence report."""
        import asyncio
        from stompy_hook import StompyHook

        hook = StompyHook()
        report = f"""Weekly Operational Intelligence Report
Generated: {datetime.now().isoformat()}

Total Failures: {analysis['total_failures']}
Top Failure Sources: {analysis['by_source']}
Peak Failure Hours: {analysis['by_hour']}

Recommendations:
{analysis['recommendations']}
"""
        asyncio.run(hook.save_context(
            topic=f"weekly_ops_report_{datetime.now().strftime('%Y%W')}",
            content=report,
            tags=["airflow", "weekly-report", "operational-intelligence"],
        ))
        return report

    analysis = analyze_failure_patterns()
    generate_report(analysis)


operational_intelligence_report()
```
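The analysis task assumes a `generate_recommendations` helper. A minimal sketch that turns the aggregated counts into readable suggestions (the thresholds and wording are illustrative):

```python
# Minimal sketch of the generate_recommendations helper assumed above.
# The heuristics here are illustrative, not prescriptive.
def generate_recommendations(by_source: dict, by_hour: dict, by_day: dict) -> str:
    recommendations = []
    for source, count in sorted(by_source.items(), key=lambda kv: kv[1], reverse=True)[:3]:
        recommendations.append(
            f"- {source}: {count} failures this week; review retry and timeout settings."
        )
    if by_hour:
        peak_hour = max(by_hour, key=by_hour.get)
        recommendations.append(
            f"- Failures peak around {peak_hour}:00; consider shifting schedules or widening timeouts there."
        )
    if by_day:
        peak_day = max(by_day, key=by_day.get)
        recommendations.append(
            f"- {peak_day} is the worst day; check upstream maintenance windows."
        )
    return "\n".join(recommendations) or "No failures recorded this week."
```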
What You Get
- DAGs that understand historical failure patterns before execution
- Operational wisdom accumulated across thousands of runs
- Semantic search for relevant historical context in any task
- Integration that leverages Airflow's mature provider ecosystem
- Production-grade memory for the production-grade orchestrator
Ready to give Apache Airflow a memory?
Join the waitlist and be the first to know when Stompy is ready. Your Apache Airflow projects will never forget again.