Files
moo b027d6dcc0 + added raspi 2 (32bit arm)
+ more resilience for stats servers
2026-04-26 16:25:56 +02:00

250 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import socket
import os
import json
from pathlib import Path
import tempfile
import threading
import asyncio
import sys
import logging
from http.server import BaseHTTPRequestHandler, HTTPServer
import websockets
from websockets.exceptions import ConnectionClosed
# ----------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------
UDP_ADDR = os.getenv('env_udp_addr', '0.0.0.0')
UDP_PORT = int(os.getenv('env_udp_port', '5005'))
HTTP_ADDR = os.getenv('env_http_addr', '0.0.0.0')
HTTP_PORT = int(os.getenv('env_http_port', '8080'))
WS_PORT = int(os.getenv('env_ws_port', '8081'))
LOGFILE_PATH = os.getenv('env_logfile_path', 'logfile.json')
# Maximum UDP datagram size (IPv4)
UDP_MAX_SIZE = 65535
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# ----------------------------------------------------------------------
# Threadsafe cache for latest stats
# ----------------------------------------------------------------------
class StatsCache:
def __init__(self, initial_data='{"receiver-stats":null}'):
self._lock = threading.Lock()
self._data = initial_data
self._loop = None
self._update_callback = None
def set_loop(self, loop):
self._loop = loop
def set_update_callback(self, callback):
self._update_callback = callback
def update(self, new_data: str):
with self._lock:
self._data = new_data
if self._loop and self._update_callback:
asyncio.run_coroutine_threadsafe(self._update_callback(new_data), self._loop)
def get(self) -> str:
with self._lock:
return self._data
# Global cache instance
cache = StatsCache()
# ----------------------------------------------------------------------
# Atomic file write utility
# ----------------------------------------------------------------------
def atomic_write(content: str, file_path: str):
"""Write content to file_path atomically using a temp file + rename."""
try:
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(mode='w', dir=os.path.dirname(file_path),
delete=False, suffix='.tmp', encoding='utf-8') as tmp:
tmp.write(content)
temp_path = tmp.name
os.replace(temp_path, file_path)
except Exception as e:
logging.error(f"Atomic write failed: {e}")
try:
if 'temp_path' in locals() and os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass
# ----------------------------------------------------------------------
# UDP Listener Thread
# ----------------------------------------------------------------------
def udp_listener():
logging.info(f'Starting UDP listener on {UDP_ADDR}:{UDP_PORT}')
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_ADDR, UDP_PORT))
sock.settimeout(1.0)
atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
try:
while True:
try:
data, addr = sock.recvfrom(UDP_MAX_SIZE)
if len(data) == UDP_MAX_SIZE:
logging.warning(f"Max size datagram received from {addr}; possible truncation.")
decoded = data.decode('utf-8')
parts = decoded.split('|')
if len(parts) < 3:
continue
relevant = parts[2].strip().replace("[INFO] ", "")
if "receiver-stats" in relevant:
try:
json_data = json.loads(relevant)
new_json_str = json.dumps(json_data)
cache.update(new_json_str)
atomic_write(new_json_str, LOGFILE_PATH)
except json.JSONDecodeError:
logging.info(f"Skipping invalid JSON: {relevant[:100]}...")
elif "[CLEANUP]" in relevant or "has timed out" in relevant:
null_json = '{"receiver-stats":null}'
cache.update(null_json)
atomic_write(null_json, LOGFILE_PATH)
except socket.timeout:
continue
except Exception as e:
logging.error(f"UDP loop error: {e}", exc_info=True)
except Exception as e:
logging.critical(f"UDP thread fatal error: {e}", exc_info=True)
finally:
sock.close()
logging.info("UDP listener stopped.")
# ----------------------------------------------------------------------
# HTTP Server
# ----------------------------------------------------------------------
class StatsHTTPHandler(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
def log_message(self, format, *args):
return # suppress default logging
def do_HEAD(self):
self._set_headers()
def do_GET(self):
try:
self._set_headers()
json_str = cache.get()
self.wfile.write(json_str.encode('utf-8'))
except (BrokenPipeError, ConnectionResetError):
pass
except Exception as e:
logging.error(f"HTTP handler error: {e}")
def run_http_server():
server_address = (HTTP_ADDR, HTTP_PORT)
httpd = HTTPServer(server_address, StatsHTTPHandler)
logging.info(f'HTTP server started on http://{HTTP_ADDR}:{HTTP_PORT}')
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
except Exception as e:
logging.error(f"HTTP server error: {e}")
finally:
httpd.server_close()
logging.info('HTTP server stopped.')
# ----------------------------------------------------------------------
# WebSocket Server
# ----------------------------------------------------------------------
class WebSocketManager:
def __init__(self):
self.clients = set()
self._lock = asyncio.Lock()
async def register(self, websocket):
async with self._lock:
self.clients.add(websocket)
async def unregister(self, websocket):
async with self._lock:
self.clients.discard(websocket)
async def broadcast(self, message: str):
async with self._lock:
clients = list(self.clients)
for ws in clients:
try:
await ws.send(message)
except Exception:
pass
ws_manager = WebSocketManager()
# FIXED: Removed the 'path' argument for compatibility with websockets ≥ 11.0
async def websocket_handler(websocket):
"""Handle a single WebSocket connection."""
await ws_manager.register(websocket)
try:
# Send current state immediately
current = cache.get()
await websocket.send(current)
# Keep connection alive until client disconnects
await websocket.wait_closed()
except ConnectionClosed:
pass
finally:
await ws_manager.unregister(websocket)
async def on_stats_updated(new_json: str):
await ws_manager.broadcast(new_json)
# ----------------------------------------------------------------------
# Main
# ----------------------------------------------------------------------
async def main_async():
loop = asyncio.get_running_loop()
cache.set_loop(loop)
cache.set_update_callback(on_stats_updated)
udp_thread = threading.Thread(target=udp_listener, daemon=True)
udp_thread.start()
http_thread = threading.Thread(target=run_http_server, daemon=True)
http_thread.start()
async with websockets.serve(
websocket_handler,
HTTP_ADDR,
WS_PORT,
ping_interval=20,
ping_timeout=20,
close_timeout=10,
):
logging.info(f'WebSocket server started on ws://{HTTP_ADDR}:{WS_PORT}')
await asyncio.Future() # run forever
if __name__ == "__main__":
banner_path = Path("banner.txt")
if banner_path.exists():
print(banner_path.read_text())
atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
try:
asyncio.run(main_async())
except KeyboardInterrupt:
logging.info("Shutting down...")
except Exception as e:
logging.critical(f"Fatal error: {e}", exc_info=True)
finally:
sys.exit(0)