b027d6dcc0
+ more resilience for stats servers
97 lines
4.1 KiB
Python
97 lines
4.1 KiB
Python
import socket
|
||
import os
|
||
import json
|
||
from pathlib import Path
|
||
import tempfile
|
||
import logging
|
||
|
||
# Root logger: INFO and above, timestamped, via basicConfig's default handler.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Listener settings; each one can be overridden through the environment.
UDP_ADDR = os.environ.get('env_udp_addr', '0.0.0.0')
UDP_PORT = int(os.environ.get('env_udp_port', '5005'))
LOGFILE_PATH = os.environ.get('env_logfile_path', 'logfile.json')

# Receive buffer size in bytes. 65535 is the theoretical maximum size of an
# IPv4 UDP datagram, so a buffer this large never truncates one.
UDP_MAX_SIZE = 65535
|
||
|
||
def atomic_write(content, file_path):
    """Write *content* to *file_path* through a temp file and an atomic rename.

    The text is written as UTF-8 to a ``.tmp`` file created in the
    destination directory, which is then moved over *file_path* with
    ``os.replace``. Missing parent directories are created first.

    This is best effort: any ``Exception`` is logged and swallowed, and the
    temporary file is removed so failed writes leave nothing behind.

    Args:
        content: Text to write.
        file_path: Destination path of the file to create or replace.

    Returns:
        None, whether or not the write succeeded.
    """
    temp_path = None
    try:
        # Create the temp file next to the target so os.replace stays on one
        # filesystem; a bare filename resolves to the current directory.
        target_dir = Path(file_path).parent
        target_dir.mkdir(parents=True, exist_ok=True)

        with tempfile.NamedTemporaryFile(mode='w', dir=str(target_dir),
                                         delete=False, suffix='.tmp',
                                         encoding='utf-8') as tmp:
            # Remember the name *before* writing: if write() fails the file
            # already exists on disk and must still be cleaned up.
            temp_path = tmp.name
            tmp.write(content)

        # Atomic replace
        os.replace(temp_path, file_path)
    except Exception as e:
        logging.error(f"Atomic write failed: {e}")
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Already gone (or not removable); nothing more we can do.
                pass
|
||
|
||
def checklogs():
    """Listen for UDP log datagrams and mirror receiver stats to LOGFILE_PATH.

    Binds a UDP socket to ``(UDP_ADDR, UDP_PORT)`` and loops forever. Each
    datagram is decoded as UTF-8 and split on ``'|'``; datagrams with fewer
    than three fields are ignored. The third field, stripped and with every
    ``"[INFO] "`` removed, is then handled as follows:

    * contains ``"receiver-stats"``: parsed as JSON and written to
      ``LOGFILE_PATH``; invalid JSON is logged and skipped;
    * contains ``"[CLEANUP]"`` or ``"has timed out"``: the file is reset to
      ``{"receiver-stats":null}``.

    Receive timeouts are ignored and any other per-datagram ``Exception`` is
    logged with a traceback, so the loop keeps running. The function returns
    only after ``KeyboardInterrupt``; the socket is always closed on exit.

    Raises:
        OSError: If the socket cannot be bound (propagated after the socket
            has been closed).
    """
    logging.info(f'Starting UDP log listener on udp://{UDP_ADDR}:{UDP_PORT}')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    try:
        # Bind inside the try block so the socket is closed by the finally
        # clause even when bind() fails (e.g. port already in use).
        sock.bind((UDP_ADDR, UDP_PORT))

        # Time out periodically so recvfrom() never blocks indefinitely.
        sock.settimeout(10.0)  # 10 second timeout

        while True:
            try:
                # Use the maximum UDP buffer size to avoid truncation.
                data, addr = sock.recvfrom(UDP_MAX_SIZE)

                # A datagram that exactly fills the buffer may have been
                # larger than UDP_MAX_SIZE and cut short.
                if len(data) == UDP_MAX_SIZE:
                    logging.warning(f"Received datagram of maximum size ({UDP_MAX_SIZE} bytes) from {addr}. "
                                    "Possible truncation; data may be incomplete.")

                decoded_data = data.decode("utf-8")
                parts = decoded_data.split('|')

                if len(parts) < 3:
                    continue

                relevant_data = parts[2].strip().replace("[INFO] ", "")

                if "receiver-stats" in relevant_data:
                    try:
                        json_data = json.loads(relevant_data)
                        atomic_write(json.dumps(json_data), LOGFILE_PATH)
                    except json.JSONDecodeError:
                        logging.info(f"Skipping invalid JSON: {relevant_data}")
                elif "[CLEANUP]" in relevant_data:
                    logging.info("Cleanup log detected.")
                    atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
                elif "has timed out" in relevant_data:
                    logging.info("Disconnect log detected.")
                    atomic_write('{"receiver-stats":null}', LOGFILE_PATH)

            except socket.timeout:
                # Nothing arrived within the timeout; keep listening.
                pass
            except Exception as e:
                # Keep listening; one bad datagram must not stop the listener.
                logging.error(f"Unexpected error in main loop: {e}", exc_info=True)

    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received, shutting down.")
    finally:
        sock.close()
        logging.info("UDP listener stopped.")
|
||
|
||
if __name__ == "__main__":
    # Script entry point: write the null-stats placeholder to the output file
    # first, then run the UDP listener.
    atomic_write('{"receiver-stats":null}', LOGFILE_PATH)
    checklogs()