Coverage for src/tests/test_db.py: 100%
111 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1import difflib
2import re
3import subprocess
5from sqlalchemy.sql import func
7from couchers.config import config
8from couchers.db import apply_migrations, get_parent_node_at_location, session_scope
9from couchers.sql import couchers_select as select
10from couchers.utils import (
11 create_coordinate,
12 get_coordinates,
13 is_valid_email,
14 is_valid_name,
15 is_valid_user_id,
16 is_valid_username,
17 parse_date,
18)
19from tests.test_communities import create_1d_point, get_community_id, testing_communities # noqa
20from tests.test_fixtures import create_schema_from_models, db, drop_all, testconfig # noqa
def test_is_valid_user_id():
    """Check which strings is_valid_user_id accepts and which it rejects."""
    assert is_valid_user_id("10")

    # A string containing a letter and a string with a leading zero are
    # both expected to be rejected.
    for rejected in ("1a", "01"):
        assert not is_valid_user_id(rejected)
def test_is_valid_email():
    """Check is_valid_email against addresses that must pass and must fail."""
    accepted = (
        "a@b.cc",
        "te.st+email.valid@a.org.au.xx.yy",
        "invalid@yahoo.co.uk",
    )
    rejected = (
        "invalid@.yahoo.co.uk",
        "test email@couchers.org",
        ".testemail@couchers.org",
        "testemail@couchersorg",
        "b@xxb....blabla",
    )

    for address in accepted:
        assert is_valid_email(address)
    for address in rejected:
        assert not is_valid_email(address)
def test_is_valid_username():
    """Check is_valid_username against usernames that must pass and must fail."""
    accepted = ("user", "us", "us_er", "us_er1")
    rejected = ("us_", "u", "1us", "User")

    for username in accepted:
        assert is_valid_username(username)
    for username in rejected:
        assert not is_valid_username(username)
def test_is_valid_name():
    """Check is_valid_name against names that must pass and must fail."""
    accepted = ("a", "a b", "1", "老子")
    # NOTE(review): the first and last rejected inputs read as the same single
    # space here; one of them may originally have been a different whitespace
    # character (e.g. a tab) -- confirm against the repository copy.
    rejected = (" ", "", " ")

    for name in accepted:
        assert is_valid_name(name)
    for name in rejected:
        assert not is_valid_name(name)
def test_parse_date():
    """Check that parse_date returns a value for valid dates and a falsy one otherwise."""
    parseable = ("2020-01-01", "1900-01-01", "2099-01-01")
    unparseable = (
        "2019-02-29",
        "2019-22-01",
        "2020-1-01",
        "20-01-01",
        "01-01-2020",
        "2020/01/01",
    )

    for text in parseable:
        assert parse_date(text) is not None
    for text in unparseable:
        assert not parse_date(text)
def test_get_parent_node_at_location(testing_communities):
    """Check that get_parent_node_at_location resolves 1D points to the expected community.

    Every community named below is looked up first (in this order), including
    those that no assertion refers to afterwards. Each probed point must then
    resolve to the community listed next to it.
    """
    # Trailing comments give the 1D extent of each community.
    community_names = (
        "Global",  # 0 to 100
        "Country 1",  # 0 to 50
        "Country 1, Region 1",  # 0 to 10
        "Country 1, Region 1, City 1",  # 0 to 5
        "Country 1, Region 1, City 2",  # 7 to 10
        "Country 1, Region 2",  # 20 to 25
        "Country 1, Region 2, City 1",  # 21 to 23
        "Country 2",  # 52 to 100
        "Country 2, Region 1",  # 52 to 71
        "Country 2, Region 1, City 1",  # 53 to 70
    )
    # (1D coordinate, name of the community expected at that coordinate)
    expected_parent = (
        (1, "Country 1, Region 1, City 1"),
        (3, "Country 1, Region 1, City 1"),
        (6, "Country 1, Region 1"),
        (8, "Country 1, Region 1, City 2"),
        (15, "Country 1"),
        (51, "Global"),
    )

    with session_scope() as session:
        community_ids = {name: get_community_id(session, name) for name in community_names}

        for coordinate, community_name in expected_parent:
            node = get_parent_node_at_location(session, create_1d_point(coordinate))
            assert node.id == community_ids[community_name]
def test_create_coordinate():
    """Check that create_coordinate wraps out-of-range coordinate pairs as expected.

    Each input pair is passed to create_coordinate inside a select; the value
    read back through get_coordinates must equal the paired expectation.
    """
    cases = [
        ((-95, -185), (-85, 175)),
        # Weird interaction in PostGIS where lng flips at -180 only when
        # there is latitude overflow
        ((95, -180), (85, 180)),
        ((90, -180), (90, -180)),
        ((20, 185), (20, -175)),
        ((0, 0), (0, 0)),
    ]

    with session_scope() as session:
        for given, expected in cases:
            query = select(create_coordinate(*given))
            stored = session.execute(query).scalar_one()
            wrapped = get_coordinates(stored)

            assert wrapped == expected
def pg_dump():
    """Return the output of ``pg_dump -s`` for the configured database.

    The connection string is read from ``config["DATABASE_CONNECTION_STRING"]``.
    Standard output is captured and decoded as ASCII. Because ``check=True``
    is passed, a non-zero exit status raises ``subprocess.CalledProcessError``.
    """
    command = ["pg_dump", "-s", config["DATABASE_CONNECTION_STRING"]]
    completed = subprocess.run(command, stdout=subprocess.PIPE, encoding="ascii", check=True)
    return completed.stdout
def sort_pg_dump_output(output):
    """Return pg_dump output with its objects and parenthesised lists sorted.

    Two kinds of ordering are applied, both alphabetical:

    * the entries of every multi-line list enclosed in `` (`` ... ``);``
    * the dumped objects themselves (tables, functions, indices, ...)
    """

    def _sorted_entries(match):
        # Entries sit one per line, each but the last ending with a comma.
        entries = match.group(1).split(",§")
        entries.sort()
        return " (§" + ",§".join(entries) + "§);"

    # Newlines are temporarily swapped for a placeholder character so that
    # multi-line constructs can be matched as if they were on a single line.
    flattened = output.replace("\n", "§")
    flattened = re.sub(r" \(§(.*?)§\);", _sorted_entries, flattened)

    # Every object header appears to start with two dashes and a space; the
    # kind of object does not matter for ordering.
    sections = flattened.split("§-- ")
    sections.sort()

    # Put the real newlines back.
    return "§-- ".join(sections).replace("§", "\n")
def test_sort_pg_dump_output():
    """Check that sort_pg_dump_output orders the entries of a parenthesised list."""
    unsorted_list = " (\nb,\nc,\na\n);\n"
    expected = " (\na,\nb,\nc\n);\n"

    assert sort_pg_dump_output(unsorted_list) == expected
def strip_leading_whitespace(lines):
    """Return a new list with leading whitespace removed from every item of *lines*."""
    return [line.lstrip() for line in lines]
def test_migrations(testconfig):
    """Check that the migrations and the models produce the same schema.

    The database is emptied and rebuilt twice: once by applying the
    migrations and once directly from the current models. Each result is
    dumped with pg_dump and normalised. Any difference between the two dumps
    is printed in unified diff format and fails the test.
    """

    def normalised_lines(dump):
        # Sort first so that ordering differences do not show up in the diff.
        ordered = sort_pg_dump_output(dump)

        # filter out alembic tables
        kept_sections = [
            section for section in ordered.split("\n-- ") if not section.startswith("Name: alembic_")
        ]

        return strip_leading_whitespace("\n-- ".join(kept_sections).splitlines())

    drop_all()
    # rebuild it with alembic migrations
    apply_migrations()
    with_migrations = pg_dump()

    drop_all()
    # create everything from the current models, not incrementally
    # through migrations
    create_schema_from_models()
    from_scratch = pg_dump()

    diff_lines = difflib.unified_diff(
        normalised_lines(with_migrations),
        normalised_lines(from_scratch),
        fromfile="migrations",
        tofile="model",
    )
    diff = "\n".join(diff_lines)
    print(diff)

    schemas_match = diff == ""
    assert schemas_match
def test_slugify(db):
    """Check the database-side slugify function against inputs and their expected slugs."""
    cases = [
        ("this is a test", "this-is-a-test"),
        ("this is ä test", "this-is-a-test"),
        # nothing here gets converted to ASCII by unaccent; the expected
        # result in that case is the literal "slug"
        ("Создай группу своего города", "slug"),
        ("Detta är ett test!", "detta-ar-ett-test"),
        ("@#(*$&!@#", "slug"),
        (
            "This has a lot ‒ at least relatively speaking ‒ of punctuation! :)",
            "this-has-a-lot-at-least-relatively-speaking-of-punctuation",
        ),
        ("Multiple - #@! - non-ascii chars", "multiple-non-ascii-chars"),
        ("123", "123"),
        (
            "A sentence that is over 64 chars long and where the last thing would be replaced by a dash",
            "a-sentence-that-is-over-64-chars-long-and-where-the-last-thing",
        ),
    ]

    with session_scope() as session:
        for text, expected_slug in cases:
            assert session.execute(func.slugify(text)).scalar_one() == expected_slug