Coverage for src/tests/test_db.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

111 statements  

1import difflib 

2import re 

3import subprocess 

4 

5from sqlalchemy.sql import func 

6 

7from couchers.config import config 

8from couchers.db import apply_migrations, get_parent_node_at_location, session_scope 

9from couchers.sql import couchers_select as select 

10from couchers.utils import ( 

11 create_coordinate, 

12 get_coordinates, 

13 is_valid_email, 

14 is_valid_name, 

15 is_valid_user_id, 

16 is_valid_username, 

17 parse_date, 

18) 

19from tests.test_communities import create_1d_point, get_community_id, testing_communities # noqa 

20from tests.test_fixtures import create_schema_from_models, db, drop_all, testconfig # noqa 

21 

22 

def test_is_valid_user_id():
    """Check which strings ``is_valid_user_id`` accepts and which it rejects."""
    accepted = ("10",)
    rejected = ("1a", "01")

    for candidate in accepted:
        assert is_valid_user_id(candidate)
    for candidate in rejected:
        assert not is_valid_user_id(candidate)

27 

28 

def test_is_valid_email():
    """Check which addresses ``is_valid_email`` accepts and which it rejects."""
    accepted = (
        "a@b.cc",
        "te.st+email.valid@a.org.au.xx.yy",
        "invalid@yahoo.co.uk",
    )
    rejected = (
        "invalid@.yahoo.co.uk",
        "test email@couchers.org",
        ".testemail@couchers.org",
        "testemail@couchersorg",
        "b@xxb....blabla",
    )

    for address in accepted:
        assert is_valid_email(address)
    for address in rejected:
        assert not is_valid_email(address)

38 

39 

def test_is_valid_username():
    """Check which usernames ``is_valid_username`` accepts and which it rejects."""
    accepted = ("user", "us", "us_er", "us_er1")
    rejected = ("us_", "u", "1us", "User")

    for username in accepted:
        assert is_valid_username(username)
    for username in rejected:
        assert not is_valid_username(username)

49 

50 

def test_is_valid_name():
    """Check which display names ``is_valid_name`` accepts and which it rejects."""
    accepted = ("a", "a b", "1", "老子")
    # NOTE(review): the first and last rejected values look identical here;
    # the original whitespace may have been collapsed by the tool that
    # produced this listing -- confirm against the real source file.
    rejected = (" ", "", " ")

    for name in accepted:
        assert is_valid_name(name)
    for name in rejected:
        assert not is_valid_name(name)

59 

60 

def test_parse_date():
    """Check that ``parse_date`` returns a value for well-formed dates and a falsy result otherwise."""
    parseable = ("2020-01-01", "1900-01-01", "2099-01-01")
    unparseable = (
        "2019-02-29",
        "2019-22-01",
        "2020-1-01",
        "20-01-01",
        "01-01-2020",
        "2020/01/01",
    )

    for text in parseable:
        assert parse_date(text) is not None
    for text in unparseable:
        assert not parse_date(text)

71 

72 

def test_get_parent_node_at_location(testing_communities):
    """Check the id of the node ``get_parent_node_at_location`` returns for several 1D points.

    The trailing comments give the 1D extent noted for each community.
    """
    community_names = [
        "Global",  # 0 to 100
        "Country 1",  # 0 to 50
        "Country 1, Region 1",  # 0 to 10
        "Country 1, Region 1, City 1",  # 0 to 5
        "Country 1, Region 1, City 2",  # 7 to 10
        "Country 1, Region 2",  # 20 to 25
        "Country 1, Region 2, City 1",  # 21 to 23
        "Country 2",  # 52 to 100
        "Country 2, Region 1",  # 52 to 71
        "Country 2, Region 1, City 1",  # 53 to 70
    ]

    # (1D position, name of the community whose id is expected)
    expectations = [
        (1, "Country 1, Region 1, City 1"),
        (3, "Country 1, Region 1, City 1"),
        (6, "Country 1, Region 1"),
        (8, "Country 1, Region 1, City 2"),
        (15, "Country 1"),
        (51, "Global"),
    ]

    with session_scope() as session:
        # Every community is looked up, in the listed order, even when no
        # expectation below refers to it.
        ids_by_name = {name: get_community_id(session, name) for name in community_names}

        for position, expected_name in expectations:
            node = get_parent_node_at_location(session, create_1d_point(position))
            assert node.id == ids_by_name[expected_name]

92 

93 

def test_create_coordinate():
    """Check the coordinates read back after selecting ``create_coordinate(...)`` through the database."""
    # (input coordinates, coordinates expected back from the database)
    cases = [
        ((-95, -185), (-85, 175)),
        # Weird interaction in PostGIS where lng flips at -180 only when
        # there is latitude overflow
        ((95, -180), (85, 180)),
        ((90, -180), (90, -180)),
        ((20, 185), (20, -175)),
        ((0, 0), (0, 0)),
    ]

    with session_scope() as session:
        for given, expected in cases:
            stored = session.execute(select(create_coordinate(*given))).scalar_one()
            assert get_coordinates(stored) == expected

109 

110 

def pg_dump():
    """Run ``pg_dump -s`` against the configured database and return its stdout.

    The connection string is read from ``config["DATABASE_CONNECTION_STRING"]``.
    Output is decoded as ASCII. Because ``check=True`` is passed,
    ``subprocess.CalledProcessError`` is raised on a non-zero exit status.
    """
    command = ["pg_dump", "-s", config["DATABASE_CONNECTION_STRING"]]
    completed = subprocess.run(command, stdout=subprocess.PIPE, encoding="ascii", check=True)
    return completed.stdout

115 

116 

def sort_pg_dump_output(output):
    """Return ``output`` with its sections and parenthesised lists sorted.

    Two things are put in alphabetic order:

    * the entries of every list that opens with `` (`` at the end of a line
      and closes with ``);`` on a line of its own, where entries are
      separated by a comma at the end of a line;
    * the sections of the dump, where a new section starts at every line
      beginning with ``-- ``.
    """
    # Work on a single-line version of the text: newlines are swapped for
    # "§" so the regular expression can span several lines.
    flattened = output.replace("\n", "§")

    def _sorted_list(match):
        # Entries end with a comma as the last character on the line.
        entries = match.group(1).split(",§")
        entries.sort()
        return " (§" + ",§".join(entries) + "§);"

    flattened = re.sub(r" \(§(.*?)§\);", _sorted_list, flattened)

    # Object headers (tables, functions, indices, ...) all appear to start
    # with two dashes and a space; the kind of object does not matter here.
    sections = flattened.split("§-- ")
    sections.sort()
    flattened = "§-- ".join(sections)

    # Restore real newlines.
    return flattened.replace("§", "\n")

137 

138 

def test_sort_pg_dump_output():
    """Check that the entries of a parenthesised list come back in alphabetic order."""
    unsorted_dump = " (\nb,\nc,\na\n);\n"
    expected = " (\na,\nb,\nc\n);\n"

    assert sort_pg_dump_output(unsorted_dump) == expected

141 

142 

def strip_leading_whitespace(lines):
    """Return a new list holding each item of ``lines`` with leading whitespace removed."""
    return [line.lstrip() for line in lines]

145 

146 

def test_migrations(testconfig):
    """Compare the schema built by migrations with the schema built from the models.

    The database is emptied and dumped with ``pg_dump`` twice: once after
    applying the alembic migrations, once after creating everything from
    the current models. Both dumps are normalised, and any remaining
    difference is printed in unified diff format and fails the test.
    """
    # Schema built incrementally through the alembic migrations.
    drop_all()
    apply_migrations()
    with_migrations = pg_dump()

    # Schema built in one go from the current models.
    drop_all()
    create_schema_from_models()
    from_scratch = pg_dump()

    def normalised_lines(dump):
        ordered = sort_pg_dump_output(dump)
        # Leave out sections describing alembic's own objects.
        kept = [section for section in ordered.split("\n-- ") if not section.startswith("Name: alembic_")]
        return strip_leading_whitespace("\n-- ".join(kept).splitlines())

    diff_lines = difflib.unified_diff(
        normalised_lines(with_migrations),
        normalised_lines(from_scratch),
        fromfile="migrations",
        tofile="model",
    )
    diff = "\n".join(diff_lines)

    # Printed so the differences show up in the test output on failure.
    print(diff)
    assert diff == ""

181 

182 

def test_slugify(db):
    """Check the result of the database ``slugify`` function for a range of inputs."""
    # (input text, expected slug)
    cases = [
        ("this is a test", "this-is-a-test"),
        ("this is ä test", "this-is-a-test"),
        # According to the original note, nothing in this input gets
        # converted to ASCII by unaccent; the expected result is "slug".
        ("Создай группу своего города", "slug"),
        ("Detta är ett test!", "detta-ar-ett-test"),
        ("@#(*$&!@#", "slug"),
        (
            "This has a lot ‒ at least relatively speaking ‒ of punctuation! :)",
            "this-has-a-lot-at-least-relatively-speaking-of-punctuation",
        ),
        ("Multiple - #@! - non-ascii chars", "multiple-non-ascii-chars"),
        ("123", "123"),
        (
            "A sentence that is over 64 chars long and where the last thing would be replaced by a dash",
            "a-sentence-that-is-over-64-chars-long-and-where-the-last-thing",
        ),
    ]

    with session_scope() as session:
        for text, expected_slug in cases:
            assert session.execute(func.slugify(text)).scalar_one() == expected_slug
208 )