Coverage for src / tests / test_db.py: 99%
129 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-05 01:05 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-05 01:05 +0000
1import difflib
2import os
3import re
4import subprocess
5from pathlib import Path
7import pytest
8from google.protobuf import empty_pb2
9from sqlalchemy import select
10from sqlalchemy.sql import func
12from couchers.config import config
13from couchers.db import apply_migrations, get_parent_node_at_location, session_scope
14from couchers.jobs.handlers import DatabaseInconsistencyError, check_database_consistency
15from couchers.models import User
16from couchers.utils import (
17 is_valid_email,
18 is_valid_name,
19 is_valid_user_id,
20 is_valid_username,
21 parse_date,
22)
23from tests.fixtures.db import create_schema_from_models, drop_database, generate_user, run_migration_test
24from tests.test_communities import create_1d_point, get_community_id, testing_communities # noqa
def test_is_valid_user_id():
    """Check which strings `is_valid_user_id` accepts as a user id."""
    accepted = ["10"]
    # One input contains a letter, the other starts with a zero.
    rejected = ["1a", "01"]

    for candidate in accepted:
        assert is_valid_user_id(candidate)
    for candidate in rejected:
        assert not is_valid_user_id(candidate)
def test_is_valid_email():
    """Check `is_valid_email` against addresses it must accept and must reject."""
    accepted = (
        "a@b.cc",
        "te.st+email.valid@a.org.au.xx.yy",
        "invalid@yahoo.co.uk",
    )
    rejected = (
        "invalid@.yahoo.co.uk",  # domain begins with a dot
        "test email@couchers.org",  # whitespace in the local part
        ".testemail@couchers.org",  # local part begins with a dot
        "testemail@couchersorg",  # domain without any dot
        "b@xxb....blabla",  # run of consecutive dots in the domain
    )

    for address in accepted:
        assert is_valid_email(address)
    for address in rejected:
        assert not is_valid_email(address)
def test_is_valid_username():
    """Check `is_valid_username` against usernames it must accept and must reject."""
    accepted = ("user", "us", "us_er", "us_er1")
    rejected = (
        "us_",  # ends with an underscore
        "u",  # single character
        "1us",  # starts with a digit
        "User",  # contains an upper-case letter
    )

    for username in accepted:
        assert is_valid_username(username)
    for username in rejected:
        assert not is_valid_username(username)
def test_is_valid_name():
    """Check `is_valid_name` against names it must accept and must reject."""
    accepted = ("a", "a b", "1", "老子")
    # NOTE(review): the first and last rejected inputs look identical here (a
    # single space) -- confirm one of them was not meant to be a tab or some
    # other whitespace character.
    rejected = (" ", "", " ")

    for name in accepted:
        assert is_valid_name(name)
    for name in rejected:
        assert not is_valid_name(name)
def test_parse_date():
    """Check which date strings `parse_date` parses and which it refuses."""
    parseable = ("2020-01-01", "1900-01-01", "2099-01-01")
    unparseable = (
        "2019-02-29",  # 2019 is not a leap year
        "2019-22-01",  # month out of range
        "2020-1-01",  # month not zero-padded
        "20-01-01",  # two-digit year
        "01-01-2020",  # day-month-year order
        "2020/01/01",  # slashes instead of dashes
    )

    # Parseable inputs must yield something other than None ...
    for text in parseable:
        assert parse_date(text) is not None
    # ... while refused inputs must yield a falsy result.
    for text in unparseable:
        assert not parse_date(text)
def test_get_parent_node_at_location(testing_communities):
    """Check that `get_parent_node_at_location` resolves 1D points to the expected community.

    The interval noted next to each community name is the range it is set up
    to cover. In every case below the expected node is the narrowest listed
    range that contains the point.
    """
    community_names = (
        "Global",  # 0 to 100
        "Country 1",  # 0 to 50
        "Country 1, Region 1",  # 0 to 10
        "Country 1, Region 1, City 1",  # 0 to 5
        "Country 1, Region 1, City 2",  # 7 to 10
        "Country 1, Region 2",  # 20 to 25
        "Country 1, Region 2, City 1",  # 21 to 23
        "Country 2",  # 52 to 100
        "Country 2, Region 1",  # 52 to 71
        "Country 2, Region 1, City 1",  # 53 to 70
    )
    expected_community_at = (
        (1, "Country 1, Region 1, City 1"),
        (3, "Country 1, Region 1, City 1"),
        (6, "Country 1, Region 1"),
        (8, "Country 1, Region 1, City 2"),
        (15, "Country 1"),
        (51, "Global"),
    )

    with session_scope() as session:
        # Every community is looked up, including those no assertion below
        # refers to, in the order listed above.
        community_ids = {name: get_community_id(session, name) for name in community_names}

        for coordinate, name in expected_community_at:
            node = get_parent_node_at_location(session, create_1d_point(coordinate))
            assert node.id == community_ids[name]
def pg_dump():
    """Return the output of ``pg_dump -s`` for the configured database as text.

    The connection string is read from ``config["DATABASE_CONNECTION_STRING"]``
    and stdout is decoded as ASCII. Because ``check=True`` is passed, a
    non-zero exit status raises ``subprocess.CalledProcessError``.
    """
    command = ["pg_dump", "-s", config["DATABASE_CONNECTION_STRING"]]
    completed = subprocess.run(command, stdout=subprocess.PIPE, encoding="ascii", check=True)
    return completed.stdout
def sort_pg_dump_output(output):
    """Sort the objects and the parenthesised lists of a pg_dump schema dump.

    Two normalisations are applied, in this order:

    1. Every list of the form `` (``, newline, entries, newline, ``);`` has
       its entries (separated by a comma at the end of a line) sorted
       alphabetically.
    2. The dump is split at every line starting with ``-- `` (the header that
       seems to precede each table, function, index, ...) and the resulting
       sections are sorted alphabetically, whatever kind of object they are.

    The text is processed with its real newlines, so a literal "§" in the
    dump is left untouched rather than being turned into a newline.

    Args:
        output: The dump text, as returned by ``pg_dump``.

    Returns:
        The dump with sorted lists and sorted sections.
    """

    def legacy_order(text):
        # Earlier versions sorted the text with every newline replaced by
        # "§". Comparing on that form keeps the resulting order identical
        # for any input that does not itself contain "§".
        return text.replace("\n", "§")

    def sort_entries(match):
        entries = sorted(match.group(1).split(",\n"), key=legacy_order)
        return " (\n" + ",\n".join(entries) + "\n);"

    # DOTALL lets the lazy group span the several lines of one list.
    with_sorted_lists = re.sub(r" \(\n(.*?)\n\);", sort_entries, output, flags=re.DOTALL)

    sections = sorted(with_sorted_lists.split("\n-- "), key=legacy_order)
    return "\n-- ".join(sections)
def test_sort_pg_dump_output():
    """A parenthesised list given out of order must come back alphabetised."""
    shuffled = " (\nb,\nc,\na\n);\n"
    alphabetised = " (\na,\nb,\nc\n);\n"
    assert sort_pg_dump_output(shuffled) == alphabetised
def strip_leading_whitespace(lines):
    """Return a new list holding each item of ``lines`` with leading whitespace removed."""
    stripped = []
    for line in lines:
        stripped.append(line.lstrip())
    return stripped
@pytest.mark.skipif(not run_migration_test(), reason="Migration test disabled")
def test_migrations(db, testconfig):
    """
    Check that the migrations and the models produce the same database schema.

    The database is dropped and rebuilt twice, once with `apply_migrations()`
    and once with `create_schema_from_models()`, and dumped with `pg_dump`
    after each build. Both dumps are normalised and compared; any remaining
    difference is printed in unified diff format and fails the test.

    This needs `pg_dump` installed and everything set up, which only happens
    when the test runs within Gitlab CI, so it is skipped unless explicitly
    marked to run.
    """

    def normalise(dump):
        """Turn a raw dump into a list of lines suitable for diffing."""
        ordered = sort_pg_dump_output(dump)

        # Drop the sections that describe alembic's own tables.
        sections = [section for section in ordered.split("\n-- ") if not section.startswith("Name: alembic_")]
        without_alembic = "\n-- ".join(sections)

        # Drop the \restrict and \unrestrict lines (Postgres 16+).
        kept = [
            line for line in without_alembic.splitlines() if not line.startswith(("\\restrict", "\\unrestrict"))
        ]

        # Re-join and split again so trailing empty lines are dropped exactly
        # as before.
        return strip_leading_whitespace("\n".join(kept).splitlines())

    # First build: bring an empty database up to date with the alembic migrations.
    drop_database()
    apply_migrations()
    schema_via_migrations = pg_dump()

    # Second build: create everything from the current models in one go,
    # not incrementally through migrations.
    drop_database()
    create_schema_from_models()
    schema_via_models = pg_dump()

    # Save the raw schemas to files for CI artifacts when a directory is configured.
    artifact_dir = os.environ.get("TEST_SCHEMA_OUTPUT_DIR")
    if artifact_dir:
        target = Path(artifact_dir)
        target.mkdir(parents=True, exist_ok=True)
        raw_dumps = (
            ("schema_from_migrations.sql", schema_via_migrations),
            ("schema_from_models.sql", schema_via_models),
        )
        for filename, dump in raw_dumps:
            (target / filename).write_text(dump)

    diff_lines = difflib.unified_diff(
        normalise(schema_via_migrations),
        normalise(schema_via_models),
        fromfile="migrations",
        tofile="model",
    )
    diff = "\n".join(diff_lines)
    print(diff)
    assert diff == ""
def test_slugify(db):
    """Run the database-side `slugify` function on a set of inputs and check each result."""
    cases = (
        ("this is a test", "this-is-a-test"),
        ("this is ä test", "this-is-a-test"),
        # nothing here gets converted to ASCII by unaccent; the expected
        # result is the literal "slug"
        ("Создай группу своего города", "slug"),
        ("Detta är ett test!", "detta-ar-ett-test"),
        ("@#(*$&!@#", "slug"),
        (
            "This has a lot ‒ at least relatively speaking ‒ of punctuation! :)",
            "this-has-a-lot-at-least-relatively-speaking-of-punctuation",
        ),
        ("Multiple - #@! - non-ascii chars", "multiple-non-ascii-chars"),
        ("123", "123"),
        # input longer than 64 characters; the expected slug is shorter than the input
        (
            "A sentence that is over 64 chars long and where the last thing would be replaced by a dash",
            "a-sentence-that-is-over-64-chars-long-and-where-the-last-thing",
        ),
    )

    with session_scope() as session:
        for text, expected_slug in cases:
            assert session.execute(func.slugify(text)).scalar_one() == expected_slug
def test_database_consistency_check(db, testconfig):
    """The consistency check passes for a valid user/gallery setup and fails once a user loses its gallery."""
    # Create a few users (which auto-creates their profile galleries).
    for _ in range(3):
        generate_user()

    # With an intact setup the check must complete without raising.
    check_database_consistency(empty_pb2.Empty())

    # Break consistency: detach the profile gallery from one non-deleted user.
    with session_scope() as session:
        one_active_user = select(User).where(User.is_deleted == False).limit(1)  # noqa: E712
        victim = session.execute(one_active_user).scalar_one()
        victim.profile_gallery_id = None

    # The same check must now report the inconsistency.
    with pytest.raises(DatabaseInconsistencyError):
        check_database_consistency(empty_pb2.Empty())