# Revision b027d6dcc0: "more resilience for stats servers"
# (250 lines, 8.4 KiB, Python)
import socket
|
||
import os
|
||
import json
|
||
from pathlib import Path
|
||
import tempfile
|
||
import threading
|
||
import asyncio
|
||
import sys
|
||
import logging
|
||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||
import websockets
|
||
from websockets.exceptions import ConnectionClosed
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Configuration
|
||
# ----------------------------------------------------------------------
|
||
# Bind address and port of the UDP socket that receives log lines.
UDP_ADDR = os.environ.get('env_udp_addr', '0.0.0.0')
UDP_PORT = int(os.environ.get('env_udp_port', '5005'))

# Bind address and port of the HTTP endpoint, and the WebSocket port.
HTTP_ADDR = os.environ.get('env_http_addr', '0.0.0.0')
HTTP_PORT = int(os.environ.get('env_http_port', '8080'))
WS_PORT = int(os.environ.get('env_ws_port', '8081'))

# File that receives a copy of the latest stats JSON.
LOGFILE_PATH = os.environ.get('env_logfile_path', 'logfile.json')

# Maximum UDP datagram size (IPv4)
UDP_MAX_SIZE = 65535

logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Thread‑safe cache for latest stats
|
||
# ----------------------------------------------------------------------
|
||
class StatsCache:
    """Thread-safe holder for the most recent stats JSON string.

    Writers call :meth:`update` (possibly from a non-asyncio thread) and
    readers call :meth:`get`. When both an event loop and an async callback
    have been registered, every update is also handed to that callback on
    the registered loop.
    """

    def __init__(self, initial_data='{"receiver-stats":null}'):
        self._lock = threading.Lock()
        self._data = initial_data
        self._loop = None
        self._update_callback = None

    def set_loop(self, loop):
        """Register the asyncio event loop used to run the update callback."""
        self._loop = loop

    def set_update_callback(self, callback):
        """Register a coroutine function called as ``callback(new_data)``."""
        self._update_callback = callback

    def update(self, new_data: str):
        """Store ``new_data`` and schedule the update callback, if any.

        Scheduling is best effort: a closed or closing event loop must never
        make the caller fail, because the caller still has work to do after
        updating the cache.
        """
        with self._lock:
            self._data = new_data

        loop, callback = self._loop, self._update_callback
        if loop is None or callback is None:
            return
        if loop.is_closed():
            # Nothing can run the callback any more; do not even create the
            # coroutine, so no "never awaited" warning is produced.
            return

        coro = callback(new_data)
        try:
            future = asyncio.run_coroutine_threadsafe(coro, loop)
        except RuntimeError as exc:
            # The loop was closed between the check above and scheduling.
            coro.close()
            logging.warning(f"Could not schedule stats update callback: {exc}")
            return
        future.add_done_callback(self._log_callback_failure)

    @staticmethod
    def _log_callback_failure(future):
        """Log an exception raised by the update callback instead of dropping it."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logging.error(f"Stats update callback failed: {exc}")

    def get(self) -> str:
        """Return the most recently stored stats string."""
        with self._lock:
            return self._data


# Global cache instance
cache = StatsCache()
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Atomic file write utility
|
||
# ----------------------------------------------------------------------
|
||
def atomic_write(content: str, file_path: str):
    """Write content to file_path atomically using a temp file + rename.

    The content is written to a temporary ``.tmp`` file created in the
    target's directory, which is then moved over ``file_path`` with
    ``os.replace``. Missing parent directories are created first.

    Failures are logged and swallowed (best effort); the function never
    raises for ordinary errors. A temporary file left behind by a failed
    write or rename is removed.

    Args:
        content: Text to write, encoded as UTF-8.
        file_path: Destination path.
    """
    target = Path(file_path)
    temp_path = None
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with tempfile.NamedTemporaryFile(
            mode='w',
            dir=str(target.parent),
            delete=False,
            suffix='.tmp',
            encoding='utf-8',
        ) as tmp:
            # Record the name before writing so a failed write is cleaned up.
            temp_path = tmp.name
            tmp.write(content)
        os.replace(temp_path, file_path)
        temp_path = None  # The temp file is now the target; nothing to remove.
    except Exception as e:
        logging.error(f"Atomic write failed: {e}")
    finally:
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass
|
||
|
||
# ----------------------------------------------------------------------
|
||
# UDP Listener Thread
|
||
# ----------------------------------------------------------------------
|
||
def _handle_udp_message(decoded: str) -> None:
    """Apply one decoded log line to the stats cache and the stats file.

    The line is split on '|' and only the third field is inspected, after
    stripping whitespace and removing "[INFO] ". Lines with fewer than three
    fields are ignored.
    """
    parts = decoded.split('|')
    if len(parts) < 3:
        return

    relevant = parts[2].strip().replace("[INFO] ", "")

    if "receiver-stats" in relevant:
        try:
            json_data = json.loads(relevant)
        except json.JSONDecodeError:
            logging.info(f"Skipping invalid JSON: {relevant[:100]}...")
            return
        # Re-serialise so the cache and the file always hold normalised JSON.
        new_json_str = json.dumps(json_data)
        cache.update(new_json_str)
        atomic_write(new_json_str, LOGFILE_PATH)
    elif "[CLEANUP]" in relevant or "has timed out" in relevant:
        # These messages reset the published stats to null.
        null_json = '{"receiver-stats":null}'
        cache.update(null_json)
        atomic_write(null_json, LOGFILE_PATH)


def udp_listener():
    """Receive log lines over UDP and publish receiver stats until failure.

    Binds a UDP socket to UDP_ADDR:UDP_PORT, resets the stats file to the
    null document, then loops forever handling datagrams. Errors while
    handling a single datagram are logged and the loop continues. Setup
    errors (for example a failed bind) are logged as critical, and the
    socket is always closed on the way out.
    """
    logging.info(f'Starting UDP listener on {UDP_ADDR}:{UDP_PORT}')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    try:
        # Bind inside the try so a failure is logged and the socket closed.
        sock.bind((UDP_ADDR, UDP_PORT))
        # The one-second timeout keeps recvfrom from blocking indefinitely.
        sock.settimeout(1.0)

        atomic_write('{"receiver-stats":null}', LOGFILE_PATH)

        while True:
            try:
                data, addr = sock.recvfrom(UDP_MAX_SIZE)
                if len(data) == UDP_MAX_SIZE:
                    logging.warning(f"Max size datagram received from {addr}; possible truncation.")

                _handle_udp_message(data.decode('utf-8'))

            except socket.timeout:
                continue
            except Exception as e:
                logging.error(f"UDP loop error: {e}", exc_info=True)
    except Exception as e:
        logging.critical(f"UDP thread fatal error: {e}", exc_info=True)
    finally:
        sock.close()
        logging.info("UDP listener stopped.")
|
||
|
||
# ----------------------------------------------------------------------
|
||
# HTTP Server
|
||
# ----------------------------------------------------------------------
|
||
class StatsHTTPHandler(BaseHTTPRequestHandler):
    """HTTP handler that answers every GET with the cached stats JSON."""

    def _set_headers(self):
        """Send a 200 status line plus the JSON and CORS headers."""
        self.send_response(200)
        for header_name, header_value in (
            ('Content-type', 'application/json'),
            ('Access-Control-Allow-Origin', '*'),
        ):
            self.send_header(header_name, header_value)
        self.end_headers()

    def log_message(self, format, *args):
        """Discard the base class's per-request log output."""
        return None

    def do_HEAD(self):
        """Reply with the headers only."""
        self._set_headers()

    def do_GET(self):
        """Reply with the headers followed by the current stats document."""
        try:
            self._set_headers()
            body = cache.get().encode('utf-8')
            self.wfile.write(body)
        except Exception as exc:
            # A client that disconnected mid-response is not worth reporting.
            if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
                logging.error(f"HTTP handler error: {exc}")
|
||
|
||
def run_http_server():
    """Serve StatsHTTPHandler on HTTP_ADDR:HTTP_PORT until interrupted.

    Blocks in ``serve_forever``. A KeyboardInterrupt ends the loop quietly,
    any other exception is logged, and the listening socket is closed in
    either case.
    """
    server = HTTPServer((HTTP_ADDR, HTTP_PORT), StatsHTTPHandler)
    logging.info(f'HTTP server started on http://{HTTP_ADDR}:{HTTP_PORT}')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    except Exception as exc:
        logging.error(f"HTTP server error: {exc}")
    finally:
        server.server_close()
        logging.info('HTTP server stopped.')
|
||
|
||
# ----------------------------------------------------------------------
|
||
# WebSocket Server
|
||
# ----------------------------------------------------------------------
|
||
class WebSocketManager:
    """Registry of connected WebSocket clients with fan-out broadcasting.

    All methods are meant to run on one asyncio event loop. None of them
    awaits between reading and changing ``clients``, so no lock is needed.
    Not creating an ``asyncio.Lock`` here also means the manager can be
    instantiated at import time, before any event loop exists.
    """

    def __init__(self):
        self.clients = set()

    async def register(self, websocket):
        """Start delivering broadcasts to ``websocket``."""
        self.clients.add(websocket)

    async def unregister(self, websocket):
        """Stop delivering broadcasts to ``websocket`` (no-op if unknown)."""
        self.clients.discard(websocket)

    async def broadcast(self, message: str):
        """Send ``message`` to every registered client concurrently.

        A client whose ``send`` raises is logged and dropped from the
        registry; it never prevents delivery to the other clients and the
        error is never propagated to the caller.
        """
        # Snapshot: the registry may change while the sends are in flight.
        targets = list(self.clients)
        if not targets:
            return

        results = await asyncio.gather(
            *(ws.send(message) for ws in targets),
            return_exceptions=True,
        )
        for ws, result in zip(targets, results):
            if isinstance(result, Exception):
                logging.debug(f"Dropping WebSocket client after send failure: {result}")
                self.clients.discard(ws)


ws_manager = WebSocketManager()
|
||
|
||
# FIXED: Removed the 'path' argument for compatibility with websockets ≥ 11.0
|
||
async def websocket_handler(websocket):
    """Serve one WebSocket client for the lifetime of its connection.

    The client is registered for broadcasts, sent the current cached stats
    right away, and unregistered again once the connection has closed.
    """
    await ws_manager.register(websocket)
    try:
        # Push the current snapshot first so the client has data immediately.
        await websocket.send(cache.get())
        # Then just wait for the connection to end.
        await websocket.wait_closed()
    except ConnectionClosed:
        pass
    finally:
        await ws_manager.unregister(websocket)
|
||
|
||
async def on_stats_updated(new_json: str):
    """Forward a freshly cached stats document to all WebSocket clients."""
    await ws_manager.broadcast(new_json)
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Main
|
||
# ----------------------------------------------------------------------
|
||
async def main_async():
    """Wire the cache to the running loop and start all three servers.

    The UDP listener and the HTTP server each run in a daemon thread; the
    WebSocket server runs on the current event loop. This coroutine then
    waits on a future that is never resolved, so it runs until cancelled.
    """
    cache.set_loop(asyncio.get_running_loop())
    cache.set_update_callback(on_stats_updated)

    # Start order matches the original: UDP listener first, then HTTP.
    for worker in (udp_listener, run_http_server):
        threading.Thread(target=worker, daemon=True).start()

    ws_server = websockets.serve(
        websocket_handler,
        HTTP_ADDR,
        WS_PORT,
        ping_interval=20,
        ping_timeout=20,
        close_timeout=10,
    )
    async with ws_server:
        logging.info(f'WebSocket server started on ws://{HTTP_ADDR}:{WS_PORT}')
        await asyncio.Future()  # run forever
|
||
|
||
if __name__ == "__main__":
    # Print the optional start-up banner when banner.txt is present.
    banner_path = Path("banner.txt")
    if banner_path.exists():
        print(banner_path.read_text())

    # Start from a known state: no receiver stats yet.
    atomic_write('{"receiver-stats":null}', LOGFILE_PATH)

    exit_code = 0
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        logging.info("Shutting down...")
    except Exception as e:
        logging.critical(f"Fatal error: {e}", exc_info=True)
        # Report the failure to the parent process instead of claiming success.
        exit_code = 1

    sys.exit(exit_code)