Coverage for src/tests/test_db.py: 100%

112 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 20:25 +0000

1import difflib 

2import re 

3import subprocess 

4 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers.config import config 

9from couchers.db import apply_migrations, get_parent_node_at_location, session_scope 

10from couchers.utils import ( 

11 is_valid_email, 

12 is_valid_name, 

13 is_valid_user_id, 

14 is_valid_username, 

15 parse_date, 

16) 

17from tests.test_communities import create_1d_point, get_community_id, testing_communities # noqa 

18from tests.test_fixtures import ( # noqa 

19 create_schema_from_models, 

20 db, 

21 drop_database, 

22 recreate_database, 

23 run_migration_test, 

24 testconfig, 

25 truncate_all_tables, 

26) 

27 

28 

29def test_is_valid_user_id(): 

30 assert is_valid_user_id("10") 

31 assert not is_valid_user_id("1a") 

32 assert not is_valid_user_id("01") 

33 

34 

35def test_is_valid_email(): 

36 assert is_valid_email("a@b.cc") 

37 assert is_valid_email("te.st+email.valid@a.org.au.xx.yy") 

38 assert is_valid_email("invalid@yahoo.co.uk") 

39 assert not is_valid_email("invalid@.yahoo.co.uk") 

40 assert not is_valid_email("test email@couchers.org") 

41 assert not is_valid_email(".testemail@couchers.org") 

42 assert not is_valid_email("testemail@couchersorg") 

43 assert not is_valid_email("b@xxb....blabla") 

44 

45 

46def test_is_valid_username(): 

47 assert is_valid_username("user") 

48 assert is_valid_username("us") 

49 assert is_valid_username("us_er") 

50 assert is_valid_username("us_er1") 

51 assert not is_valid_username("us_") 

52 assert not is_valid_username("u") 

53 assert not is_valid_username("1us") 

54 assert not is_valid_username("User") 

55 

56 

57def test_is_valid_name(): 

58 assert is_valid_name("a") 

59 assert is_valid_name("a b") 

60 assert is_valid_name("1") 

61 assert is_valid_name("老子") 

62 assert not is_valid_name(" ") 

63 assert not is_valid_name("") 

64 assert not is_valid_name(" ") 

65 

66 

67def test_parse_date(): 

68 assert parse_date("2020-01-01") is not None 

69 assert parse_date("1900-01-01") is not None 

70 assert parse_date("2099-01-01") is not None 

71 assert not parse_date("2019-02-29") 

72 assert not parse_date("2019-22-01") 

73 assert not parse_date("2020-1-01") 

74 assert not parse_date("20-01-01") 

75 assert not parse_date("01-01-2020") 

76 assert not parse_date("2020/01/01") 

77 

78 

79def test_get_parent_node_at_location(testing_communities): 

80 with session_scope() as session: 

81 w_id = get_community_id(session, "Global") # 0 to 100 

82 c1_id = get_community_id(session, "Country 1") # 0 to 50 

83 c1r1_id = get_community_id(session, "Country 1, Region 1") # 0 to 10 

84 c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1") # 0 to 5 

85 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") # 7 to 10 

86 c1r2_id = get_community_id(session, "Country 1, Region 2") # 20 to 25 

87 c1r2c1_id = get_community_id(session, "Country 1, Region 2, City 1") # 21 to 23 

88 c2_id = get_community_id(session, "Country 2") # 52 to 100 

89 c2r1_id = get_community_id(session, "Country 2, Region 1") # 52 to 71 

90 c2r1c1_id = get_community_id(session, "Country 2, Region 1, City 1") # 53 to 70 

91 

92 assert get_parent_node_at_location(session, create_1d_point(1)).id == c1r1c1_id 

93 assert get_parent_node_at_location(session, create_1d_point(3)).id == c1r1c1_id 

94 assert get_parent_node_at_location(session, create_1d_point(6)).id == c1r1_id 

95 assert get_parent_node_at_location(session, create_1d_point(8)).id == c1r1c2_id 

96 assert get_parent_node_at_location(session, create_1d_point(15)).id == c1_id 

97 assert get_parent_node_at_location(session, create_1d_point(51)).id == w_id 

98 

99 

100def pg_dump(): 

101 return subprocess.run( 

102 ["pg_dump", "-s", config["DATABASE_CONNECTION_STRING"]], stdout=subprocess.PIPE, encoding="ascii", check=True 

103 ).stdout 

104 

105 

106def sort_pg_dump_output(output): 

107 """Sorts the tables, functions and indices dumped by pg_dump in 

108 alphabetic order. Also sorts all lists enclosed with parentheses 

109 in alphabetic order. 

110 """ 

111 # Temporary replace newline with another character for easier 

112 # pattern matching. 

113 s = output.replace("\n", "§") 

114 

115 # Parameter lists are enclosed with parentheses and every entry 

116 # ends with a comma last on the line. 

117 s = re.sub(r" \(§(.*?)§\);", lambda m: " (§" + ",§".join(sorted(m.group(1).split(",§"))) + "§);", s) 

118 

119 # The header for all objects (tables, functions, indices, etc.) 

120 # seems to all start with two dashes and a space. We don't care 

121 # which kind of object it is here. 

122 s = "§-- ".join(sorted(s.split("§-- "))) 

123 

124 # Switch our temporary newline replacement to real newline. 

125 return s.replace("§", "\n") 

126 

127 

128def test_sort_pg_dump_output(): 

129 assert sort_pg_dump_output(" (\nb,\nc,\na\n);\n") == " (\na,\nb,\nc\n);\n" 

130 

131 

132def strip_leading_whitespace(lines): 

133 return [s.lstrip() for s in lines] 

134 

135 

136@pytest.fixture 

137def restore_database_after_testing_migrations(testconfig): 

138 """ 

139 Make sure the database exists for other tests after test_migrations runs. 

140 """ 

141 try: 

142 yield 

143 finally: 

144 recreate_database() 

145 

146 

147@pytest.mark.skipif(not run_migration_test(), reason="Migration test disabled") 

148def test_migrations(testconfig, restore_database_after_testing_migrations): 

149 """ 

150 This test will only run successfully if you have `pg_dump` installed and everything set up, which only happens if the 

151 test is being run within Gitlab CI where we do all that setup. So we disable it unless explicitly marked to run. 

152 

153 Compares the database schema built up from migrations, with the 

154 schema built by models.py. Both scenarios are started from an 

155 empty database, and dumped with pg_dump. Any unexplainable 

156 differences in the output are reported in unified diff format and 

157 fails the test. 

158 """ 

159 drop_database() 

160 # rebuild it with alembic migrations 

161 apply_migrations() 

162 

163 with_migrations = pg_dump() 

164 

165 drop_database() 

166 # create everything from the current models, not incrementally 

167 # through migrations 

168 create_schema_from_models() 

169 

170 from_scratch = pg_dump() 

171 

172 def message(s): 

173 s = sort_pg_dump_output(s) 

174 

175 # filter out alembic tables 

176 s = "\n-- ".join(x for x in s.split("\n-- ") if not x.startswith("Name: alembic_")) 

177 

178 # filter out \restrict and \unrestrict lines (Postgres 16+) 

179 s = "\n".join( 

180 line for line in s.splitlines() if not line.startswith("\\restrict") and not line.startswith("\\unrestrict") 

181 ) 

182 

183 return strip_leading_whitespace(s.splitlines()) 

184 

185 diff = "\n".join( 

186 difflib.unified_diff(message(with_migrations), message(from_scratch), fromfile="migrations", tofile="model") 

187 ) 

188 print(diff) 

189 success = diff == "" 

190 assert success 

191 

192 

193def test_slugify(db): 

194 with session_scope() as session: 

195 assert session.execute(func.slugify("this is a test")).scalar_one() == "this-is-a-test" 

196 assert session.execute(func.slugify("this is ä test")).scalar_one() == "this-is-a-test" 

197 # nothing here gets converted to ascci by unaccent, so it should be empty 

198 assert session.execute(func.slugify("Создай группу своего города")).scalar_one() == "slug" 

199 assert session.execute(func.slugify("Detta är ett test!")).scalar_one() == "detta-ar-ett-test" 

200 assert session.execute(func.slugify("@#(*$&!@#")).scalar_one() == "slug" 

201 assert ( 

202 session.execute( 

203 func.slugify("This has a lot ‒ at least relatively speaking ‒ of punctuation! :)") 

204 ).scalar_one() 

205 == "this-has-a-lot-at-least-relatively-speaking-of-punctuation" 

206 ) 

207 assert ( 

208 session.execute(func.slugify("Multiple - #@! - non-ascii chars")).scalar_one() == "multiple-non-ascii-chars" 

209 ) 

210 assert session.execute(func.slugify("123")).scalar_one() == "123" 

211 assert ( 

212 session.execute( 

213 func.slugify( 

214 "A sentence that is over 64 chars long and where the last thing would be replaced by a dash" 

215 ) 

216 ).scalar_one() 

217 == "a-sentence-that-is-over-64-chars-long-and-where-the-last-thing" 

218 )