Airflow Integration

Use Apache Airflow to schedule and orchestrate Mayo ASPM scans. This integration lets you run nightly full-org scans, periodic compliance checks, and custom scan workflows.


Overview

Airflow communicates with Mayo ASPM through the REST API. You create an Airflow DAG that:

  1. Triggers scans via the Mayo ASPM API
  2. Polls for scan completion
  3. Optionally processes results (e.g., trigger ticket generation, send notifications)

Airflow                         Mayo ASPM
───────                         ─────────
DAG triggers     ── POST ──▶    Start scan
Poll status      ── GET ──▶     Check scan status
                 ◀── 200 ──     Scan complete
Process results  ── GET ──▶     Fetch findings

Prerequisites

  • An Airflow instance (2.0+ recommended)
  • A Mayo ASPM API key with scans:read and scans:write permissions
  • The requests Python library (included in Airflow by default)

Setup

Step 1 — Create an API key

  1. In Mayo ASPM, go to Settings > Integrations > API Keys.
  2. Create a key with permissions: scans:read, scans:write, findings:read.
  3. Name it airflow-prod (or similar).
  4. Copy the key.

Step 2 — Store the key in Airflow

Add the API key as an Airflow connection or variable:

# Using Airflow CLI
airflow variables set MAYO_ASPM_API_KEY "mayo_ak_..."

Or use the Airflow UI: Admin > Variables > Add.

Warning

Never hardcode the API key in your DAG file. Always use Airflow variables or connections.

Step 3 — Create the DAG

Create a DAG file in your Airflow dags/ directory:

from datetime import datetime, timedelta
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

MAYO_API_BASE = "https://mayoaspm.com/api"
API_KEY = Variable.get("MAYO_ASPM_API_KEY")

HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}

default_args = {
    "owner": "security-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

def trigger_org_scan(**kwargs):
    """Trigger a full organization scan."""
    response = requests.post(
        f"{MAYO_API_BASE}/scans",
        headers=HEADERS,
        json={
            "scope": "organization",
            "scanners": ["grype", "semgrep", "gitleaks"],
        },
        timeout=30,
    )
    response.raise_for_status()
    scan_id = response.json()["scan_id"]
    kwargs["ti"].xcom_push(key="scan_id", value=scan_id)
    return scan_id

def wait_for_scan(**kwargs):
    """Poll until the scan completes."""
    import time

    scan_id = kwargs["ti"].xcom_pull(key="scan_id")
    for _ in range(120):  # 120 polls x 60 s = 2 hours max
        response = requests.get(
            f"{MAYO_API_BASE}/scans/{scan_id}",
            headers=HEADERS,
            timeout=30,
        )
        response.raise_for_status()
        status = response.json()["status"]
        if status == "completed":
            return True
        if status == "failed":
            raise RuntimeError(f"Scan {scan_id} failed")
        time.sleep(60)
    raise RuntimeError(f"Scan {scan_id} timed out after 2 hours")

def check_results(**kwargs):
    """Check scan results and alert on critical findings."""
    scan_id = kwargs["ti"].xcom_pull(key="scan_id")
    response = requests.get(
        f"{MAYO_API_BASE}/scans/{scan_id}/summary",
        headers=HEADERS,
        timeout=30,
    )
    response.raise_for_status()
    summary = response.json()
    critical = summary["by_severity"]["critical"]
    if critical > 0:
        # Replace the print with your alerting mechanism (Slack, PagerDuty, etc.)
        print(f"ALERT: {critical} critical findings detected!")

with DAG(
    "mayo_aspm_nightly_scan",
    default_args=default_args,
    description="Nightly full-org security scan",
    schedule_interval="0 2 * * *",  # 2 AM daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["security", "mayo-aspm"],
) as dag:
    trigger = PythonOperator(
        task_id="trigger_org_scan",
        python_callable=trigger_org_scan,
    )
    wait = PythonOperator(
        task_id="wait_for_scan",
        python_callable=wait_for_scan,
    )
    check = PythonOperator(
        task_id="check_results",
        python_callable=check_results,
    )
    trigger >> wait >> check
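
The fixed one-minute poll in wait_for_scan can be factored into a reusable helper with exponential backoff, which is gentler on the API for long scans. A minimal stdlib sketch (`wait_until` and its parameters are illustrative, not part of the Mayo ASPM API):

```python
import time

def wait_until(fetch_status, timeout_s=7200, initial_delay_s=30, max_delay_s=300):
    """Poll fetch_status() until it returns "completed", backing off exponentially.

    fetch_status is any zero-argument callable returning the scan status string.
    Raises RuntimeError if the scan fails or timeout_s elapses.
    """
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while time.monotonic() < deadline:
        status = fetch_status()
        if status == "completed":
            return True
        if status == "failed":
            raise RuntimeError("scan failed")
        # Never sleep past the deadline
        time.sleep(min(delay, max(0.0, deadline - time.monotonic())))
        delay = min(delay * 2, max_delay_s)
    raise RuntimeError("scan timed out")
```

Inside wait_for_scan you would call `wait_until(lambda: get_status(scan_id))`, keeping the HTTP call in one place and the retry policy in another.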

Common DAG patterns

Scan specific projects

def trigger_project_scan(project_id, **kwargs):
    response = requests.post(
        f"{MAYO_API_BASE}/scans",
        headers=HEADERS,
        json={
            "scope": "project",
            "project_id": project_id,
            "scanners": ["grype", "trivy"],
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["scan_id"]
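
To fan out over several projects in one task, the call above can be wrapped in a loop. A sketch (`scan_payload` and `trigger_many` are hypothetical helpers; the `session` parameter exists so the HTTP client can be stubbed in tests):

```python
import requests

MAYO_API_BASE = "https://mayoaspm.com/api"

def scan_payload(project_id, scanners=("grype", "trivy")):
    """Request body for a single-project scan (same shape as the example above)."""
    return {"scope": "project", "project_id": project_id, "scanners": list(scanners)}

def trigger_many(project_ids, headers, session=None):
    """Start one scan per project and return {project_id: scan_id}."""
    http = session or requests.Session()
    scan_ids = {}
    for pid in project_ids:
        resp = http.post(
            f"{MAYO_API_BASE}/scans",
            headers=headers,
            json=scan_payload(pid),
            timeout=30,
        )
        resp.raise_for_status()
        scan_ids[pid] = resp.json()["scan_id"]
    return scan_ids
```

The returned mapping can be pushed to XCom so a downstream task can poll each scan individually.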

Generate tickets after scan

def generate_tickets(project_id, **kwargs):
    response = requests.post(
        f"{MAYO_API_BASE}/tickets/generate",
        headers=HEADERS,
        json={
            "project_id": project_id,
            "filters": {"severity": ["critical", "high"], "status": ["triaged"]},
            "grouping": "by_vulnerability",
            "delivery": "push",
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["ticket_count"]
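
When chaining ticket generation after a scan, it can help to derive the severity filter from a single minimum-severity knob instead of hardcoding the list. A sketch (`SEVERITY_ORDER` and `ticket_request` are illustrative; the body shape mirrors the example above):

```python
SEVERITY_ORDER = ["critical", "high", "medium", "low"]

def ticket_request(project_id, min_severity="high", grouping="by_vulnerability"):
    """Build a /tickets/generate body covering every severity at or above min_severity."""
    cutoff = SEVERITY_ORDER.index(min_severity)
    return {
        "project_id": project_id,
        "filters": {"severity": SEVERITY_ORDER[: cutoff + 1], "status": ["triaged"]},
        "grouping": grouping,
        "delivery": "push",
    }
```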

Monitoring

  • Use Airflow's built-in alerting to notify on DAG failures.
  • Check the Scans page in Mayo ASPM to verify scans triggered by Airflow.
  • Monitor API key usage in Settings > Integrations > API Keys.
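
For the first bullet, Airflow's on_failure_callback hook is a convenient place to plug in notifications. A minimal sketch (`notify_failure` is illustrative; swap the print for your Slack or PagerDuty client):

```python
def notify_failure(context):
    """Failure callback: Airflow passes the task-instance context dict."""
    ti = context.get("task_instance")
    msg = f"Mayo ASPM task failed: {getattr(ti, 'task_id', 'unknown')}"
    # Replace the print with a real notification (Slack webhook, PagerDuty event, etc.)
    print(msg)
    return msg

# Wire it into default_args so every task in the DAG uses it:
# default_args = {"owner": "security-team", "on_failure_callback": notify_failure}
```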

Next steps