Coverage for src/tests/test_db.py: 100%
106 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import difflib
2import re
3import subprocess
5import pytest
6from sqlalchemy.sql import func
8from couchers.config import config
9from couchers.db import apply_migrations, get_parent_node_at_location, session_scope
10from couchers.utils import (
11 is_valid_email,
12 is_valid_name,
13 is_valid_user_id,
14 is_valid_username,
15 parse_date,
16)
17from tests.test_communities import create_1d_point, get_community_id, testing_communities # noqa
18from tests.test_fixtures import create_schema_from_models, db, drop_all, run_migration_test, testconfig # noqa
21def test_is_valid_user_id():
22 assert is_valid_user_id("10")
23 assert not is_valid_user_id("1a")
24 assert not is_valid_user_id("01")
27def test_is_valid_email():
28 assert is_valid_email("a@b.cc")
29 assert is_valid_email("te.st+email.valid@a.org.au.xx.yy")
30 assert is_valid_email("invalid@yahoo.co.uk")
31 assert not is_valid_email("invalid@.yahoo.co.uk")
32 assert not is_valid_email("test email@couchers.org")
33 assert not is_valid_email(".testemail@couchers.org")
34 assert not is_valid_email("testemail@couchersorg")
35 assert not is_valid_email("b@xxb....blabla")
38def test_is_valid_username():
39 assert is_valid_username("user")
40 assert is_valid_username("us")
41 assert is_valid_username("us_er")
42 assert is_valid_username("us_er1")
43 assert not is_valid_username("us_")
44 assert not is_valid_username("u")
45 assert not is_valid_username("1us")
46 assert not is_valid_username("User")
49def test_is_valid_name():
50 assert is_valid_name("a")
51 assert is_valid_name("a b")
52 assert is_valid_name("1")
53 assert is_valid_name("老子")
54 assert not is_valid_name(" ")
55 assert not is_valid_name("")
56 assert not is_valid_name(" ")
59def test_parse_date():
60 assert parse_date("2020-01-01") is not None
61 assert parse_date("1900-01-01") is not None
62 assert parse_date("2099-01-01") is not None
63 assert not parse_date("2019-02-29")
64 assert not parse_date("2019-22-01")
65 assert not parse_date("2020-1-01")
66 assert not parse_date("20-01-01")
67 assert not parse_date("01-01-2020")
68 assert not parse_date("2020/01/01")
71def test_get_parent_node_at_location(testing_communities):
72 with session_scope() as session:
73 w_id = get_community_id(session, "Global") # 0 to 100
74 c1_id = get_community_id(session, "Country 1") # 0 to 50
75 c1r1_id = get_community_id(session, "Country 1, Region 1") # 0 to 10
76 c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1") # 0 to 5
77 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") # 7 to 10
78 c1r2_id = get_community_id(session, "Country 1, Region 2") # 20 to 25
79 c1r2c1_id = get_community_id(session, "Country 1, Region 2, City 1") # 21 to 23
80 c2_id = get_community_id(session, "Country 2") # 52 to 100
81 c2r1_id = get_community_id(session, "Country 2, Region 1") # 52 to 71
82 c2r1c1_id = get_community_id(session, "Country 2, Region 1, City 1") # 53 to 70
84 assert get_parent_node_at_location(session, create_1d_point(1)).id == c1r1c1_id
85 assert get_parent_node_at_location(session, create_1d_point(3)).id == c1r1c1_id
86 assert get_parent_node_at_location(session, create_1d_point(6)).id == c1r1_id
87 assert get_parent_node_at_location(session, create_1d_point(8)).id == c1r1c2_id
88 assert get_parent_node_at_location(session, create_1d_point(15)).id == c1_id
89 assert get_parent_node_at_location(session, create_1d_point(51)).id == w_id
92def pg_dump():
93 return subprocess.run(
94 ["pg_dump", "-s", config["DATABASE_CONNECTION_STRING"]], stdout=subprocess.PIPE, encoding="ascii", check=True
95 ).stdout
98def sort_pg_dump_output(output):
99 """Sorts the tables, functions and indices dumped by pg_dump in
100 alphabetic order. Also sorts all lists enclosed with parentheses
101 in alphabetic order.
102 """
103 # Temporary replace newline with another character for easier
104 # pattern matching.
105 s = output.replace("\n", "§")
107 # Parameter lists are enclosed with parentheses and every entry
108 # ends with a comma last on the line.
109 s = re.sub(r" \(§(.*?)§\);", lambda m: " (§" + ",§".join(sorted(m.group(1).split(",§"))) + "§);", s)
111 # The header for all objects (tables, functions, indices, etc.)
112 # seems to all start with two dashes and a space. We don't care
113 # which kind of object it is here.
114 s = "§-- ".join(sorted(s.split("§-- ")))
116 # Switch our temporary newline replacement to real newline.
117 return s.replace("§", "\n")
120def test_sort_pg_dump_output():
121 assert sort_pg_dump_output(" (\nb,\nc,\na\n);\n") == " (\na,\nb,\nc\n);\n"
124def strip_leading_whitespace(lines):
125 return [s.lstrip() for s in lines]
128@pytest.mark.skipif(not run_migration_test(), reason="Migration test disabled")
129def test_migrations(testconfig):
130 """
131 This test will only run succesfully if you have `pg_dump` installed and everything set up, which only happens if the
132 test is being run within Gitlab CI where we do all that setup. So we disable it unless explicitly marked to run.
134 Compares the database schema built up from migrations, with the
135 schema built by models.py. Both scenarios are started from an
136 empty database, and dumped with pg_dump. Any unexplainable
137 differences in the output are reported in unified diff format and
138 fails the test.
139 """
140 drop_all()
141 # rebuild it with alembic migrations
142 apply_migrations()
144 with_migrations = pg_dump()
146 drop_all()
147 # create everything from the current models, not incrementally
148 # through migrations
149 create_schema_from_models()
151 from_scratch = pg_dump()
153 def message(s):
154 s = sort_pg_dump_output(s)
156 # filter out alembic tables
157 s = "\n-- ".join(x for x in s.split("\n-- ") if not x.startswith("Name: alembic_"))
159 return strip_leading_whitespace(s.splitlines())
161 diff = "\n".join(
162 difflib.unified_diff(message(with_migrations), message(from_scratch), fromfile="migrations", tofile="model")
163 )
164 print(diff)
165 success = diff == ""
166 assert success
169def test_slugify(db):
170 with session_scope() as session:
171 assert session.execute(func.slugify("this is a test")).scalar_one() == "this-is-a-test"
172 assert session.execute(func.slugify("this is ä test")).scalar_one() == "this-is-a-test"
173 # nothing here gets converted to ascci by unaccent, so it should be empty
174 assert session.execute(func.slugify("Создай группу своего города")).scalar_one() == "slug"
175 assert session.execute(func.slugify("Detta är ett test!")).scalar_one() == "detta-ar-ett-test"
176 assert session.execute(func.slugify("@#(*$&!@#")).scalar_one() == "slug"
177 assert (
178 session.execute(
179 func.slugify("This has a lot ‒ at least relatively speaking ‒ of punctuation! :)")
180 ).scalar_one()
181 == "this-has-a-lot-at-least-relatively-speaking-of-punctuation"
182 )
183 assert (
184 session.execute(func.slugify("Multiple - #@! - non-ascii chars")).scalar_one() == "multiple-non-ascii-chars"
185 )
186 assert session.execute(func.slugify("123")).scalar_one() == "123"
187 assert (
188 session.execute(
189 func.slugify(
190 "A sentence that is over 64 chars long and where the last thing would be replaced by a dash"
191 )
192 ).scalar_one()
193 == "a-sentence-that-is-over-64-chars-long-and-where-the-last-thing"
194 )