Set Up Nightly Full-Org Scans

This guide walks you through configuring scheduled scans that run every night across your entire GitHub organization, ensuring your security data stays current.


Goal

By the end of this guide, you will have:

  • An Airflow DAG that triggers nightly scans
  • Scans covering all synced repositories
  • Automatic ticket generation for new critical findings

Time: ~30 minutes


Prerequisites

  • A Mayo ASPM account with admin access
  • All target repositories synced via the GitHub App
  • An Apache Airflow instance (2.0+)
  • Basic Python knowledge

Step 1 — Create a dedicated API key

  1. Navigate to Settings > Integrations > API Keys.
  2. Click Create API Key.
  3. Configure:
    • Name: airflow-nightly-scans
    • Expiry: 1 year
    • Permissions: scans:read, scans:write, findings:read, tickets:write
  4. Click Create and copy the key.

Step 2 — Store the key in Airflow

airflow variables set MAYO_ASPM_API_KEY "mayo_ak_your_key_here"

Or use the Airflow UI: Admin > Variables > Add.

Danger

Never put the API key directly in your DAG file. Always use Airflow variables, connections, or an external secrets manager.
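
If you already run an external secrets manager, Airflow can resolve variables from it instead of its metadata database. As one illustration, the AWS Secrets Manager backend is enabled in airflow.cfg (the prefix values below are conventions you choose, not requirements); with this in place, Variable.get("MAYO_ASPM_API_KEY") resolves to the secret airflow/variables/MAYO_ASPM_API_KEY:

```ini
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"variables_prefix": "airflow/variables", "connections_prefix": "airflow/connections"}
```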


Step 3 — Create the DAG

Create dags/mayo_aspm_nightly_scan.py:

"""
Mayo ASPM Nightly Full-Organization Scan

Runs every night at 2 AM UTC:
1. Triggers a full org scan with SCA, SAST, and secret detection
2. Waits for the scan to complete
3. Checks for new critical findings
4. Generates tickets for critical/high findings
"""

from datetime import datetime, timedelta
import time
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

MAYO_API_BASE = "https://mayoaspm.com/api"
API_KEY = Variable.get("MAYO_ASPM_API_KEY")
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}

default_args = {
    "owner": "security-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=3),
}


def trigger_scan(**kwargs):
    """Trigger a full organization scan."""
    resp = requests.post(
        f"{MAYO_API_BASE}/scans",
        headers=HEADERS,
        json={
            "scope": "organization",
            "scanners": ["grype", "semgrep", "gitleaks"],
        },
        timeout=30,
    )
    resp.raise_for_status()
    scan_id = resp.json()["data"]["scan_id"]
    print(f"Scan triggered: {scan_id}")
    kwargs["ti"].xcom_push(key="scan_id", value=scan_id)


def wait_for_completion(**kwargs):
    """Poll until the scan finishes."""
    scan_id = kwargs["ti"].xcom_pull(key="scan_id")
    max_polls = 180  # 3 hours at 60s intervals
    for i in range(max_polls):
        resp = requests.get(
            f"{MAYO_API_BASE}/scans/{scan_id}",
            headers=HEADERS,
            timeout=30,
        )
        resp.raise_for_status()
        status = resp.json()["data"]["status"]
        print(f"Poll {i+1}: status={status}")

        if status == "completed":
            summary = resp.json()["data"].get("summary", {})
            kwargs["ti"].xcom_push(key="summary", value=summary)
            return

        if status == "failed":
            raise Exception(f"Scan {scan_id} failed")

        time.sleep(60)

    raise Exception(f"Scan {scan_id} timed out after {max_polls} minutes")


def check_and_alert(**kwargs):
    """Check scan results and print summary."""
    summary = kwargs["ti"].xcom_pull(key="summary")
    by_sev = summary.get("by_severity", {})

    print("=== Scan Summary ===")
    print(f"  Assets scanned: {summary.get('assets_scanned', 'N/A')}")
    print(f"  Total findings: {summary.get('total_findings', 'N/A')}")
    print(f"  New findings:   {summary.get('new_findings', 'N/A')}")
    print(f"  Critical: {by_sev.get('critical', 0)}")
    print(f"  High:     {by_sev.get('high', 0)}")
    print(f"  Medium:   {by_sev.get('medium', 0)}")
    print(f"  Low:      {by_sev.get('low', 0)}")

    critical_count = by_sev.get("critical", 0)
    if critical_count > 0:
        print(f"ALERT: {critical_count} critical findings detected!")


def generate_tickets(**kwargs):
    """Generate tickets for new critical and high findings."""
    resp = requests.post(
        f"{MAYO_API_BASE}/tickets/generate",
        headers=HEADERS,
        json={
            "filters": {
                "severity": ["critical", "high"],
                "status": ["triaged", "confirmed"],
                "age_max": 1,  # only findings from the last day
            },
            "grouping": "by_vulnerability",
            "delivery": "push",
        },
        timeout=60,
    )
    resp.raise_for_status()
    result = resp.json()["data"]
    print(f"Generated {result.get('ticket_count', 0)} tickets")


with DAG(
    "mayo_aspm_nightly_scan",
    default_args=default_args,
    description="Nightly full-org security scan with ticket generation",
    schedule_interval="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["security", "mayo-aspm"],
) as dag:
    t_trigger = PythonOperator(
        task_id="trigger_scan",
        python_callable=trigger_scan,
    )
    t_wait = PythonOperator(
        task_id="wait_for_completion",
        python_callable=wait_for_completion,
    )
    t_check = PythonOperator(
        task_id="check_and_alert",
        python_callable=check_and_alert,
    )
    t_tickets = PythonOperator(
        task_id="generate_tickets",
        python_callable=generate_tickets,
    )

    t_trigger >> t_wait >> t_check >> t_tickets
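
The wait_for_completion task above polls at a fixed 60-second interval. If you prefer to poll aggressively at first and back off as the scan runs, the loop can be factored into a reusable helper. A minimal sketch (names and intervals are illustrative, and the sleep function is injected so the logic is testable without waiting):

```python
import time


def poll_until_complete(fetch_status, max_polls=180, base_interval=15,
                        max_interval=120, sleep=None):
    """Poll fetch_status() with exponential backoff until a terminal state."""
    sleep = sleep or time.sleep
    interval = base_interval
    for attempt in range(1, max_polls + 1):
        status = fetch_status()
        if status == "completed":
            return attempt
        if status == "failed":
            raise RuntimeError("scan failed")
        sleep(interval)
        interval = min(interval * 2, max_interval)  # back off, capped
    raise TimeoutError(f"no terminal state after {max_polls} polls")


# simulate a scan that completes on the third poll; record the sleeps
sleeps = []
statuses = iter(["queued", "running", "completed"])
attempts = poll_until_complete(lambda: next(statuses), sleep=sleeps.append)
print(attempts, sleeps)  # → 3 [15, 30]
```

Inside the DAG, fetch_status would wrap the same GET request that wait_for_completion already makes.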

Step 4 — Deploy the DAG

  1. Copy the DAG file to your Airflow dags/ directory.
  2. Wait for Airflow to pick it up (check DAGs in the Airflow UI).
  3. Toggle the DAG on.

Step 5 — Test with a manual run

  1. In the Airflow UI, click the play button on the DAG to trigger a manual run.
  2. Monitor each task:
    • trigger_scan — should complete in seconds
    • wait_for_completion — watch logs for poll updates
    • check_and_alert — shows scan summary
    • generate_tickets — reports ticket count
  3. Verify in Mayo ASPM:
    • Scans page shows the triggered scan
    • Findings are updated
    • Tickets are created for critical/high findings

Step 6 — Set up alerting

Configure Airflow email (or Slack) alerts for DAG failures. For email, extend the DAG's default_args, keeping the existing retry settings:

default_args = {
    "owner": "security-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=3),
    "email": ["security@example.com"],
    "email_on_failure": True,
    "email_on_retry": False,
}
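
For Slack, a common pattern is an on_failure_callback that posts to an incoming webhook. A hedged sketch, assuming the standard Airflow callback context keys; the webhook URL and message format are placeholders you would adapt:

```python
import json
from urllib import request

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX"  # placeholder


def build_failure_message(dag_id, task_id, log_url):
    """Build the Slack payload for a failed task."""
    return {"text": f":red_circle: `{dag_id}.{task_id}` failed. Logs: {log_url}"}


def slack_on_failure(context):
    """Airflow on_failure_callback: post a failure notice to Slack."""
    ti = context["task_instance"]
    payload = build_failure_message(context["dag"].dag_id, ti.task_id, ti.log_url)
    req = request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    request.urlopen(req, timeout=10)


# demo of the payload shape:
payload = build_failure_message("mayo_aspm_nightly_scan", "trigger_scan",
                                "http://airflow.example.com/log")
print(payload["text"])
```

Wire it in with default_args = {..., "on_failure_callback": slack_on_failure}.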

Verification

  • API key is created with correct permissions
  • Key is stored in Airflow variables (not in code)
  • DAG appears in Airflow UI
  • Manual run completes all 4 tasks successfully
  • Scan appears in Mayo ASPM scans view
  • Tickets are generated for critical/high findings
  • DAG is scheduled for nightly execution

Customization

Scan specific projects

Replace the trigger_scan function to target specific projects:

def trigger_scan(**kwargs):
    """Trigger a scan for each target project."""
    project_ids = ["proj_abc", "proj_def", "proj_ghi"]
    scan_ids = []
    for pid in project_ids:
        resp = requests.post(
            f"{MAYO_API_BASE}/scans",
            headers=HEADERS,
            json={"scope": "project", "project_id": pid, "scanners": ["grype"]},
            timeout=30,
        )
        resp.raise_for_status()
        scan_ids.append(resp.json()["data"]["scan_id"])
    kwargs["ti"].xcom_push(key="scan_ids", value=scan_ids)
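
Because this version pushes a list under scan_ids, wait_for_completion must be updated to pull that key and poll every scan. A sketch of the multi-scan wait, with the HTTP fetch injected so the loop itself can be exercised without network calls:

```python
def wait_for_all(scan_ids, fetch_status, max_polls=180, sleep=lambda s: None):
    """Poll every scan until all complete; raise on failure or timeout."""
    pending = set(scan_ids)
    for _ in range(max_polls):
        for sid in sorted(pending):
            status = fetch_status(sid)
            if status == "completed":
                pending.discard(sid)
            elif status == "failed":
                raise RuntimeError(f"scan {sid} failed")
        if not pending:
            return
        sleep(60)
    raise TimeoutError(f"scans still pending: {sorted(pending)}")


# simulate: scan "a" completes on the second poll, "b" on the first
history = {"a": ["running", "completed"], "b": ["completed"]}
wait_for_all(["a", "b"], lambda sid: history[sid].pop(0))
print("all scans finished")
```

In the DAG, fetch_status would wrap the existing GET f"{MAYO_API_BASE}/scans/{scan_id}" call, and sleep would default to time.sleep.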

Weekly full scan + daily SCA

Create two DAGs:

  • mayo_aspm_daily_sca — runs Grype daily (fast)
  • mayo_aspm_weekly_full — runs all scanners weekly (thorough)
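
The two schedules could differ along these lines (the cron strings and scanner lists are suggestions, not product requirements):

```python
# each entry: DAG id -> (cron schedule, scanners to request)
SCHEDULES = {
    "mayo_aspm_daily_sca": ("0 2 * * *", ["grype"]),  # every day, 02:00 UTC
    "mayo_aspm_weekly_full": ("0 3 * * 0", ["grype", "semgrep", "gitleaks"]),  # Sundays, 03:00 UTC
}

for dag_id, (cron, scanners) in SCHEDULES.items():
    print(f"{dag_id}: schedule={cron!r}, scanners={scanners}")
```

Staggering the weekly run to 03:00 keeps it from contending with the daily scan on Sundays.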

Next steps