Coverage for src / tests / test_db.py: 99%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-05 01:05 +0000

1import difflib 

2import os 

3import re 

4import subprocess 

5from pathlib import Path 

6 

7import pytest 

8from google.protobuf import empty_pb2 

9from sqlalchemy import select 

10from sqlalchemy.sql import func 

11 

12from couchers.config import config 

13from couchers.db import apply_migrations, get_parent_node_at_location, session_scope 

14from couchers.jobs.handlers import DatabaseInconsistencyError, check_database_consistency 

15from couchers.models import User 

16from couchers.utils import ( 

17 is_valid_email, 

18 is_valid_name, 

19 is_valid_user_id, 

20 is_valid_username, 

21 parse_date, 

22) 

23from tests.fixtures.db import create_schema_from_models, drop_database, generate_user, run_migration_test 

24from tests.test_communities import create_1d_point, get_community_id, testing_communities # noqa 

25 

26 

def test_is_valid_user_id():
    """Check ``is_valid_user_id`` against one accepted and two rejected sample ids."""
    accepted = ("10",)
    rejected = (
        "1a",  # contains a letter
        "01",  # starts with a zero
    )

    for candidate in accepted:
        assert is_valid_user_id(candidate)
    for candidate in rejected:
        assert not is_valid_user_id(candidate)

31 

32 

def test_is_valid_email():
    """Check ``is_valid_email`` against sample addresses it must accept and reject."""
    accepted = (
        "a@b.cc",
        "te.st+email.valid@a.org.au.xx.yy",
        "invalid@yahoo.co.uk",
    )
    rejected = (
        "invalid@.yahoo.co.uk",  # domain starts with a dot
        "test email@couchers.org",  # space in the local part
        ".testemail@couchers.org",  # local part starts with a dot
        "testemail@couchersorg",  # no dot in the domain
        "b@xxb....blabla",  # consecutive dots in the domain
    )

    for address in accepted:
        assert is_valid_email(address)
    for address in rejected:
        assert not is_valid_email(address)

42 

43 

def test_is_valid_username():
    """Check ``is_valid_username`` against sample usernames it must accept and reject."""
    accepted = ("user", "us", "us_er", "us_er1")
    rejected = (
        "us_",  # ends with an underscore
        "u",  # single character
        "1us",  # starts with a digit
        "User",  # contains an uppercase letter
    )

    for username in accepted:
        assert is_valid_username(username)
    for username in rejected:
        assert not is_valid_username(username)

53 

54 

def test_is_valid_name():
    """Check ``is_valid_name`` against sample names it must accept and reject."""
    accepted = ("a", "a b", "1", "老子")
    # NOTE(review): the first and last entries look identical here; presumably
    # they are different whitespace characters in the original file - confirm.
    rejected = (" ", "", " ")

    for name in accepted:
        assert is_valid_name(name)
    for name in rejected:
        assert not is_valid_name(name)

63 

64 

def test_parse_date():
    """Check ``parse_date`` against date strings it must parse and ones it must refuse."""
    parseable = ("2020-01-01", "1900-01-01", "2099-01-01")
    unparseable = (
        "2019-02-29",  # 2019 is not a leap year
        "2019-22-01",  # month 22
        "2020-1-01",  # month is not zero-padded
        "20-01-01",  # two-digit year
        "01-01-2020",  # year comes last
        "2020/01/01",  # slashes instead of dashes
    )

    for text in parseable:
        assert parse_date(text) is not None
    for text in unparseable:
        assert not parse_date(text)

75 

76 

def test_get_parent_node_at_location(testing_communities):
    """Check which community ``get_parent_node_at_location`` returns for sample 1-d points.

    Going by the ranges annotated next to each community below, every expected
    node is the most specific community whose range contains the coordinate.
    """
    community_names = (
        "Global",  # 0 to 100
        "Country 1",  # 0 to 50
        "Country 1, Region 1",  # 0 to 10
        "Country 1, Region 1, City 1",  # 0 to 5
        "Country 1, Region 1, City 2",  # 7 to 10
        "Country 1, Region 2",  # 20 to 25
        "Country 1, Region 2, City 1",  # 21 to 23
        "Country 2",  # 52 to 100
        "Country 2, Region 1",  # 52 to 71
        "Country 2, Region 1, City 1",  # 53 to 70
    )
    # (coordinate, name of the community expected at that coordinate)
    expectations = (
        (1, "Country 1, Region 1, City 1"),
        (3, "Country 1, Region 1, City 1"),
        (6, "Country 1, Region 1"),
        (8, "Country 1, Region 1, City 2"),
        (15, "Country 1"),
        (51, "Global"),
    )

    with session_scope() as session:
        # Every community is looked up up front, including the ones no
        # expectation refers to, exactly as the individual lookups did before.
        ids_by_name = {name: get_community_id(session, name) for name in community_names}

        for coordinate, expected_name in expectations:
            node = get_parent_node_at_location(session, create_1d_point(coordinate))
            assert node.id == ids_by_name[expected_name]

96 

97 

def pg_dump():
    """Return the schema-only ``pg_dump`` output of the configured database.

    Runs ``pg_dump -s`` against ``config["DATABASE_CONNECTION_STRING"]`` and
    returns everything the command wrote to standard output as a string.

    Raises:
        subprocess.CalledProcessError: if ``pg_dump`` exits with a non-zero status.
        UnicodeDecodeError: if the output is not valid UTF-8.
    """
    completed = subprocess.run(
        ["pg_dump", "-s", config["DATABASE_CONNECTION_STRING"]],
        stdout=subprocess.PIPE,
        # Decode as UTF-8 instead of ASCII: pure-ASCII dumps decode to the very
        # same text, while a single non-ASCII character in the schema (a
        # comment, a default value, ...) no longer aborts the run.
        # Assumes pg_dump emits UTF-8 (or plain ASCII) - TODO confirm for
        # databases created with another encoding.
        encoding="utf-8",
        check=True,
    )
    return completed.stdout

102 

103 

def sort_pg_dump_output(output):
    """Sort a ``pg_dump`` schema dump so that two dumps can be diffed.

    Two things are put in order:

    * the entries of every list written as `` (`` at the end of a line, one
      entry per line separated by a trailing comma, and closed by ``);`` on a
      line of its own;
    * the dumped objects (tables, functions, indices, ...), i.e. the chunks
      separated by a line that starts with ``"-- "``.

    Args:
        output: the dump as a single string.

    Returns:
        The dump with list entries and objects sorted. Text that contains
        neither construct is returned unchanged.
    """

    def order_key(fragment):
        # Earlier versions swapped every newline for "§" before sorting, so a
        # line break compared higher than any ASCII character. Sorting by the
        # same substitution keeps the resulting order identical, while the
        # text itself is never rewritten - a literal "§" in the dump therefore
        # survives instead of being turned into a newline.
        return fragment.replace("\n", "§")

    def sort_entries(match):
        entries = match.group(1).split(",\n")
        return " (\n" + ",\n".join(sorted(entries, key=order_key)) + "\n);"

    # Parameter lists are enclosed with parentheses and every entry ends with
    # a comma last on the line. DOTALL lets the lazy group span several lines.
    s = re.sub(r" \(\n(.*?)\n\);", sort_entries, output, flags=re.DOTALL)

    # The header for all objects seems to start with two dashes and a space.
    # We don't care which kind of object it is here.
    return "\n-- ".join(sorted(s.split("\n-- "), key=order_key))

124 

125 

def test_sort_pg_dump_output():
    """Check both kinds of sorting done by ``sort_pg_dump_output``."""
    # Entries of a parenthesised list are put in alphabetic order.
    assert sort_pg_dump_output(" (\nb,\nc,\na\n);\n") == " (\na,\nb,\nc\n);\n"

    # Objects, i.e. the chunks introduced by a "-- " header line, are put in
    # alphabetic order as well.
    assert sort_pg_dump_output("\n-- b\nx\n-- a\ny") == "\n-- a\ny\n-- b\nx"

128 

129 

def strip_leading_whitespace(lines):
    """Return a new list holding every item of ``lines`` with its leading whitespace removed."""
    return [line.lstrip() for line in lines]

132 

133 

@pytest.mark.skipif(not run_migration_test(), reason="Migration test disabled")
def test_migrations(db, testconfig):
    """
    Check that the alembic migrations and models.py produce the same schema.

    The database is built twice from empty, once by applying the migrations
    and once directly from the models. Each result is dumped with pg_dump, and
    the two normalised dumps are compared. Any remaining difference is printed
    in unified diff format and fails the test.

    This only runs successfully where `pg_dump` is installed and everything is
    set up, which only happens within Gitlab CI. The test is therefore skipped
    unless explicitly enabled.
    """
    # Schema obtained incrementally, by replaying the alembic migrations.
    drop_database()
    apply_migrations()
    schema_via_migrations = pg_dump()

    # Schema obtained in one go from the current models.
    drop_database()
    create_schema_from_models()
    schema_via_models = pg_dump()

    # Save the raw schemas to files for CI artifacts, if a directory is configured.
    artifacts_dir = os.environ.get("TEST_SCHEMA_OUTPUT_DIR")
    if artifacts_dir:
        target = Path(artifacts_dir)
        target.mkdir(parents=True, exist_ok=True)
        (target / "schema_from_migrations.sql").write_text(schema_via_migrations)
        (target / "schema_from_models.sql").write_text(schema_via_models)

    # \restrict and \unrestrict lines (Postgres 16+) are not part of the schema.
    ignored_line_prefixes = ("\\restrict", "\\unrestrict")

    def comparable_lines(dump):
        """Turn a raw dump into the list of lines that takes part in the diff."""
        ordered = sort_pg_dump_output(dump)

        # Drop the alembic bookkeeping objects, which only exist after migrating.
        sections = [section for section in ordered.split("\n-- ") if not section.startswith("Name: alembic_")]
        without_alembic = "\n-- ".join(sections)

        kept = [line for line in without_alembic.splitlines() if not line.startswith(ignored_line_prefixes)]

        # Joining and splitting once more also drops trailing empty lines.
        return strip_leading_whitespace("\n".join(kept).splitlines())

    diff_lines = difflib.unified_diff(
        comparable_lines(schema_via_migrations),
        comparable_lines(schema_via_models),
        fromfile="migrations",
        tofile="model",
    )
    diff = "\n".join(diff_lines)
    print(diff)
    schemas_match = diff == ""
    assert schemas_match

186 

187 

def test_slugify(db):
    """Run the database's ``slugify`` function over sample texts and compare the resulting slugs."""
    # (text passed to slugify, slug expected back)
    cases = (
        ("this is a test", "this-is-a-test"),
        ("this is ä test", "this-is-a-test"),
        # nothing here gets converted to ASCII by unaccent, so the result is "slug"
        ("Создай группу своего города", "slug"),
        ("Detta är ett test!", "detta-ar-ett-test"),
        ("@#(*$&!@#", "slug"),
        (
            "This has a lot ‒ at least relatively speaking ‒ of punctuation! :)",
            "this-has-a-lot-at-least-relatively-speaking-of-punctuation",
        ),
        ("Multiple - #@! - non-ascii chars", "multiple-non-ascii-chars"),
        ("123", "123"),
        # long input: the expected slug is cut short and does not end in a dash
        (
            "A sentence that is over 64 chars long and where the last thing would be replaced by a dash",
            "a-sentence-that-is-over-64-chars-long-and-where-the-last-thing",
        ),
    )

    with session_scope() as session:
        for text, expected_slug in cases:
            assert session.execute(func.slugify(text)).scalar_one() == expected_slug

214 

215 

def test_database_consistency_check(db, testconfig):
    """The consistency check passes for freshly generated users and fails once one of them has no profile gallery."""
    # A few users to check; creating a user is expected to create its profile
    # gallery as well.
    for _ in range(3):
        generate_user()

    # With untouched users the check must complete without raising.
    check_database_consistency(empty_pb2.Empty())

    # Break consistency by detaching the profile gallery from one non-deleted user.
    with session_scope() as session:
        one_active_user = select(User).where(User.is_deleted == False).limit(1)  # noqa: E712
        victim = session.execute(one_active_user).scalar_one()
        victim.profile_gallery_id = None

    # The same check must now report the inconsistency.
    with pytest.raises(DatabaseInconsistencyError):
        check_database_consistency(empty_pb2.Empty())