Coverage for app/backend/src/tests/test_app.py: 100%

78 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import signal 

2from multiprocessing import Process 

3from typing import cast 

4 

5import grpc 

6import pytest 

7 

8from couchers import supervisor 

9from couchers.constants import GRACEFUL_SHUTDOWN_TIMEOUT 

10from couchers.server import create_main_server, create_media_server 

11 

12 

13@pytest.fixture(autouse=True) 

14def _(testconfig): 

15 pass 

16 

17 

18def test_create_servers(): 

19 server = create_main_server(port=1751) 

20 media_server = create_media_server(port=1753) 

21 server.start() 

22 media_server.start() 

23 server.stop(None).wait() 

24 media_server.stop(None).wait() 

25 

26 

27class FakeProcess: 

28 """Stands in for multiprocessing.Process so the supervisor can be tested without spawning.""" 

29 

30 def __init__(self, name="fake", alive=True): 

31 self.name = name 

32 self.pid = 123 

33 self.exitcode = 0 

34 self._alive = alive 

35 self.terminated = False 

36 self.joined = False 

37 

38 def is_alive(self): 

39 return self._alive 

40 

41 def terminate(self): 

42 self.terminated = True 

43 self._alive = False 

44 

45 def join(self, timeout=None): 

46 self.joined = True 

47 

48 

49class FakeServer: 

50 """Stands in for a grpc.Server so parent-server draining can be tested without a real server.""" 

51 

52 def __init__(self): 

53 self.stop_grace = None 

54 self.waited = False 

55 

56 def stop(self, grace): 

57 self.stop_grace = grace 

58 return self 

59 

60 def wait(self, timeout=None): 

61 self.waited = True 

62 

63 

64def _as_children(*procs: FakeProcess) -> list[Process]: 

65 return [cast(Process, p) for p in procs] 

66 

67 

68def test_supervise_returns_crashed_child_and_drains_the_rest(monkeypatch): 

69 monkeypatch.setattr(signal, "signal", lambda *a: None) 

70 

71 alive = FakeProcess("api-1761", alive=True) 

72 dead = FakeProcess("api-1762", alive=False) 

73 children = _as_children(alive, dead) 

74 

75 crashed = supervisor.supervise(children) 

76 

77 assert crashed is children[1] 

78 assert alive.terminated 

79 assert alive.joined 

80 

81 

82def test_supervise_returns_none_on_graceful_shutdown(monkeypatch): 

83 # fire the handler as soon as it's registered, so the loop exits without any child dying 

84 def fire_on_register(sig, handler): 

85 if sig == signal.SIGTERM: 

86 handler(sig, None) 

87 

88 monkeypatch.setattr(signal, "signal", fire_on_register) 

89 

90 a = FakeProcess("api-1761") 

91 b = FakeProcess("api-1762") 

92 

93 crashed = supervisor.supervise(_as_children(a, b)) 

94 

95 assert crashed is None 

96 assert a.terminated and b.terminated 

97 assert a.joined and b.joined 

98 

99 

100def test_supervise_only_terminates_live_children(monkeypatch): 

101 monkeypatch.setattr(signal, "signal", lambda *a: None) 

102 

103 already_dead = FakeProcess("api-1761", alive=False) 

104 live = FakeProcess("api-1762", alive=True) 

105 

106 supervisor.supervise(_as_children(already_dead, live)) 

107 

108 assert not already_dead.terminated 

109 assert live.terminated 

110 

111 

112def test_supervise_drains_parent_servers_within_the_shutdown_window(monkeypatch): 

113 monkeypatch.setattr(signal, "signal", lambda *a: None) 

114 

115 dead = FakeProcess("api-1761", alive=False) 

116 media = FakeServer() 

117 

118 supervisor.supervise(_as_children(dead), parent_servers=[cast(grpc.Server, media)]) 

119 

120 assert media.stop_grace == GRACEFUL_SHUTDOWN_TIMEOUT 

121 assert media.waited