Coverage for src/tests/test_db.py: 100%

129 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1import difflib 

2import os 

3import re 

4import subprocess 

5from pathlib import Path 

6 

7import pytest 

8from google.protobuf import empty_pb2 

9from sqlalchemy.sql import func 

10 

11from couchers.config import config 

12from couchers.db import apply_migrations, get_parent_node_at_location, session_scope 

13from couchers.jobs.handlers import DatabaseInconsistencyError, check_database_consistency 

14from couchers.models import User 

15from couchers.sql import couchers_select as select 

16from couchers.utils import ( 

17 is_valid_email, 

18 is_valid_name, 

19 is_valid_user_id, 

20 is_valid_username, 

21 parse_date, 

22) 

23from tests.test_communities import create_1d_point, get_community_id, testing_communities # noqa 

24from tests.test_fixtures import ( # noqa 

25 create_schema_from_models, 

26 db, 

27 drop_database, 

28 generate_user, 

29 run_migration_test, 

30 testconfig, 

31) 

32 

33 

34def test_is_valid_user_id(): 

35 assert is_valid_user_id("10") 

36 assert not is_valid_user_id("1a") 

37 assert not is_valid_user_id("01") 

38 

39 

40def test_is_valid_email(): 

41 assert is_valid_email("a@b.cc") 

42 assert is_valid_email("te.st+email.valid@a.org.au.xx.yy") 

43 assert is_valid_email("invalid@yahoo.co.uk") 

44 assert not is_valid_email("invalid@.yahoo.co.uk") 

45 assert not is_valid_email("test email@couchers.org") 

46 assert not is_valid_email(".testemail@couchers.org") 

47 assert not is_valid_email("testemail@couchersorg") 

48 assert not is_valid_email("b@xxb....blabla") 

49 

50 

51def test_is_valid_username(): 

52 assert is_valid_username("user") 

53 assert is_valid_username("us") 

54 assert is_valid_username("us_er") 

55 assert is_valid_username("us_er1") 

56 assert not is_valid_username("us_") 

57 assert not is_valid_username("u") 

58 assert not is_valid_username("1us") 

59 assert not is_valid_username("User") 

60 

61 

62def test_is_valid_name(): 

63 assert is_valid_name("a") 

64 assert is_valid_name("a b") 

65 assert is_valid_name("1") 

66 assert is_valid_name("老子") 

67 assert not is_valid_name(" ") 

68 assert not is_valid_name("") 

69 assert not is_valid_name(" ") 

70 

71 

72def test_parse_date(): 

73 assert parse_date("2020-01-01") is not None 

74 assert parse_date("1900-01-01") is not None 

75 assert parse_date("2099-01-01") is not None 

76 assert not parse_date("2019-02-29") 

77 assert not parse_date("2019-22-01") 

78 assert not parse_date("2020-1-01") 

79 assert not parse_date("20-01-01") 

80 assert not parse_date("01-01-2020") 

81 assert not parse_date("2020/01/01") 

82 

83 

84def test_get_parent_node_at_location(testing_communities): 

85 with session_scope() as session: 

86 w_id = get_community_id(session, "Global") # 0 to 100 

87 c1_id = get_community_id(session, "Country 1") # 0 to 50 

88 c1r1_id = get_community_id(session, "Country 1, Region 1") # 0 to 10 

89 c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1") # 0 to 5 

90 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") # 7 to 10 

91 c1r2_id = get_community_id(session, "Country 1, Region 2") # 20 to 25 

92 c1r2c1_id = get_community_id(session, "Country 1, Region 2, City 1") # 21 to 23 

93 c2_id = get_community_id(session, "Country 2") # 52 to 100 

94 c2r1_id = get_community_id(session, "Country 2, Region 1") # 52 to 71 

95 c2r1c1_id = get_community_id(session, "Country 2, Region 1, City 1") # 53 to 70 

96 

97 assert get_parent_node_at_location(session, create_1d_point(1)).id == c1r1c1_id 

98 assert get_parent_node_at_location(session, create_1d_point(3)).id == c1r1c1_id 

99 assert get_parent_node_at_location(session, create_1d_point(6)).id == c1r1_id 

100 assert get_parent_node_at_location(session, create_1d_point(8)).id == c1r1c2_id 

101 assert get_parent_node_at_location(session, create_1d_point(15)).id == c1_id 

102 assert get_parent_node_at_location(session, create_1d_point(51)).id == w_id 

103 

104 

105def pg_dump(): 

106 return subprocess.run( 

107 ["pg_dump", "-s", config["DATABASE_CONNECTION_STRING"]], stdout=subprocess.PIPE, encoding="ascii", check=True 

108 ).stdout 

109 

110 

111def sort_pg_dump_output(output): 

112 """Sorts the tables, functions and indices dumped by pg_dump in 

113 alphabetic order. Also sorts all lists enclosed with parentheses 

114 in alphabetic order. 

115 """ 

116 # Temporary replace newline with another character for easier 

117 # pattern matching. 

118 s = output.replace("\n", "§") 

119 

120 # Parameter lists are enclosed with parentheses and every entry 

121 # ends with a comma last on the line. 

122 s = re.sub(r" \(§(.*?)§\);", lambda m: " (§" + ",§".join(sorted(m.group(1).split(",§"))) + "§);", s) 

123 

124 # The header for all objects (tables, functions, indices, etc.) 

125 # seems to all start with two dashes and a space. We don't care 

126 # which kind of object it is here. 

127 s = "§-- ".join(sorted(s.split("§-- "))) 

128 

129 # Switch our temporary newline replacement to real newline. 

130 return s.replace("§", "\n") 

131 

132 

133def test_sort_pg_dump_output(): 

134 assert sort_pg_dump_output(" (\nb,\nc,\na\n);\n") == " (\na,\nb,\nc\n);\n" 

135 

136 

137def strip_leading_whitespace(lines): 

138 return [s.lstrip() for s in lines] 

139 

140 

141@pytest.mark.skipif(not run_migration_test(), reason="Migration test disabled") 

142def test_migrations(db, testconfig): 

143 """ 

144 This test will only run successfully if you have `pg_dump` installed and everything set up, which only happens if the 

145 test is being run within Gitlab CI where we do all that setup. So we disable it unless explicitly marked to run. 

146 

147 Compares the database schema built up from migrations, with the 

148 schema built by models.py. Both scenarios are started from an 

149 empty database, and dumped with pg_dump. Any unexplainable 

150 differences in the output are reported in unified diff format and 

151 fail the test. 

152 """ 

153 drop_database() 

154 # rebuild it with alembic migrations 

155 apply_migrations() 

156 

157 with_migrations = pg_dump() 

158 

159 drop_database() 

160 # create everything from the current models, not incrementally 

161 # through migrations 

162 create_schema_from_models() 

163 

164 from_scratch = pg_dump() 

165 

166 # Save the raw schemas to files for CI artifacts 

167 schema_output_dir = os.environ.get("TEST_SCHEMA_OUTPUT_DIR") 

168 if schema_output_dir: 

169 output_path = Path(schema_output_dir) 

170 output_path.mkdir(parents=True, exist_ok=True) 

171 (output_path / "schema_from_migrations.sql").write_text(with_migrations) 

172 (output_path / "schema_from_models.sql").write_text(from_scratch) 

173 

174 def message(s): 

175 s = sort_pg_dump_output(s) 

176 

177 # filter out alembic tables 

178 s = "\n-- ".join(x for x in s.split("\n-- ") if not x.startswith("Name: alembic_")) 

179 

180 # filter out \restrict and \unrestrict lines (Postgres 16+) 

181 s = "\n".join( 

182 line for line in s.splitlines() if not line.startswith("\\restrict") and not line.startswith("\\unrestrict") 

183 ) 

184 

185 return strip_leading_whitespace(s.splitlines()) 

186 

187 diff = "\n".join( 

188 difflib.unified_diff(message(with_migrations), message(from_scratch), fromfile="migrations", tofile="model") 

189 ) 

190 print(diff) 

191 success = diff == "" 

192 assert success 

193 

194 

195def test_slugify(db): 

196 with session_scope() as session: 

197 assert session.execute(func.slugify("this is a test")).scalar_one() == "this-is-a-test" 

198 assert session.execute(func.slugify("this is ä test")).scalar_one() == "this-is-a-test" 

199 # nothing here gets converted to ascci by unaccent, so it should be empty 

200 assert session.execute(func.slugify("Создай группу своего города")).scalar_one() == "slug" 

201 assert session.execute(func.slugify("Detta är ett test!")).scalar_one() == "detta-ar-ett-test" 

202 assert session.execute(func.slugify("@#(*$&!@#")).scalar_one() == "slug" 

203 assert ( 

204 session.execute( 

205 func.slugify("This has a lot ‒ at least relatively speaking ‒ of punctuation! :)") 

206 ).scalar_one() 

207 == "this-has-a-lot-at-least-relatively-speaking-of-punctuation" 

208 ) 

209 assert ( 

210 session.execute(func.slugify("Multiple - #@! - non-ascii chars")).scalar_one() == "multiple-non-ascii-chars" 

211 ) 

212 assert session.execute(func.slugify("123")).scalar_one() == "123" 

213 assert ( 

214 session.execute( 

215 func.slugify( 

216 "A sentence that is over 64 chars long and where the last thing would be replaced by a dash" 

217 ) 

218 ).scalar_one() 

219 == "a-sentence-that-is-over-64-chars-long-and-where-the-last-thing" 

220 ) 

221 

222 

223def test_database_consistency_check(db, testconfig): 

224 """The database consistency check should pass with valid user/gallery setup""" 

225 # Create a few users (which auto-creates their profile galleries) 

226 generate_user() 

227 generate_user() 

228 generate_user() 

229 

230 # This should not raise any exceptions 

231 check_database_consistency(empty_pb2.Empty()) 

232 

233 # Now break consistency by removing a user's profile gallery 

234 with session_scope() as session: 

235 user = session.execute(select(User).where(User.is_deleted == False).limit(1)).scalar_one() 

236 user.profile_gallery_id = None 

237 

238 # This should now raise an exception 

239 with pytest.raises(DatabaseInconsistencyError): 

240 check_database_consistency(empty_pb2.Empty())