+ added raspi 2 (32bit arm)

+ more resilience for stats servers
This commit is contained in:
moo
2026-04-26 16:25:56 +02:00
parent 2d6257f4a1
commit b027d6dcc0
5 changed files with 366 additions and 205 deletions
+3 -3
View File
@@ -1,5 +1,5 @@
REPOSITORY=registry.dissertori.lan
#REPOSITORY=registry.dissertori.lan
#REPOSITORY=10.0.1.52:5000
#REPOSITORY=moothecow
REPOSITORY=moothecow
#REPOSITORY=172.105.85.44
VERSION=0.0.16
VERSION=0.0.19
+203 -134
View File
@@ -1,181 +1,250 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
import os
from pathlib import Path
import asyncio
import websockets
import threading
import sys
from websockets.exceptions import ConnectionClosed, InvalidHandshake, InvalidMessage
import socket
import os
import json
from pathlib import Path
import tempfile
import threading
import asyncio
import sys
import logging
from http.server import BaseHTTPRequestHandler, HTTPServer
import websockets
from websockets.exceptions import ConnectionClosed
#UDP BEGIN
# ----------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------
UDP_ADDR = os.getenv('env_udp_addr', '0.0.0.0')
UDP_PORT = int(os.getenv('env_udp_port', '5005'))
LOGFILE_PATH = os.getenv('env_logfile_path', 'logfile.json')
def checklogs():
print(f'Starting UDP log check udp://{UDP_ADDR}:{UDP_PORT}...\n')
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_ADDR, UDP_PORT))
try:
while True:
data, addr = sock.recvfrom(32 * 1024) # buffer size
decoded_data = data.decode("utf-8")
parts = decoded_data.split('|')
# Check if the log entry has the expected structure
if len(parts) < 3:
continue
# Extract the relevant part
relevant_data = parts[2].strip().replace("[INFO] ", "")
if "receiver-stats" in relevant_data:
try:
# Attempt to parse as JSON
json_data = json.loads(relevant_data)
with open(LOGFILE_PATH, 'w') as f:
json.dump(json_data, f)
f.flush() # Force write to disk
os.fsync(f.fileno()) # Ensure it's written to filesystem
except json.JSONDecodeError:
print(f"Skipping invalid JSON: {relevant_data}")
elif "[CLEANUP]" in relevant_data:
print("Cleanup log detected.")
with open(LOGFILE_PATH, 'w') as f:
f.write('{"receiver-stats":null}')
f.flush()
os.fsync(f.fileno())
elif "has timed out" in relevant_data:
print("Disconnect log detected.")
with open(LOGFILE_PATH, 'w') as f:
f.write('{"receiver-stats":null}')
f.flush()
os.fsync(f.fileno())
except KeyboardInterrupt:
pass
finally:
sock.close()
print(f"...Stopped UDP log check\n")
#UDP END
HTTP_ADDR = os.getenv('env_http_addr', '0.0.0.0')
HTTP_PORT = int(os.getenv('env_http_port', '8080'))
WS_PORT = int(os.getenv('env_ws_port', '8081'))
LOGFILE_PATH = os.getenv('env_logfile_path', 'logfile.json')
json_string = '{"receiver-stats":null}'
async def handler(websocket):
"""WebSocket handler with improved error handling"""
global json_string
while True:
# Maximum UDP datagram size (IPv4)
UDP_MAX_SIZE = 65535
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# ----------------------------------------------------------------------
# Threadsafe cache for latest stats
# ----------------------------------------------------------------------
class StatsCache:
def __init__(self, initial_data='{"receiver-stats":null}'):
self._lock = threading.Lock()
self._data = initial_data
self._loop = None
self._update_callback = None
def set_loop(self, loop):
self._loop = loop
def set_update_callback(self, callback):
self._update_callback = callback
def update(self, new_data: str):
with self._lock:
self._data = new_data
if self._loop and self._update_callback:
asyncio.run_coroutine_threadsafe(self._update_callback(new_data), self._loop)
def get(self) -> str:
with self._lock:
return self._data
# Global cache instance
cache = StatsCache()
# ----------------------------------------------------------------------
# Atomic file write utility
# ----------------------------------------------------------------------
def atomic_write(content: str, file_path: str):
"""Write content to file_path atomically using a temp file + rename."""
try:
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(mode='w', dir=os.path.dirname(file_path),
delete=False, suffix='.tmp', encoding='utf-8') as tmp:
tmp.write(content)
temp_path = tmp.name
os.replace(temp_path, file_path)
except Exception as e:
logging.error(f"Atomic write failed: {e}")
try:
await asyncio.sleep(0.5)
if Path.exists(Path(LOGFILE_PATH)):
with open(LOGFILE_PATH, 'r') as f:
json_string = f.read()
if json_string != '{"receiver-stats":null}':
reply = f"{json_string}"
await websocket.send(reply)
except ConnectionClosed:
break # Client disconnected normally
except Exception as e:
print(f"WebSocket error: {str(e)}")
break # Disconnect on other errors
if 'temp_path' in locals() and os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass
class StatsServer(BaseHTTPRequestHandler):
# ----------------------------------------------------------------------
# UDP Listener Thread
# ----------------------------------------------------------------------
def udp_listener():
logging.info(f'Starting UDP listener on {UDP_ADDR}:{UDP_PORT}')
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_ADDR, UDP_PORT))
sock.settimeout(1.0)
atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
try:
while True:
try:
data, addr = sock.recvfrom(UDP_MAX_SIZE)
if len(data) == UDP_MAX_SIZE:
logging.warning(f"Max size datagram received from {addr}; possible truncation.")
decoded = data.decode('utf-8')
parts = decoded.split('|')
if len(parts) < 3:
continue
relevant = parts[2].strip().replace("[INFO] ", "")
if "receiver-stats" in relevant:
try:
json_data = json.loads(relevant)
new_json_str = json.dumps(json_data)
cache.update(new_json_str)
atomic_write(new_json_str, LOGFILE_PATH)
except json.JSONDecodeError:
logging.info(f"Skipping invalid JSON: {relevant[:100]}...")
elif "[CLEANUP]" in relevant or "has timed out" in relevant:
null_json = '{"receiver-stats":null}'
cache.update(null_json)
atomic_write(null_json, LOGFILE_PATH)
except socket.timeout:
continue
except Exception as e:
logging.error(f"UDP loop error: {e}", exc_info=True)
except Exception as e:
logging.critical(f"UDP thread fatal error: {e}", exc_info=True)
finally:
sock.close()
logging.info("UDP listener stopped.")
# ----------------------------------------------------------------------
# HTTP Server
# ----------------------------------------------------------------------
class StatsHTTPHandler(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
def log_message(self, format, *args):
# Override to prevent logging
return
return # suppress default logging
def do_HEAD(self):
self._set_headers()
def do_GET(self):
try:
self._set_headers()
global json_string
if Path.exists(Path(LOGFILE_PATH)):
with open(LOGFILE_PATH, 'r') as f:
json_string = f.read()
byte_json_string_utf8 = json_string.encode('utf-8')
self.wfile.write(byte_json_string_utf8)
except BrokenPipeError:
# Client disconnected before response could be sent
pass
except ConnectionResetError:
# Client reset the connection
json_str = cache.get()
self.wfile.write(json_str.encode('utf-8'))
except (BrokenPipeError, ConnectionResetError):
pass
except Exception as e:
print(f"HTTP request error: {str(e)}")
self.send_error(500, "Internal Server Error")
logging.error(f"HTTP handler error: {e}")
def run_http_server():
server_address = (HTTP_ADDR, HTTP_PORT)
httpd = HTTPServer(server_address, StatsServer)
print(f'Starting HTTP server at http://{HTTP_ADDR}:{HTTP_PORT}...')
httpd = HTTPServer(server_address, StatsHTTPHandler)
logging.info(f'HTTP server started on http://{HTTP_ADDR}:{HTTP_PORT}')
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
except Exception as e:
print(f"HTTP server error: {str(e)}")
logging.error(f"HTTP server error: {e}")
finally:
httpd.server_close()
print('HTTP server stopped.')
logging.info('HTTP server stopped.')
async def start_websocket_server():
"""WebSocket server with comprehensive error handling"""
# ----------------------------------------------------------------------
# WebSocket Server
# ----------------------------------------------------------------------
class WebSocketManager:
def __init__(self):
self.clients = set()
self._lock = asyncio.Lock()
async def register(self, websocket):
async with self._lock:
self.clients.add(websocket)
async def unregister(self, websocket):
async with self._lock:
self.clients.discard(websocket)
async def broadcast(self, message: str):
async with self._lock:
clients = list(self.clients)
for ws in clients:
try:
await ws.send(message)
except Exception:
pass
ws_manager = WebSocketManager()
# FIXED: Removed the 'path' argument for compatibility with websockets ≥ 11.0
async def websocket_handler(websocket):
"""Handle a single WebSocket connection."""
await ws_manager.register(websocket)
try:
async with websockets.serve(
handler,
HTTP_ADDR,
WS_PORT,
# Additional settings for better compatibility
ping_interval=20,
ping_timeout=20,
close_timeout=10,
) as server:
print(f'WebSocket server started on ws://{HTTP_ADDR}:{WS_PORT}')
await asyncio.Future() # Run forever
except InvalidHandshake as e:
print(f"WebSocket handshake failed: {str(e)}")
except Exception as e:
print(f"WebSocket server error: {str(e)}")
# Send current state immediately
current = cache.get()
await websocket.send(current)
# Keep connection alive until client disconnects
await websocket.wait_closed()
except ConnectionClosed:
pass
finally:
await ws_manager.unregister(websocket)
if __name__ == "__main__":
with open(LOGFILE_PATH, 'w') as f:
f.write('{"receiver-stats":null}')
f.flush()
os.fsync(f.fileno())
print(Path("banner.txt").read_text())
if Path.exists(Path(LOGFILE_PATH)):
with open(LOGFILE_PATH, 'r') as f:
json_string = f.read()
# Start UDP logger
udp_thread = threading.Thread(target=checklogs)
udp_thread.daemon = True
async def on_stats_updated(new_json: str):
await ws_manager.broadcast(new_json)
# ----------------------------------------------------------------------
# Main
# ----------------------------------------------------------------------
async def main_async():
loop = asyncio.get_running_loop()
cache.set_loop(loop)
cache.set_update_callback(on_stats_updated)
udp_thread = threading.Thread(target=udp_listener, daemon=True)
udp_thread.start()
# Start HTTP server in a separate thread
http_thread = threading.Thread(target=run_http_server)
http_thread.daemon = True
http_thread = threading.Thread(target=run_http_server, daemon=True)
http_thread.start()
async with websockets.serve(
websocket_handler,
HTTP_ADDR,
WS_PORT,
ping_interval=20,
ping_timeout=20,
close_timeout=10,
):
logging.info(f'WebSocket server started on ws://{HTTP_ADDR}:{WS_PORT}')
await asyncio.Future() # run forever
if __name__ == "__main__":
banner_path = Path("banner.txt")
if banner_path.exists():
print(banner_path.read_text())
atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
try:
asyncio.run(start_websocket_server())
asyncio.run(main_async())
except KeyboardInterrupt:
print('\nShutting down servers...')
logging.info("Shutting down...")
except Exception as e:
print(f"Main error: {str(e)}")
logging.critical(f"Fatal error: {e}", exc_info=True)
finally:
sys.exit(0)
+78 -34
View File
@@ -2,52 +2,96 @@ import socket
import os
import json
from pathlib import Path
import tempfile
import logging
# Configure basic logging to stdout (or adjust as needed)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
UDP_ADDR = os.getenv('env_udp_addr', '0.0.0.0')
UDP_PORT = int(os.getenv('env_udp_port', '5005'))
LOGFILE_PATH = os.getenv('env_logfile_path', 'logfile.json')
# Maximum theoretical UDP datagram size (IPv4). Using 65535 ensures we never truncate.
UDP_MAX_SIZE = 65535
def atomic_write(content, file_path):
"""Simple atomic file write - write to temp file then rename"""
try:
# Ensure directory exists before writing (still good practice, though not the main fix)
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
# Use explicit UTF-8 encoding for cross-platform safety
with tempfile.NamedTemporaryFile(mode='w', dir=os.path.dirname(file_path),
delete=False, suffix='.tmp', encoding='utf-8') as tmp:
tmp.write(content)
temp_path = tmp.name
# Atomic replace
os.replace(temp_path, file_path)
except Exception as e:
logging.error(f"Atomic write failed: {e}")
# Clean up temporary file if it exists
try:
if 'temp_path' in locals() and os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass
def checklogs():
print(f'Starting UDP log check udp://{UDP_ADDR}:{UDP_PORT}...\n')
logging.info(f'Starting UDP log listener on udp://{UDP_ADDR}:{UDP_PORT}')
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_ADDR, UDP_PORT))
# FIX 1: Set a timeout so the loop doesn't block indefinitely.
# This allows periodic logging or heartbeat checks.
sock.settimeout(10.0) # 10 second timeout
try:
while True:
data, addr = sock.recvfrom(32 * 1024) # buffer size
decoded_data = data.decode("utf-8")
parts = decoded_data.split('|')
# Check if the log entry has the expected structure
if len(parts) < 3:
continue
# Extract the relevant part
relevant_data = parts[2].strip().replace("[INFO] ", "")
if "receiver-stats" in relevant_data:
try:
# Attempt to parse as JSON
json_data = json.loads(relevant_data)
with open(LOGFILE_PATH, 'w') as f:
json.dump(json_data, f)
except json.JSONDecodeError:
print(f"Skipping invalid JSON: {relevant_data}")
elif "[CLEANUP]" in relevant_data:
print("Cleanup log detected.")
with open(LOGFILE_PATH, 'w') as f:
f.write('{"receiver-stats":null}')
elif "has timed out" in relevant_data:
print("Disconnect log detected.")
with open(LOGFILE_PATH, 'w') as f:
f.write('{"receiver-stats":null}')
try:
# FIX 2: Use maximum UDP buffer size to avoid truncation
data, addr = sock.recvfrom(UDP_MAX_SIZE)
# Detect possible truncation (if we received exactly buffer size,
# the datagram may have been larger than UDP_MAX_SIZE)
if len(data) == UDP_MAX_SIZE:
logging.warning(f"Received datagram of maximum size ({UDP_MAX_SIZE} bytes) from {addr}. "
"Possible truncation; data may be incomplete.")
decoded_data = data.decode("utf-8")
parts = decoded_data.split('|')
if len(parts) < 3:
continue
relevant_data = parts[2].strip().replace("[INFO] ", "")
if "receiver-stats" in relevant_data:
try:
json_data = json.loads(relevant_data)
atomic_write(json.dumps(json_data), LOGFILE_PATH)
except json.JSONDecodeError:
logging.info(f"Skipping invalid JSON: {relevant_data}")
elif "[CLEANUP]" in relevant_data:
logging.info("Cleanup log detected.")
atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
elif "has timed out" in relevant_data:
logging.info("Disconnect log detected.")
atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
except socket.timeout:
# Timeout occurred do nothing or optionally log a heartbeat
# logging.debug("No data received within timeout, still listening...")
pass
except Exception as e:
logging.error(f"Unexpected error in main loop: {e}", exc_info=True)
# Continue listening; do not crash
except KeyboardInterrupt:
pass
logging.info("Keyboard interrupt received, shutting down.")
finally:
sock.close()
print(f"...Stopped UDP log check\n")
logging.info("UDP listener stopped.")
if __name__ == "__main__":
print(Path("banner.txt").read_text())
with open(LOGFILE_PATH, 'w') as f:
f.write('{"receiver-stats":null}')
checklogs()
# Initialize output file
atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
checklogs()
+78 -34
View File
@@ -2,52 +2,96 @@ import socket
import os
import json
from pathlib import Path
import tempfile
import logging
# Configure basic logging to stdout (or adjust as needed)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
UDP_ADDR = os.getenv('env_udp_addr', '0.0.0.0')
UDP_PORT = int(os.getenv('env_udp_port', '5005'))
LOGFILE_PATH = os.getenv('env_logfile_path', 'logfile.json')
# Maximum theoretical UDP datagram size (IPv4). Using 65535 ensures we never truncate.
UDP_MAX_SIZE = 65535
def atomic_write(content, file_path):
"""Simple atomic file write - write to temp file then rename"""
try:
# Ensure directory exists before writing (still good practice, though not the main fix)
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
# Use explicit UTF-8 encoding for cross-platform safety
with tempfile.NamedTemporaryFile(mode='w', dir=os.path.dirname(file_path),
delete=False, suffix='.tmp', encoding='utf-8') as tmp:
tmp.write(content)
temp_path = tmp.name
# Atomic replace
os.replace(temp_path, file_path)
except Exception as e:
logging.error(f"Atomic write failed: {e}")
# Clean up temporary file if it exists
try:
if 'temp_path' in locals() and os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass
def checklogs():
print(f'Starting UDP log check udp://{UDP_ADDR}:{UDP_PORT}...\n')
logging.info(f'Starting UDP log listener on udp://{UDP_ADDR}:{UDP_PORT}')
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_ADDR, UDP_PORT))
# FIX 1: Set a timeout so the loop doesn't block indefinitely.
# This allows periodic logging or heartbeat checks.
sock.settimeout(10.0) # 10 second timeout
try:
while True:
data, addr = sock.recvfrom(32 * 1024) # buffer size
decoded_data = data.decode("utf-8")
parts = decoded_data.split('|')
# Check if the log entry has the expected structure
if len(parts) < 3:
continue
# Extract the relevant part
relevant_data = parts[2].strip().replace("[INFO] ", "")
if "sender-stats" in relevant_data:
try:
# Attempt to parse as JSON
json_data = json.loads(relevant_data)
with open(LOGFILE_PATH, 'w') as f:
json.dump(json_data, f)
except json.JSONDecodeError:
print(f"Skipping invalid JSON: {relevant_data}")
elif "[CLEANUP]" in relevant_data:
print("Cleanup log detected.")
with open(LOGFILE_PATH, 'w') as f:
f.write('{"sender-stats":null}')
elif "has timed out" in relevant_data:
print("Disconnect log detected.")
with open(LOGFILE_PATH, 'w') as f:
f.write('{"sender-stats":null}')
try:
# FIX 2: Use maximum UDP buffer size to avoid truncation
data, addr = sock.recvfrom(UDP_MAX_SIZE)
# Detect possible truncation (if we received exactly buffer size,
# the datagram may have been larger than UDP_MAX_SIZE)
if len(data) == UDP_MAX_SIZE:
logging.warning(f"Received datagram of maximum size ({UDP_MAX_SIZE} bytes) from {addr}. "
"Possible truncation; data may be incomplete.")
decoded_data = data.decode("utf-8")
parts = decoded_data.split('|')
if len(parts) < 3:
continue
relevant_data = parts[2].strip().replace("[INFO] ", "")
if "sender-stats" in relevant_data:
try:
json_data = json.loads(relevant_data)
atomic_write(json.dumps(json_data), LOGFILE_PATH)
except json.JSONDecodeError:
logging.info(f"Skipping invalid JSON: {relevant_data}")
elif "[CLEANUP]" in relevant_data:
logging.info("Cleanup log detected.")
atomic_write('{"sender-stats":null}', LOGFILE_PATH)
elif "has timed out" in relevant_data:
logging.info("Disconnect log detected.")
atomic_write('{"sender-stats":null}', LOGFILE_PATH)
except socket.timeout:
# Timeout occurred do nothing or optionally log a heartbeat
# logging.debug("No data received within timeout, still listening...")
pass
except Exception as e:
logging.error(f"Unexpected error in main loop: {e}", exc_info=True)
# Continue listening; do not crash
except KeyboardInterrupt:
pass
logging.info("Keyboard interrupt received, shutting down.")
finally:
sock.close()
print(f"...Stopped UDP log check\n")
logging.info("UDP listener stopped.")
if __name__ == "__main__":
print(Path("banner.txt").read_text())
with open(LOGFILE_PATH, 'w') as f:
f.write('{"sender-stats":null}')
checklogs()
# Initialize output file
atomic_write('{"sender-stats":null}', LOGFILE_PATH)
checklogs()
+4
View File
@@ -6,6 +6,7 @@ services:
platforms:
- linux/amd64
- linux/arm64
- linux/arm/v7
image: ${REPOSITORY}/moo-rist:${VERSION}
moo-rist-forwarder:
build:
@@ -14,6 +15,7 @@ services:
platforms:
- linux/amd64
- linux/arm64
- linux/arm/v7
image: ${REPOSITORY}/moo-rist-forwarder:${VERSION}
moo-rist-to-rist:
build:
@@ -22,6 +24,7 @@ services:
platforms:
- linux/amd64
- linux/arm64
- linux/arm/v7
image: ${REPOSITORY}/moo-rist-to-rist:${VERSION}
#moo-rist-relay:
# build:
@@ -54,6 +57,7 @@ services:
platforms:
- linux/amd64
- linux/arm64
- linux/arm/v7
image: ${REPOSITORY}/moo-rist-stats:${VERSION}
#moostream-stats-sender:
# build: