aastatus-to-discord/aastatus_to_webhook.py

260 lines
7.8 KiB
Python

#!/usr/bin/env python3
import requests
import xml.etree.ElementTree as ET
import html
import json
import re
from pathlib import Path
import logging
from datetime import datetime
# Logging setup: INFO and above, each line prefixed with time and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Where the Atom feed is read from and where notifications are sent.
FEED_URL = "https://aastatus.net/atom.cgi"
WEBHOOK_URL = "https://discord.com/api/webhooks/XXXXXXX"  # Discord webhook URL

# JSON file holding the last-seen snapshot of every incident between runs.
STATE_FILE = Path("/tmp/aaisp_atom_state.json")

# Embed colours (0xRRGGBB) used by get_event_color().
COLOR_MINOR_OPEN = 0xFFFF00  # yellow
COLOR_MAJOR_OPEN = 0xFF0000  # red
COLOR_CLOSED = 0x2ECC71  # green
COLOR_UNKNOWN = 0x95A5A6  # grey
### STATE MANAGEMENT ###
def load_state():
    """Load the persisted incident state from ``STATE_FILE``.

    Returns:
        dict: Mapping of incident id to the snapshot dict stored on a
        previous run. An empty dict is returned when the file does not
        exist, cannot be read, does not contain valid JSON, or its top
        level is not a JSON object.
    """
    try:
        # Read first and handle failure (EAFP) instead of exists()-then-read,
        # which races with anything removing the file in between.
        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No state yet (first run): not an error.
        return {}
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.error("State file corrupted, resetting...")
        return {}

    if not isinstance(state, dict):
        # Valid JSON but the wrong shape (e.g. a list); callers use .get().
        logger.error("State file corrupted, resetting...")
        return {}

    # Drop entries whose snapshot is not a dict so callers can safely call
    # .get() on every stored value.
    return {key: snap for key, snap in state.items() if isinstance(snap, dict)}
def save_state(state):
    """Persist *state* to ``STATE_FILE`` as indented JSON.

    The data is written to a sibling temporary file and then renamed over
    ``STATE_FILE`` so that an interrupted run cannot leave a half-written
    state file behind. Failures are logged and otherwise ignored
    (best-effort), matching how the rest of the script treats state.

    Args:
        state: JSON-serialisable mapping of incident id to snapshot dict.
    """
    tmp_file = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    try:
        # json.dumps output is ASCII-only by default, so the explicit
        # encoding only pins behaviour; it does not change the bytes written.
        tmp_file.write_text(json.dumps(state, indent=2), encoding="utf-8")
        tmp_file.replace(STATE_FILE)
    except (OSError, TypeError, ValueError) as e:
        # OSError: filesystem problems; TypeError/ValueError: unserialisable state.
        logger.error(f"State save failed: {e}")
### FEED PARSING ###
def fetch_feed(url):
    """Download *url* and return the response body as text.

    Args:
        url: Address of the feed to fetch.

    Returns:
        The decoded response body.

    Raises:
        requests.RequestException: On connection problems, timeouts (10 s)
            or a non-success HTTP status. The error is logged before being
            re-raised.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as err:
        logger.error(f"Error fetching feed: {err}")
        raise
    return response.text
_ATOM_NS = "{http://www.w3.org/2005/Atom}"


def _stripped_text(elem):
    """Return *elem*'s text without surrounding whitespace, or '' if absent."""
    if elem is None or not elem.text:
        return ""
    return elem.text.strip()


def _entry_link(entry):
    """Return the href of the rel='alternate' link, else of the first link, else ''."""
    link_elem = entry.find(f"{_ATOM_NS}link[@rel='alternate']")
    if link_elem is None:
        link_elem = entry.find(f"{_ATOM_NS}link")
    return link_elem.get("href", "") if link_elem is not None else ""


def _entry_categories(entry):
    """Return (severity, status, type_labels) read from the entry's categories."""
    severity = "Unknown"
    status = "Unknown"
    type_labels = []
    for cat in entry.findall(f".//{_ATOM_NS}category"):
        # Prefer the human-readable label, fall back to the term.
        label = cat.get("label", "") or cat.get("term", "")
        scheme = cat.get("scheme", "")
        if scheme == "https://aastatus.net/severity":
            severity = label
        elif scheme == "https://aastatus.net/type":
            type_labels.append(label)
        elif scheme == "https://aastatus.net/status":
            status = label
    return severity, status, type_labels


def parse_feed_entries(feed_xml):
    """Return a list of all valid entries in the feed.

    An entry is valid when it has a non-empty ``<id>``; entries with a
    missing or empty id are skipped because nothing could be tracked
    against them.

    Args:
        feed_xml: The Atom document as text (or bytes).

    Returns:
        list[dict]: One dict per entry with the string keys ``id``,
        ``title``, ``link``, ``updated``, ``published``, ``categories``
        (comma-joined type labels), ``severity``, ``status`` and
        ``content``. ``title`` falls back to "No Title"; ``content`` falls
        back to the summary and then to the title.

    Raises:
        xml.etree.ElementTree.ParseError: If *feed_xml* is not well-formed
            XML. The error is logged before being re-raised.
    """
    try:
        root = ET.fromstring(feed_xml)
    except ET.ParseError as e:
        logger.error(f"XML parsing error: {e}")
        raise

    entries = []
    for entry in root.findall(f"{_ATOM_NS}entry"):
        # An <id/> element with no text has .text == None; treat it the same
        # as a missing id instead of crashing on None.strip().
        incident_id = _stripped_text(entry.find(f"{_ATOM_NS}id"))
        if not incident_id:
            continue

        title_text = _stripped_text(entry.find(f"{_ATOM_NS}title")) or "No Title"
        content_text = (
            _stripped_text(entry.find(f"{_ATOM_NS}content"))
            or _stripped_text(entry.find(f"{_ATOM_NS}summary"))
            or title_text
        )
        severity, status, type_labels = _entry_categories(entry)

        entries.append({
            "id": incident_id,
            "title": html.unescape(title_text),
            "link": _entry_link(entry),
            "updated": entry.findtext(f"{_ATOM_NS}updated", ""),
            "published": entry.findtext(f"{_ATOM_NS}published", ""),
            "categories": ",".join(type_labels),
            "severity": severity,
            "status": status,
            "content": html.unescape(content_text),
        })
    return entries
### MARKDOWN CLEANER ###
def html_to_markdown(html_content):
    """Convert a small subset of HTML to Markdown-flavoured plain text.

    ``<br>`` and ``<p>`` become newlines, ``<b>``/``<strong>`` become ``**``,
    and every other tag is removed. Tags are matched case-insensitively and
    may carry attributes (``<p class="x">``), so paragraph breaks and bold
    markers are not lost on attributed markup.

    Args:
        html_content: HTML fragment; a falsy value yields ''.

    Returns:
        The converted text, stripped of surrounding whitespace and cut to
        at most 3500 characters.
    """
    if not html_content:
        return ""
    md = html_content
    md = re.sub(r"<br(?:\s[^>]*)?/?>", "\n", md, flags=re.IGNORECASE)
    md = re.sub(r"</?(?:b|strong)(?:\s[^>]*)?>", "**", md, flags=re.IGNORECASE)
    md = re.sub(r"</?p(?:\s[^>]*)?>", "\n", md, flags=re.IGNORECASE)
    # Anything still looking like a tag is dropped, keeping only its text.
    md = re.sub(r"<[^>]+>", "", md)
    return md.strip()[:3500]
### COLOR LOGIC (table-driven) ###
def get_event_color(status, severity):
    """Pick the embed colour for an incident.

    Lookup order:
      1. Severity "PEW" with status "Open" or "Planned" -> 0x51D3D4.
      2. Status "Closed" (any other severity combination) -> COLOR_CLOSED.
      3. Status "Open" -> colour by severity ("Minor", "Major", "MSO").
      4. Anything else -> COLOR_UNKNOWN.

    Args:
        status: Status label of the incident.
        severity: Severity label of the incident.

    Returns:
        int: Colour as a 0xRRGGBB integer.
    """
    pew_by_status = {"Open": 0x51D3D4, "Planned": 0x51D3D4}
    if severity == "PEW" and status in pew_by_status:
        return pew_by_status[status]

    if status == "Closed":
        return COLOR_CLOSED
    if status != "Open":
        return COLOR_UNKNOWN

    open_by_severity = {
        "Minor": COLOR_MINOR_OPEN,
        "Major": COLOR_MAJOR_OPEN,
        "MSO": COLOR_MAJOR_OPEN,
    }
    return open_by_severity.get(severity, COLOR_UNKNOWN)
### DISCORD FORMAT ###
def _clip(text, limit):
    """Return *text* cut to at most *limit* characters, ending in '…' if cut."""
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"


def _embed_field(name, value, inline):
    """Build one embed field; empty values become 'N/A' and long ones are clipped."""
    # Discord rejects embeds containing a field with an empty value and
    # limits field values to 1024 characters.
    return {"name": name, "value": _clip(value, 1024) if value else "N/A", "inline": inline}


def build_discord_payload(entry, change_type="New entry"):
    """Build the Discord webhook JSON payload announcing *entry*.

    Args:
        entry: Entry dict with the string keys ``title``, ``link``,
            ``content``, ``updated``, ``published``, ``severity``,
            ``status`` and ``categories``.
        change_type: Short label appended to the embed title in
            parentheses, describing why the entry is being posted.

    Returns:
        dict: ``{"embeds": [embed]}``. ``url``, ``description`` and
        ``timestamp`` are only present when they have a non-empty value,
        because empty strings for those keys make Discord reject the whole
        message.
    """
    embed = {
        # Embed titles are limited to 256 characters.
        "title": _clip(f"{entry['title']} ({change_type})", 256),
        "color": get_event_color(entry["status"], entry["severity"]),
        "fields": [
            _embed_field("Published", entry["published"], True),
            _embed_field("Severity", entry["severity"], True),
            _embed_field("Status", entry["status"], True),
            _embed_field("Categories", entry["categories"], False),
        ],
    }

    if entry["link"]:
        embed["url"] = entry["link"]

    content_md = html_to_markdown(entry["content"])
    if content_md:
        embed["description"] = content_md

    updated = entry["updated"]
    if updated:
        try:
            # fromisoformat() on older Pythons does not accept a trailing "Z".
            embed["timestamp"] = datetime.fromisoformat(
                updated.replace("Z", "+00:00")
            ).isoformat()
        except ValueError:
            # Not parseable here; pass the feed's value through unchanged.
            embed["timestamp"] = updated

    return {"embeds": [embed]}
def _retry_after_seconds(resp, default=1.0, ceiling=60.0):
    """Return how long to wait after a 429, read from the Retry-After header."""
    try:
        delay = float(resp.headers.get("Retry-After", default))
    except (TypeError, ValueError):
        return default
    if not delay >= 0.0:  # negative or NaN
        return default
    return min(delay, ceiling)


def post_to_discord(webhook_url, payload):
    """POST *payload* as JSON to a Discord webhook.

    A rate-limited request (HTTP 429) is retried after the delay the
    server asks for, up to three attempts in total, so a burst of posts is
    not silently dropped. Every request uses a 10 s timeout so a stalled
    connection cannot hang the script.

    Args:
        webhook_url: Webhook address; if empty nothing is sent.
        payload: JSON-serialisable message body.

    Returns:
        bool: True when Discord accepted the message, False otherwise.
        Failures are logged, never raised.
    """
    import time  # local import: only needed for the rate-limit back-off

    if not webhook_url:
        logger.error("Discord webhook URL is not set!")
        return False

    attempts_left = 3
    while True:
        attempts_left -= 1
        try:
            resp = requests.post(webhook_url, json=payload, timeout=10)
            if resp.status_code == 429 and attempts_left > 0:
                delay = _retry_after_seconds(resp)
                logger.warning(f"Discord rate limit hit, retrying in {delay:.1f}s")
                time.sleep(delay)
                continue
            resp.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"Discord post failed: {e}")
            return False
        logger.info("Posted to Discord")
        return True
### MAIN LOGIC ###
def _describe_change(prev, entry):
    """Return a label describing how *entry* differs from *prev*, or None.

    *prev* is the snapshot stored for this incident on an earlier run, or
    None when the incident has not been seen before. Checks run in priority
    order: status, severity, updated timestamp, content.
    """
    if prev is None:
        return "New Incident Detected"
    for key, label in (("status", "Status"), ("severity", "Severity")):
        if entry[key] != prev.get(key):
            # Old and new value are separated by an arrow; without a
            # separator they run together (e.g. "OpenClosed").
            return f"{label} changed: {prev.get(key)} → {entry[key]}"
    if entry["updated"] != prev.get("updated"):
        return "Feed updated"
    if entry["content"] != prev.get("content"):
        return "Content updated"
    return None


def main():
    """Fetch the feed, post new or changed incidents, and save the new state.

    Errors from fetching or parsing the feed propagate to the caller.
    """
    logger.info("[*] Fetching feed...")
    feed_xml = fetch_feed(FEED_URL)
    logger.info("[*] Parsing feed entries...")
    entries = parse_feed_entries(feed_xml)
    if not entries:
        logger.warning("No entries found")
        return

    state = load_state()
    for entry in entries:
        incident_id = entry["id"]
        change_type = _describe_change(state.get(incident_id), entry)
        if change_type is not None:
            logger.info(f"[+] Posting update: {change_type} ({entry['title']})")
            payload = build_discord_payload(entry, change_type)
            # If the poster explicitly reports failure (returns False), keep
            # the old snapshot so the change is detected and retried on the
            # next run. A None return carries no status and is treated as
            # delivered.
            if post_to_discord(WEBHOOK_URL, payload) is False:
                continue
        # Save state
        state[incident_id] = {
            "status": entry["status"],
            "severity": entry["severity"],
            "updated": entry["updated"],
            "content": entry["content"],
        }
    save_state(state)


if __name__ == "__main__":
    main()