#!/usr/bin/env python3
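"""Monitor the AAISP status Atom feed and mirror incident updates to Discord.

Fetches https://aastatus.net/atom.cgi, takes the first usable entry, compares it
with the state recorded in /tmp/aaisp_atom_state.json, and posts a colour-coded
embed to the configured Discord webhook whenever the incident is new or has
changed (status, severity, updated timestamp, or content).
"""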

import requests
import xml.etree.ElementTree as ET
import html
import json
import re
from pathlib import Path
import logging
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

FEED_URL = "https://aastatus.net/atom.cgi"
WEBHOOK_URL = "https://discord.com/api/webhooks/XXXXXXX"  # Discord webhook URL
STATE_FILE = Path("/tmp/aaisp_atom_state.json")

# Event colors (default fallbacks)
COLOR_MINOR_OPEN = 0xFFFF00  # yellow
COLOR_MAJOR_OPEN = 0xFF0000  # red
COLOR_CLOSED = 0x2ECC71      # green
COLOR_UNKNOWN = 0x95A5A6     # grey

### STATE MANAGEMENT ###

def load_state():
    """Load previously-seen incident state from disk, returning {} if missing or unreadable."""
    if not STATE_FILE.exists():
        return {}
    try:
        return json.loads(STATE_FILE.read_text())
    except Exception:
        logger.error("State file corrupted, resetting...")
        return {}


def save_state(state):
    """Persist the incident state dict to STATE_FILE as indented JSON."""
    try:
        STATE_FILE.write_text(json.dumps(state, indent=2))
    except Exception as e:
        logger.error(f"State save failed: {e}")

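
# For reference, the state file holds one record per incident id (written in main()).
# Illustrative shape only -- the key and values below are made-up examples:
#
#   {
#     "<entry id>": {
#       "status": "Open",
#       "severity": "Minor",
#       "updated": "2024-01-01T12:00:00Z",
#       "content": "..."
#     }
#   }
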
### FEED PARSING ###

def fetch_feed(url):
    """Download the Atom feed and return its raw XML text."""
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.text
    except requests.RequestException as e:
        logger.error(f"Error fetching feed: {e}")
        raise


def get_first_valid_entry(feed_xml):
    """Return the first feed entry that has an <id>, flattened to a dict, or None."""
    try:
        root = ET.fromstring(feed_xml)
        atom_ns = "{http://www.w3.org/2005/Atom}"

        for entry in root.findall(f"{atom_ns}entry"):
            id_elem = entry.find(f"{atom_ns}id")
            if id_elem is None:
                continue

            title_elem = entry.find(f"{atom_ns}title")
            title_text = title_elem.text.strip() if title_elem is not None and title_elem.text else "No Title"

            # Body text: prefer <content>, fall back to <summary>, then to the title
            content_elem = entry.find(f"{atom_ns}content")
            summary_elem = entry.find(f"{atom_ns}summary")

            if content_elem is not None and content_elem.text and content_elem.text.strip():
                content_text = content_elem.text.strip()
            elif summary_elem is not None and summary_elem.text and summary_elem.text.strip():
                content_text = summary_elem.text.strip()
            else:
                content_text = title_text

            # Link
            link_elem = entry.find(f"{atom_ns}link[@rel='alternate']")
            if link_elem is not None:
                link = link_elem.get("href", "")
            else:
                first_link = entry.find(f"{atom_ns}link")
                link = first_link.get("href", "") if first_link is not None else ""

            # Categories (severity, type, status)
            severity = "Unknown"
            categories = []
            status = "Unknown"

            for cat in entry.findall(f".//{atom_ns}category"):
                label = cat.get("label", "") or cat.get("term", "")
                scheme = cat.get("scheme", "")

                if scheme == "https://aastatus.net/severity":
                    severity = label
                elif scheme == "https://aastatus.net/type":
                    categories.append(label)
                elif scheme == "https://aastatus.net/status":
                    status = label

            updated = entry.findtext(f"{atom_ns}updated", "")
            published = entry.findtext(f"{atom_ns}published", "")

            return {
                "id": id_elem.text.strip(),
                "title": html.unescape(title_text),
                "link": link,
                "updated": updated,
                "published": published,
                "categories": ",".join(categories),
                "severity": severity,
                "status": status,
                "content": html.unescape(content_text)
            }

        return None

    except ET.ParseError as e:
        logger.error(f"XML parsing error: {e}")
        raise

### MARKDOWN CLEANER ###

def html_to_markdown(html_content):
    """Convert the simple HTML used in feed entries to Discord-flavoured markdown."""
    md = html_content
    md = re.sub(r"<br\s*/?>", "\n", md, flags=re.IGNORECASE)
    md = re.sub(r"</?b>", "**", md, flags=re.IGNORECASE)
    md = re.sub(r"</?p>", "\n", md, flags=re.IGNORECASE)
    md = re.sub(r"<[^>]+>", "", md)  # strip any remaining tags
    return md.strip()[:3500]

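
# Illustrative only: html_to_markdown("<p>Line <b>one</b><br/>Line two</p>")
# returns "Line **one**\nLine two" -- tags are mapped or stripped, surrounding
# whitespace is removed, and the result is capped at 3500 characters.
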
### COLOR LOGIC (refactored, table-driven) ###

def get_event_color(status, severity):
    """
    Determines the Discord embed colour based on status & severity.
    Uses a lookup table for clarity and easy extensions.
    """

    # Special case colours for Planned Maintenance (PEW)
    SPECIAL_COLORS = {
        ("PEW", "Open"): 0x51D3D4,
        ("PEW", "Planned"): 0x51D3D4,
    }

    # Direct match first
    key = (severity, status)
    if key in SPECIAL_COLORS:
        return SPECIAL_COLORS[key]

    # Normal rules
    if status == "Closed":
        return COLOR_CLOSED

    STATUS_SEVERITY_COLORS = {
        ("Open", "Minor"): COLOR_MINOR_OPEN,
        ("Open", "Major"): COLOR_MAJOR_OPEN,
        ("Open", "MSO"): COLOR_MAJOR_OPEN,
    }

    return STATUS_SEVERITY_COLORS.get((status, severity), COLOR_UNKNOWN)

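
# Illustrative mappings (derived from the tables above):
#   get_event_color("Open", "Minor")   -> COLOR_MINOR_OPEN (yellow)
#   get_event_color("Open", "PEW")     -> 0x51D3D4 (planned-work colour)
#   get_event_color("Closed", "Major") -> COLOR_CLOSED (green)
#   get_event_color("Open", "Other")   -> COLOR_UNKNOWN (grey fallback)
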
### DISCORD FORMAT ###

def build_discord_payload(entry, change_type="New entry"):
    """Build the Discord webhook JSON payload (a single embed) for an entry."""
    content_md = html_to_markdown(entry["content"])

    try:
        updated_ts = datetime.fromisoformat(entry["updated"].replace("Z", "+00:00")).isoformat()
    except Exception:
        updated_ts = entry["updated"]

    color = get_event_color(entry["status"], entry["severity"])

    embed = {
        "title": f"{entry['title']} ({change_type})",
        "url": entry["link"],
        "description": content_md,
        "timestamp": updated_ts,
        "color": color,
        "fields": [
            # Discord rejects embed fields with empty values, so fall back to "N/A"
            {"name": "Published", "value": entry["published"] or "N/A", "inline": True},
            {"name": "Severity", "value": entry["severity"], "inline": True},
            {"name": "Status", "value": entry["status"], "inline": True},
            {"name": "Categories", "value": entry["categories"] or "N/A", "inline": False},
        ]
    }
    return {"embeds": [embed]}


def post_to_discord(webhook_url, payload):
    """POST the payload to the Discord webhook, logging (not raising) on failure."""
    if not webhook_url:
        logger.error("Discord webhook URL is not set!")
        return
    try:
        # timeout so a hung webhook call cannot stall the run
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()
        logger.info("Posted to Discord")
    except requests.RequestException as e:
        logger.error(f"Discord post failed: {e}")

### MAIN LOGIC ###

def main():
    """Fetch the feed once and post to Discord when the newest entry is new or changed."""
    logger.info("[*] Fetching feed...")
    feed_xml = fetch_feed(FEED_URL)

    logger.info("[*] Parsing...")
    entry = get_first_valid_entry(feed_xml)
    if not entry:
        logger.warning("No entry found")
        return

    incident_id = entry["id"]
    state = load_state()

    prev = state.get(incident_id)

    # Determine if an update is needed
    must_post = False
    change_type = "New entry"

    if prev is None:
        must_post = True
        change_type = "New Incident Detected"
    else:
        # Compare important fields
        if entry["status"] != prev.get("status"):
            must_post = True
            change_type = f"Status changed: {prev.get('status')} → {entry['status']}"
        elif entry["severity"] != prev.get("severity"):
            must_post = True
            change_type = f"Severity changed: {prev.get('severity')} → {entry['severity']}"
        elif entry["updated"] != prev.get("updated"):
            must_post = True
            change_type = "Feed updated"
        elif entry["content"] != prev.get("content"):
            must_post = True
            change_type = "Content updated"

    if must_post:
        logger.info(f"[+] Posting update: {change_type}")
        payload = build_discord_payload(entry, change_type)
        post_to_discord(WEBHOOK_URL, payload)

    # Save state no matter what
    state[incident_id] = {
        "status": entry["status"],
        "severity": entry["severity"],
        "updated": entry["updated"],
        "content": entry["content"]
    }
    save_state(state)


if __name__ == "__main__":
    main()
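
# Usage sketch (assumption): each run performs a single fetch-and-compare, so the
# script is presumably scheduled periodically, e.g. via cron (path is illustrative):
#
#   */5 * * * * /usr/bin/python3 /path/to/aaisp_atom_to_discord.py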