Coverage for src/tests/test_db.py: 100%

106 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1import difflib 

2import re 

3import subprocess 

4 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers.config import config 

9from couchers.db import apply_migrations, get_parent_node_at_location, session_scope 

10from couchers.utils import ( 

11 is_valid_email, 

12 is_valid_name, 

13 is_valid_user_id, 

14 is_valid_username, 

15 parse_date, 

16) 

17from tests.test_communities import create_1d_point, get_community_id, testing_communities # noqa 

18from tests.test_fixtures import create_schema_from_models, db, drop_all, run_migration_test, testconfig # noqa 

19 

20 

def test_is_valid_user_id():
    """Check which strings ``is_valid_user_id`` accepts and which it rejects."""
    accepted = ("10",)
    # One candidate contains a non-digit character, the other starts with a zero.
    rejected = ("1a", "01")

    for candidate in accepted:
        assert is_valid_user_id(candidate)
    for candidate in rejected:
        assert not is_valid_user_id(candidate)

25 

26 

def test_is_valid_email():
    """Check ``is_valid_email`` against addresses expected to pass and to fail."""
    well_formed = (
        "a@b.cc",
        "te.st+email.valid@a.org.au.xx.yy",
        # Despite its local part, this address is expected to be accepted.
        "invalid@yahoo.co.uk",
    )
    malformed = (
        "invalid@.yahoo.co.uk",  # domain starts with a dot
        "test email@couchers.org",  # space in the local part
        ".testemail@couchers.org",  # local part starts with a dot
        "testemail@couchersorg",  # no dot in the domain
        "b@xxb....blabla",  # consecutive dots in the domain
    )

    for address in well_formed:
        assert is_valid_email(address)
    for address in malformed:
        assert not is_valid_email(address)

36 

37 

def test_is_valid_username():
    """Check ``is_valid_username`` against usernames expected to pass and to fail."""
    accepted = ("user", "us", "us_er", "us_er1")
    rejected = (
        "us_",  # ends with an underscore
        "u",  # single character
        "1us",  # starts with a digit
        "User",  # contains an uppercase letter
    )

    for username in accepted:
        assert is_valid_username(username)
    for username in rejected:
        assert not is_valid_username(username)

47 

48 

def test_is_valid_name():
    """Check ``is_valid_name`` against names expected to pass and to fail."""
    accepted = ("a", "a b", "1", "老子")
    # NOTE(review): the first and last entries look identical in this view; both
    # are kept so every original call is still made — confirm whether they were
    # meant to be different whitespace strings.
    rejected = (" ", "", " ")

    for name in accepted:
        assert is_valid_name(name)
    for name in rejected:
        assert not is_valid_name(name)

57 

58 

def test_parse_date():
    """Check ``parse_date`` against date strings expected to parse and to be rejected."""
    parseable = ("2020-01-01", "1900-01-01", "2099-01-01")
    unparseable = (
        "2019-02-29",  # 2019 is not a leap year
        "2019-22-01",  # month 22
        "2020-1-01",  # single-digit month
        "20-01-01",  # two-digit year
        "01-01-2020",  # year written last
        "2020/01/01",  # slashes instead of dashes
    )

    for text in parseable:
        assert parse_date(text) is not None
    for text in unparseable:
        # Only falsiness is checked here, not the exact return value.
        assert not parse_date(text)

69 

70 

def test_get_parent_node_at_location(testing_communities):
    """Check which community node ``get_parent_node_at_location`` returns for several 1D points.

    The coordinate ranges in the comments next to each community name are the ones
    noted in the test itself; they are not verified here.
    """
    community_names = (
        "Global",  # 0 to 100
        "Country 1",  # 0 to 50
        "Country 1, Region 1",  # 0 to 10
        "Country 1, Region 1, City 1",  # 0 to 5
        "Country 1, Region 1, City 2",  # 7 to 10
        "Country 1, Region 2",  # 20 to 25
        "Country 1, Region 2, City 1",  # 21 to 23
        "Country 2",  # 52 to 100
        "Country 2, Region 1",  # 52 to 71
        "Country 2, Region 1, City 1",  # 53 to 70
    )
    # (coordinate, name of the community whose id the returned node must have)
    expectations = (
        (1, "Country 1, Region 1, City 1"),
        (3, "Country 1, Region 1, City 1"),
        (6, "Country 1, Region 1"),
        (8, "Country 1, Region 1, City 2"),
        (15, "Country 1"),
        (51, "Global"),
    )

    with session_scope() as session:
        # Every community is looked up, in order, even though only some ids are
        # compared below.
        ids_by_name = {name: get_community_id(session, name) for name in community_names}

        for coordinate, expected_name in expectations:
            node = get_parent_node_at_location(session, create_1d_point(coordinate))
            assert node.id == ids_by_name[expected_name]

90 

91 

def pg_dump():
    """Run ``pg_dump -s`` against the configured database and return its stdout as text.

    The connection string is read from ``config["DATABASE_CONNECTION_STRING"]``.
    Because ``check=True`` is passed, a non-zero exit status raises
    ``subprocess.CalledProcessError``. Output is decoded as ASCII.
    """
    command = ["pg_dump", "-s", config["DATABASE_CONNECTION_STRING"]]
    completed = subprocess.run(command, stdout=subprocess.PIPE, encoding="ascii", check=True)
    return completed.stdout

96 

97 

def sort_pg_dump_output(output):
    """Return ``output`` with dumped objects and parenthesised lists sorted alphabetically.

    Two passes are made over the text:

    1. Every list of the form `` (`` / entries ending in a comma at end of line /
       ``);`` has its entries sorted.
    2. The text is cut at every line starting with ``-- `` (the header pg_dump
       appears to put before each object, whatever its kind) and the pieces are
       sorted.

    Newlines are swapped for ``§`` while matching so that a single-line regex can
    span several lines; any ``§`` already present in ``output`` therefore comes
    back as a newline.
    """

    def sort_entries(match):
        # Entries are separated by a comma that is last on its line.
        entries = match.group(1).split(",§")
        entries.sort()
        return " (§" + ",§".join(entries) + "§);"

    # Temporarily flatten the text onto one line for easier pattern matching.
    flattened = output.replace("\n", "§")
    flattened = re.sub(r" \(§(.*?)§\);", sort_entries, flattened)

    # Sort the pieces between object headers; the text before the first header
    # takes part in the sort like any other piece.
    sections = flattened.split("§-- ")
    sections.sort()

    # Restore real newlines.
    return "§-- ".join(sections).replace("§", "\n")

118 

119 

def test_sort_pg_dump_output():
    """Check that the entries of a parenthesised, comma-separated list get sorted."""
    unsorted_dump = " (\nb,\nc,\na\n);\n"
    expected = " (\na,\nb,\nc\n);\n"
    assert sort_pg_dump_output(unsorted_dump) == expected

122 

123 

def strip_leading_whitespace(lines):
    """Return a new list holding each string of ``lines`` without its leading whitespace."""
    stripped = []
    for line in lines:
        stripped.append(line.lstrip())
    return stripped

126 

127 

@pytest.mark.skipif(not run_migration_test(), reason="Migration test disabled")
def test_migrations(testconfig):
    """
    Check that the schema built by the migrations matches the one built from the models.

    This test only runs successfully if `pg_dump` is installed and everything is set up, which only happens when
    it runs within Gitlab CI where all that setup is done. It is therefore skipped unless explicitly enabled.

    The database is emptied and rebuilt twice, once through the alembic migrations and once directly from the
    current models, and dumped with pg_dump each time. Both dumps are normalised (sorted, alembic sections
    removed, leading whitespace stripped) and compared; any difference is printed in unified diff format and
    fails the test.
    """
    # Schema as produced by applying the alembic migrations to an empty database.
    drop_all()
    apply_migrations()
    migrated_dump = pg_dump()

    # Schema as produced from the current models in one go, not incrementally.
    drop_all()
    create_schema_from_models()
    models_dump = pg_dump()

    def normalise(dump):
        ordered = sort_pg_dump_output(dump)
        # Drop the sections describing alembic's own tables.
        kept = [section for section in ordered.split("\n-- ") if not section.startswith("Name: alembic_")]
        return strip_leading_whitespace("\n-- ".join(kept).splitlines())

    diff_lines = difflib.unified_diff(
        normalise(migrated_dump), normalise(models_dump), fromfile="migrations", tofile="model"
    )
    diff = "\n".join(diff_lines)
    # Printed so that the differences show up in the test output on failure.
    print(diff)
    schemas_match = diff == ""
    assert schemas_match

167 

168 

def test_slugify(db):
    """Check the SQL ``slugify`` function against a table of inputs and expected slugs."""
    cases = (
        ("this is a test", "this-is-a-test"),
        ("this is ä test", "this-is-a-test"),
        # Per the original note, nothing here gets converted to ASCII by unaccent;
        # the expected result is the literal "slug".
        ("Создай группу своего города", "slug"),
        ("Detta är ett test!", "detta-ar-ett-test"),
        ("@#(*$&!@#", "slug"),
        (
            "This has a lot ‒ at least relatively speaking ‒ of punctuation! :)",
            "this-has-a-lot-at-least-relatively-speaking-of-punctuation",
        ),
        ("Multiple - #@! - non-ascii chars", "multiple-non-ascii-chars"),
        ("123", "123"),
        # Input longer than 64 characters: the expected slug stops after "thing"
        # and does not end with a dash.
        (
            "A sentence that is over 64 chars long and where the last thing would be replaced by a dash",
            "a-sentence-that-is-over-64-chars-long-and-where-the-last-thing",
        ),
    )

    with session_scope() as session:
        for text, expected_slug in cases:
            assert session.execute(func.slugify(text)).scalar_one() == expected_slug