Coverage for app/backend/src/tests/test_app.py: 100%
78 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import signal
2from multiprocessing import Process
3from typing import cast
5import grpc
6import pytest
8from couchers import supervisor
9from couchers.constants import GRACEFUL_SHUTDOWN_TIMEOUT
10from couchers.server import create_main_server, create_media_server
13@pytest.fixture(autouse=True)
14def _(testconfig):
15 pass
18def test_create_servers():
19 server = create_main_server(port=1751)
20 media_server = create_media_server(port=1753)
21 server.start()
22 media_server.start()
23 server.stop(None).wait()
24 media_server.stop(None).wait()
27class FakeProcess:
28 """Stands in for multiprocessing.Process so the supervisor can be tested without spawning."""
30 def __init__(self, name="fake", alive=True):
31 self.name = name
32 self.pid = 123
33 self.exitcode = 0
34 self._alive = alive
35 self.terminated = False
36 self.joined = False
38 def is_alive(self):
39 return self._alive
41 def terminate(self):
42 self.terminated = True
43 self._alive = False
45 def join(self, timeout=None):
46 self.joined = True
49class FakeServer:
50 """Stands in for a grpc.Server so parent-server draining can be tested without a real server."""
52 def __init__(self):
53 self.stop_grace = None
54 self.waited = False
56 def stop(self, grace):
57 self.stop_grace = grace
58 return self
60 def wait(self, timeout=None):
61 self.waited = True
64def _as_children(*procs: FakeProcess) -> list[Process]:
65 return [cast(Process, p) for p in procs]
68def test_supervise_returns_crashed_child_and_drains_the_rest(monkeypatch):
69 monkeypatch.setattr(signal, "signal", lambda *a: None)
71 alive = FakeProcess("api-1761", alive=True)
72 dead = FakeProcess("api-1762", alive=False)
73 children = _as_children(alive, dead)
75 crashed = supervisor.supervise(children)
77 assert crashed is children[1]
78 assert alive.terminated
79 assert alive.joined
82def test_supervise_returns_none_on_graceful_shutdown(monkeypatch):
83 # fire the handler as soon as it's registered, so the loop exits without any child dying
84 def fire_on_register(sig, handler):
85 if sig == signal.SIGTERM:
86 handler(sig, None)
88 monkeypatch.setattr(signal, "signal", fire_on_register)
90 a = FakeProcess("api-1761")
91 b = FakeProcess("api-1762")
93 crashed = supervisor.supervise(_as_children(a, b))
95 assert crashed is None
96 assert a.terminated and b.terminated
97 assert a.joined and b.joined
100def test_supervise_only_terminates_live_children(monkeypatch):
101 monkeypatch.setattr(signal, "signal", lambda *a: None)
103 already_dead = FakeProcess("api-1761", alive=False)
104 live = FakeProcess("api-1762", alive=True)
106 supervisor.supervise(_as_children(already_dead, live))
108 assert not already_dead.terminated
109 assert live.terminated
112def test_supervise_drains_parent_servers_within_the_shutdown_window(monkeypatch):
113 monkeypatch.setattr(signal, "signal", lambda *a: None)
115 dead = FakeProcess("api-1761", alive=False)
116 media = FakeServer()
118 supervisor.supervise(_as_children(dead), parent_servers=[cast(grpc.Server, media)])
120 assert media.stop_grace == GRACEFUL_SHUTDOWN_TIMEOUT
121 assert media.waited