Coverage for src/couchers/models/verification.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 00:20 +0000

1import enum 

2 

3from sqlalchemy import ( 

4 BigInteger, 

5 Boolean, 

6 CheckConstraint, 

7 Column, 

8 Date, 

9 DateTime, 

10 Enum, 

11 ForeignKey, 

12 Index, 

13 String, 

14 func, 

15) 

16from sqlalchemy import LargeBinary as Binary 

17from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property 

18from sqlalchemy.orm import column_property, relationship 

19 

20from couchers.models.base import Base 

21from couchers.utils import date_in_timezone, now 

22 

23 

class StrongVerificationAttemptStatus(enum.Enum):
    """
    Lifecycle states of a strong verification attempt.

    Values are assigned by `enum.auto()` in definition order, so members must not be reordered.
    The `##` headings group the states by how much passport data an attempt in that state holds.
    """

    ## full data states
    # completed, this now provides verification for a user
    succeeded = enum.auto()

    ## no data states
    # in progress: waiting for the user to scan the Iris code or open the app
    in_progress_waiting_on_user_to_open_app = enum.auto()
    # in progress: waiting for the user to scan MRZ or NFC/chip
    in_progress_waiting_on_user_in_app = enum.auto()
    # in progress, waiting for backend to pull verification data
    in_progress_waiting_on_backend = enum.auto()
    # failed, no data
    failed = enum.auto()

    # duplicate, at our end; unlike the "no data" states above, this one has data
    duplicate = enum.auto()

    ## minimal data states
    # the data, except minimal deduplication data, was deleted
    deleted = enum.auto()

45 

46 

class PassportSex(enum.Enum):
    """
    We don't care about sex, we use gender on the platform. But passports apparently do.

    Values are assigned by `enum.auto()` in definition order, so members must not be reordered.
    """

    male = enum.auto()
    female = enum.auto()
    unspecified = enum.auto()

55 

56 

class StrongVerificationAttempt(Base):
    """
    An attempt to perform strong verification

    Each row tracks one attempt by a user. Passport data is held in two tiers: "full data"
    (encrypted blob, date of birth, sex) and "minimal data" (expiry date, nationality, last three
    document characters). The check constraints in `__table_args__` tie `status` to which tier
    must be present.

    The `is_valid` / `matches_*` / `has_strong_verification` hybrids work both on a loaded
    instance (returning a plain bool) and on the class (returning a SQL expression), which is why
    they combine conditions with `&` / `|` rather than `and` / `or`.
    """

    __tablename__ = "strong_verification_attempts"

    # our verification id
    id = Column(BigInteger, primary_key=True)

    # this is returned in the callback, and we look up the attempt via this
    verification_attempt_token = Column(String, nullable=False, unique=True)

    user_id = Column(ForeignKey("users.id"), nullable=False, index=True)
    created = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

    status = Column(
        Enum(StrongVerificationAttemptStatus),
        nullable=False,
        default=StrongVerificationAttemptStatus.in_progress_waiting_on_user_to_open_app,
    )

    ## full data
    has_full_data = Column(Boolean, nullable=False, default=False)
    # the data returned from iris, encrypted with a public key whose private key is kept offline
    passport_encrypted_data = Column(Binary, nullable=True)
    passport_date_of_birth = Column(Date, nullable=True)
    passport_sex = Column(Enum(PassportSex), nullable=True)

    ## minimal data: this will not be deleted
    has_minimal_data = Column(Boolean, nullable=False, default=False)
    passport_expiry_date = Column(Date, nullable=True)
    passport_nationality = Column(String, nullable=True)
    # last three characters of the passport number
    passport_last_three_document_chars = Column(String, nullable=True)

    iris_token = Column(String, nullable=False, unique=True)
    iris_session_id = Column(BigInteger, nullable=False, unique=True)

    # SQL-computed companion of passport_expiry_date, comparable against timestamps.
    # NOTE(review): presumably the expiry date interpreted in UTC - confirm against date_in_timezone.
    passport_expiry_datetime = column_property(date_in_timezone(passport_expiry_date, "Etc/UTC"))

    user = relationship("User")

    @hybrid_property
    def is_valid(self):
        """
        This only checks whether the attempt is a success and the passport is not expired, use `has_strong_verification` for full check

        Instance-level branch: returns a plain bool. A missing expiry datetime counts as not
        valid, mirroring the `coalesce(..., False)` in the SQL expression below, instead of
        raising TypeError on the `None >= datetime` comparison.
        """
        if self.status != StrongVerificationAttemptStatus.succeeded:
            return False
        expiry = self.passport_expiry_datetime
        return expiry is not None and expiry >= now()

    @is_valid.expression
    def is_valid(cls):
        """SQL form of `is_valid`; a NULL expiry comparison is coalesced to False."""
        return (cls.status == StrongVerificationAttemptStatus.succeeded) & (
            func.coalesce(cls.passport_expiry_datetime >= func.now(), False)
        )

    @hybrid_property
    def is_visible(self):
        """True for every attempt whose status is not `deleted`."""
        return self.status != StrongVerificationAttemptStatus.deleted

    @hybrid_method
    def _raw_birthdate_match(self, user):
        """Does not check whether the SV attempt itself is not expired"""
        return self.passport_date_of_birth == user.birthdate

    @hybrid_method
    def matches_birthdate(self, user):
        """Whether this attempt is valid and its passport date of birth equals `user.birthdate`."""
        return self.is_valid & self._raw_birthdate_match(user)

    @hybrid_method
    def _raw_gender_match(self, user):
        """Does not check whether the SV attempt itself is not expired"""
        return (
            ((user.gender == "Woman") & (self.passport_sex == PassportSex.female))
            | ((user.gender == "Man") & (self.passport_sex == PassportSex.male))
            | (self.passport_sex == PassportSex.unspecified)
            # `== True` is deliberate: it keeps this usable as a SQL expression
            | (user.has_passport_sex_gender_exception == True)
        )

    @hybrid_method
    def matches_gender(self, user):
        """Whether this attempt is valid and its passport sex is compatible with `user`'s gender."""
        return self.is_valid & self._raw_gender_match(user)

    @hybrid_method
    def has_strong_verification(self, user):
        """Full check: the attempt is valid and both birthdate and gender match `user`."""
        return self.is_valid & self._raw_birthdate_match(user) & self._raw_gender_match(user)

    __table_args__ = (
        # used to look up verification status for a user
        Index(
            "ix_strong_verification_attempts_current",
            user_id,
            passport_expiry_date,
            postgresql_where=status == StrongVerificationAttemptStatus.succeeded,
        ),
        # each passport can be verified only once
        Index(
            "ix_strong_verification_attempts_unique_succeeded",
            passport_expiry_date,
            passport_nationality,
            passport_last_three_document_chars,
            unique=True,
            postgresql_where=(
                (status == StrongVerificationAttemptStatus.succeeded)
                | (status == StrongVerificationAttemptStatus.deleted)
            ),
        ),
        # full data check
        CheckConstraint(
            "(has_full_data IS TRUE AND passport_encrypted_data IS NOT NULL AND passport_date_of_birth IS NOT NULL) OR \
             (has_full_data IS FALSE AND passport_encrypted_data IS NULL AND passport_date_of_birth IS NULL)",
            name="full_data_status",
        ),
        # minimal data check
        CheckConstraint(
            "(has_minimal_data IS TRUE AND passport_expiry_date IS NOT NULL AND passport_nationality IS NOT NULL AND passport_last_three_document_chars IS NOT NULL) OR \
             (has_minimal_data IS FALSE AND passport_expiry_date IS NULL AND passport_nationality IS NULL AND passport_last_three_document_chars IS NULL)",
            name="minimal_data_status",
        ),
        # note on implications: p => q iff ~p OR q
        # full data implies minimal data, has_full_data => has_minimal_data
        CheckConstraint(
            "(has_full_data IS FALSE) OR (has_minimal_data IS TRUE)",
            name="full_data_implies_minimal_data",
        ),
        # succeeded implies full data
        CheckConstraint(
            "(NOT (status = 'succeeded')) OR (has_full_data IS TRUE)",
            name="succeeded_implies_full_data",
        ),
        # in_progress/failed implies no_data
        CheckConstraint(
            "(NOT ((status = 'in_progress_waiting_on_user_to_open_app') OR (status = 'in_progress_waiting_on_user_in_app') OR (status = 'in_progress_waiting_on_backend') OR (status = 'failed'))) OR (has_minimal_data IS FALSE)",
            name="in_progress_failed_iris_implies_no_data",
        ),
        # deleted or duplicate implies minimal data
        CheckConstraint(
            "(NOT ((status = 'deleted') OR (status = 'duplicate'))) OR (has_minimal_data IS TRUE)",
            name="deleted_duplicate_implies_minimal_data",
        ),
    )

198 

199 

class StrongVerificationCallbackEvent(Base):
    """
    A callback event recorded against a strong verification attempt
    """

    __tablename__ = "strong_verification_callback_events"

    id = Column(BigInteger, primary_key=True)
    # set by the database at insert time
    created = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

    # the StrongVerificationAttempt row this event belongs to
    verification_attempt_id = Column(ForeignKey("strong_verification_attempts.id"), nullable=False, index=True)

    # NOTE(review): presumably the raw status string reported by Iris in the callback - confirm against the handler
    iris_status = Column(String, nullable=False)

207 

208 iris_status = Column(String, nullable=False)