Coverage for app/backend/src/couchers/supervisor.py: 96%

36 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-31 14:08 +0000

1""" 

2Parent-process supervision for the forked child processes (API workers, background workers, scheduler). 

3 

4We don't respawn individual children: if any child exits on its own the parent tears the rest down and 

5returns the dead child, so the caller can exit non-zero and let the container be restarted from scratch. 

6On SIGTERM/SIGINT the parent instead shuts down gracefully, forwarding SIGTERM so children drain. 

7 

8All teardown happens within a single GRACEFUL_SHUTDOWN_TIMEOUT window: the children and any parent-local 

9gRPC servers (e.g. the media server) are all told to stop first, then waited on concurrently, so their 

10drain budgets overlap instead of stacking — total shutdown stays under the container's stop_grace_period. 

11""" 

12 

13import logging 

14import signal 

15import threading 

16import time 

17from collections.abc import Sequence 

18from multiprocessing import Process 

19 

20import grpc 

21 

22from couchers.constants import GRACEFUL_SHUTDOWN_TIMEOUT 

23from couchers.metrics import supervised_children_alive_gauge 

24 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Seconds the supervision loop waits on the shutdown event between child liveness checks.
_POLL_INTERVAL = 1.0

28 

29 

def supervise(children: list[Process], parent_servers: Sequence[grpc.Server] = ()) -> Process | None:
    """Block until a shutdown signal or until a child dies, then drain everything.

    Installs SIGTERM/SIGINT handlers that request a graceful shutdown, then polls the children
    every ``_POLL_INTERVAL`` seconds. As soon as a signal has been received or a child is found
    dead, every child that is still alive is terminated and every parent server is asked to stop;
    all of them are then waited on against one shared ``GRACEFUL_SHUTDOWN_TIMEOUT`` deadline.

    Anything that has not finished by the deadline is logged as a warning and otherwise left
    alone; this function does not escalate beyond ``terminate()``.

    Args:
        children: The child processes to watch and tear down.
        parent_servers: gRPC servers living in the parent process that must be stopped as well.

    Returns:
        The first child found dead when teardown was triggered by a child exiting, or ``None``
        when it was triggered by SIGTERM/SIGINT.
    """
    shutting_down = threading.Event()

    def handle_signal(signum: int, frame: object) -> None:
        # only flag the request here; the actual teardown runs in the main flow below
        logger.info(f"Received signal {signum}, shutting down")
        shutting_down.set()

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    crashed: Process | None = None
    while not shutting_down.is_set():
        supervised_children_alive_gauge.set(sum(child.is_alive() for child in children))
        crashed = next((child for child in children if not child.is_alive()), None)
        if crashed is not None:
            logger.critical(f"Child {crashed.name} (pid {crashed.pid}) exited with code {crashed.exitcode}")
            break
        # doubles as the poll sleep: returns early once the signal handler sets the event
        shutting_down.wait(_POLL_INTERVAL)

    # initiate every drain (all non-blocking) before waiting on any, so child and parent-server draining
    # overlap under one shared deadline rather than running back-to-back
    for child in children:
        if child.is_alive():
            child.terminate()
    server_stopped = [server.stop(GRACEFUL_SHUTDOWN_TIMEOUT) for server in parent_servers]
    deadline = time.monotonic() + GRACEFUL_SHUTDOWN_TIMEOUT
    for child in children:
        # the remaining budget shrinks as earlier joins consume it; never pass a negative timeout
        child.join(timeout=max(0.0, deadline - time.monotonic()))
        if child.is_alive():
            # join() gives no indication that it timed out, so surface the straggler explicitly
            logger.warning(
                f"Child {child.name} (pid {child.pid}) still alive after the "
                f"{GRACEFUL_SHUTDOWN_TIMEOUT}s shutdown deadline"
            )
    for stopped in server_stopped:
        # Event.wait() returns False when the timeout elapsed before the event was set
        if not stopped.wait(timeout=max(0.0, deadline - time.monotonic())):
            logger.warning(
                f"A parent gRPC server had not finished stopping by the "
                f"{GRACEFUL_SHUTDOWN_TIMEOUT}s shutdown deadline"
            )

    return crashed