Set Up Nightly Full-Org Scans¶
This guide walks you through configuring scheduled scans that run every night across your entire GitHub organization, ensuring your security data stays current.
Goal¶
By the end of this guide, you will have:
- An Airflow DAG that triggers nightly scans
- Scans covering all synced repositories
- Automatic ticket generation for new critical findings
Time: ~30 minutes
Prerequisites¶
- A Mayo ASPM account with admin access
- All target repositories synced via the GitHub App
- An Apache Airflow instance (2.0+)
- Basic Python knowledge
Step 1 — Create a dedicated API key¶
- Navigate to Settings > Integrations > API Keys.
- Click Create API Key.
- Configure:
- Name: airflow-nightly-scans
- Expiry: 1 year
- Permissions: scans:read, scans:write, findings:read, tickets:write
- Click Create and copy the key.
Step 2 — Store the key in Airflow¶
Store the key as an Airflow variable from the CLI: airflow variables set MAYO_ASPM_API_KEY <your-key>. Or use the Airflow UI: Admin > Variables > Add.
Danger
Never put the API key directly in your DAG file. Always use Airflow variables, connections, or an external secrets manager.
Step 3 — Create the DAG¶
Create dags/mayo_aspm_nightly_scan.py:
"""
Mayo ASPM Nightly Full-Organization Scan
Runs every night at 2 AM UTC:
1. Triggers a full org scan with SCA, SAST, and secret detection
2. Waits for the scan to complete
3. Checks for new critical findings
4. Generates tickets for critical/high findings
"""
from datetime import datetime, timedelta
import time
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
# Base URL for the Mayo ASPM REST API; every endpoint below is relative to this.
MAYO_API_BASE = "https://mayoaspm.com/api"
# API key read from an Airflow Variable so the secret never lives in this file.
# NOTE: Variable.get() executes at DAG parse time, i.e. on every scheduler parse cycle.
API_KEY = Variable.get("MAYO_ASPM_API_KEY")
# Headers shared by every Mayo ASPM request (bearer auth + JSON payloads).
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}
# Defaults applied to every task in the DAG.
default_args = {
    "owner": "security-team",
    "retries": 2,                             # retry transient failures twice
    "retry_delay": timedelta(minutes=5),      # wait 5 minutes between retries
    "execution_timeout": timedelta(hours=3),  # kill any task running longer than 3 h
}
def trigger_scan(**kwargs):
    """Kick off a full-organization scan and publish its id via XCom.

    Starts a scan covering SCA (grype), SAST (semgrep), and secret
    detection (gitleaks), then pushes the returned scan id under the
    XCom key "scan_id" for downstream tasks to consume.
    """
    scan_request = {
        "scope": "organization",
        "scanners": ["grype", "semgrep", "gitleaks"],
    }
    response = requests.post(
        f"{MAYO_API_BASE}/scans",
        headers=HEADERS,
        json=scan_request,
        timeout=30,
    )
    response.raise_for_status()
    new_scan_id = response.json()["data"]["scan_id"]
    print(f"Scan triggered: {new_scan_id}")
    kwargs["ti"].xcom_push(key="scan_id", value=new_scan_id)
def wait_for_completion(**kwargs):
    """Poll the triggered scan until it finishes, then publish its summary.

    Pulls the scan id pushed by trigger_scan, polls the scan status once
    per minute for up to 3 hours, and on completion pushes the scan
    summary dict under the XCom key "summary".

    Raises:
        RuntimeError: if the scan reports "failed", or does not complete
            within the polling window.
        requests.HTTPError: if a status request returns an error code.
    """
    scan_id = kwargs["ti"].xcom_pull(key="scan_id")
    max_polls = 180  # 3 hours at 60-second intervals
    for i in range(max_polls):
        resp = requests.get(
            f"{MAYO_API_BASE}/scans/{scan_id}",
            headers=HEADERS,
            timeout=30,
        )
        resp.raise_for_status()
        # Parse the response body once; the original decoded it twice.
        data = resp.json()["data"]
        status = data["status"]
        print(f"Poll {i+1}: status={status}")
        if status == "completed":
            # Summary may be absent; default to an empty dict for downstream tasks.
            kwargs["ti"].xcom_push(key="summary", value=data.get("summary", {}))
            return
        if status == "failed":
            # RuntimeError (subclass of Exception) so existing broad handlers still catch it.
            raise RuntimeError(f"Scan {scan_id} failed")
        if i < max_polls - 1:
            # Skip the pointless sleep after the final poll before timing out.
            time.sleep(60)
    raise RuntimeError(f"Scan {scan_id} timed out after {max_polls} minutes")
def check_and_alert(**kwargs):
    """Print the scan summary and flag any critical findings.

    Pulls the summary pushed by wait_for_completion. A missing XCom
    value (e.g. the upstream task was skipped) is treated as an empty
    summary instead of raising AttributeError on None.
    """
    # xcom_pull returns None when nothing was pushed; fall back to {}.
    summary = kwargs["ti"].xcom_pull(key="summary") or {}
    by_sev = summary.get("by_severity", {})
    print("=== Scan Summary ===")
    print(f" Assets scanned: {summary.get('assets_scanned', 'N/A')}")
    print(f" Total findings: {summary.get('total_findings', 'N/A')}")
    print(f" New findings: {summary.get('new_findings', 'N/A')}")
    print(f" Critical: {by_sev.get('critical', 0)}")
    print(f" High: {by_sev.get('high', 0)}")
    print(f" Medium: {by_sev.get('medium', 0)}")
    print(f" Low: {by_sev.get('low', 0)}")
    new_critical = by_sev.get("critical", 0)
    if new_critical > 0:
        print(f"ALERT: {new_critical} critical findings detected!")
def generate_tickets(**kwargs):
    """Request ticket creation for recent critical/high findings.

    Asks Mayo ASPM to take triaged or confirmed critical/high findings
    no older than one day, group them by vulnerability, and push tickets
    to the configured tracker. Prints the number of tickets created.
    """
    ticket_request = {
        "filters": {
            "severity": ["critical", "high"],
            "status": ["triaged", "confirmed"],
            "age_max": 1,  # only findings from the last day
        },
        "grouping": "by_vulnerability",
        "delivery": "push",
    }
    response = requests.post(
        f"{MAYO_API_BASE}/tickets/generate",
        headers=HEADERS,
        json=ticket_request,
        timeout=60,
    )
    response.raise_for_status()
    payload = response.json()["data"]
    print(f"Generated {payload.get('ticket_count', 0)} tickets")
with DAG(
    "mayo_aspm_nightly_scan",
    default_args=default_args,
    description="Nightly full-org security scan with ticket generation",
    schedule_interval="0 2 * * *",  # every night at 02:00 UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,  # never backfill missed nights
    tags=["security", "mayo-aspm"],
) as dag:
    # One PythonOperator per pipeline stage.
    trigger_task = PythonOperator(
        task_id="trigger_scan",
        python_callable=trigger_scan,
    )
    wait_task = PythonOperator(
        task_id="wait_for_completion",
        python_callable=wait_for_completion,
    )
    summary_task = PythonOperator(
        task_id="check_and_alert",
        python_callable=check_and_alert,
    )
    ticket_task = PythonOperator(
        task_id="generate_tickets",
        python_callable=generate_tickets,
    )
    # Linear pipeline: trigger -> wait -> summarize -> ticket.
    trigger_task >> wait_task >> summary_task >> ticket_task
Step 4 — Deploy the DAG¶
- Copy the DAG file to your Airflow dags/ directory.
- Wait for Airflow to pick it up (check DAGs in the Airflow UI).
- Toggle the DAG on.
Step 5 — Test with a manual run¶
- In the Airflow UI, click the play button on the DAG to trigger a manual run.
- Monitor each task:
- trigger_scan — should complete in seconds
- wait_for_completion — watch logs for poll updates
- check_and_alert — shows scan summary
- generate_tickets — reports ticket count
- Verify in Mayo ASPM:
- Scans page shows the triggered scan
- Findings are updated
- Tickets are created for critical/high findings
Step 6 — Set up alerting¶
Configure Airflow email or Slack alerts for DAG failures:
# Task defaults with email alerting enabled; merge these keys into the
# existing default_args in the DAG file.
default_args = {
    "owner": "security-team",
    "email": ["security@example.com"],  # recipients for failure notifications
    "email_on_failure": True,           # alert when a task ultimately fails
    "email_on_retry": False,            # stay quiet on intermediate retries
}
Verification¶
- API key is created with correct permissions
- Key is stored in Airflow variables (not in code)
- DAG appears in Airflow UI
- Manual run completes all 4 tasks successfully
- Scan appears in Mayo ASPM scans view
- Tickets are generated for critical/high findings
- DAG is scheduled for nightly execution
Customization¶
Scan specific projects¶
Replace the trigger_scan function to target specific projects:
def trigger_scan(**kwargs):
    """Trigger one SCA scan per configured project and push all scan ids.

    Variant of the org-wide trigger that targets an explicit project
    list. Downstream tasks must pull the "scan_ids" XCom key (a list)
    instead of "scan_id".

    Raises:
        requests.HTTPError: if any scan request returns an error code.
    """
    project_ids = ["proj_abc", "proj_def", "proj_ghi"]
    scan_ids = []
    for pid in project_ids:
        resp = requests.post(
            f"{MAYO_API_BASE}/scans",
            headers=HEADERS,
            json={"scope": "project", "project_id": pid, "scanners": ["grype"]},
            # Bound the request like every other API call in this DAG;
            # without a timeout a stalled connection hangs the task forever.
            timeout=30,
        )
        resp.raise_for_status()
        scan_ids.append(resp.json()["data"]["scan_id"])
    kwargs["ti"].xcom_push(key="scan_ids", value=scan_ids)
Weekly full scan + daily SCA¶
Create two DAGs:
- mayo_aspm_daily_sca — runs Grype daily (fast)
- mayo_aspm_weekly_full — runs all scanners weekly (thorough)
Next steps¶
- Airflow integration — detailed integration guide
- API reference — all API endpoints
- Scanner selection — choose the right scanners