Coverage for src/couchers/notifications/background.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

58 statements  

1import logging 

2from datetime import timedelta 

3from typing import List 

4 

5from sqlalchemy.sql import and_, func 

6 

7from couchers.db import session_scope 

8from couchers.models import ( 

9 Notification, 

10 NotificationDelivery, 

11 NotificationDeliveryType, 

12 NotificationPreference, 

13 NotificationTopicAction, 

14 User, 

15) 

16from couchers.sql import couchers_select as select 

17from couchers.tasks import send_digest_email, send_notification_email 

18 

19logger = logging.getLogger(__name__) 

20 

21 

def get_notification_preference(
    session, user_id: int, topic_action: NotificationTopicAction
) -> List[NotificationDeliveryType]:
    """
    Gets the user's preference from the DB or otherwise falls back to defaults

    Must be done in session scope

    Parameters:
        session: open database session used to run the lookup
        user_id: id of the user whose preferences are being resolved
        topic_action: the notification topic/action to resolve preferences for

    Returns list of delivery types
    """
    # Explicit choices stored for this user and topic_action, keyed by delivery type.
    # The rows must be selected by their owning user; matching the row's own `id` against a
    # user id would pick up an unrelated preference row (or none at all).
    # NOTE(review): relies on NotificationPreference exposing a `user_id` column, as
    # Notification does -- confirm against the model.
    overrides = {
        res.delivery_type: res.deliver
        for res in session.execute(
            select(NotificationPreference)
            .where(NotificationPreference.user_id == user_id)
            .where(NotificationPreference.topic_action == topic_action)
        )
        .scalars()
        .all()
    }
    # a stored override wins; without one, the delivery type is used iff it is among the
    # topic_action's defaults
    return [dt for dt in NotificationDeliveryType if overrides.get(dt, dt in topic_action.defaults)]

43 

44 

def handle_notification(notification_id):
    """
    Records how a single notification should be delivered

    Loads the notification and its recipient, bails out if the recipient has not enabled
    new notifications, and otherwise adds one NotificationDelivery row per delivery type
    returned by get_notification_preference:

    * email: added with delivered=None, to be picked up later by the email background worker
    * push: added with delivered set to the current DB time (actual sending is still a todo)

    Other delivery types are only logged.
    """
    with session_scope() as session:
        notification_query = select(Notification).where(Notification.id == notification_id)
        notification = session.execute(notification_query).scalar_one()

        # the recipient has to have opted in to new notifications, otherwise do nothing
        user_query = select(User).where(User.id == notification.user_id)
        user = session.execute(user_query).scalar_one()
        if not user.new_notifications_enabled:
            logger.info(f"Skipping notification for {user} due to new notifications disabled")
            return

        topic, action = notification.topic_action.unpack()
        channels = get_notification_preference(session, notification.user.id, notification.topic_action)
        for channel in channels:
            logger.info(f"Should notify by {channel}")

            if channel == NotificationDeliveryType.email:
                # emails are not sent from here: the row is only queued, and the email
                # background worker sends it later and takes care of deduplication
                queued_email = NotificationDelivery(
                    notification_id=notification.id,
                    delivered=None,
                    delivery_type=NotificationDeliveryType.email,
                )
                session.add(queued_email)
                continue

            if channel == NotificationDeliveryType.push:
                # push notifications go out immediately, so the row is stamped right away
                push_record = NotificationDelivery(
                    notification_id=notification.id,
                    delivered=func.now(),
                    delivery_type=NotificationDeliveryType.push,
                )
                session.add(push_record)
                # todo
                logger.info("Supposed to send push notification")

79 

80 

def handle_email_notifications():
    """
    Sends out emails for notifications

    Notifications are grouped by (user, topic_action, key). A group is emailed only if it
    has an email delivery on a notification created in the last hour and no email was
    already delivered for that group on a notification created in the last 24 hours.
    For each such group the lowest notification id / delivery id is used, the email is
    sent, and the delivery is stamped with the current DB time.
    """
    logger.info("Sending out email notifications")

    with session_scope() as session:
        # (user, topic_action, key) groups that already got an email: skip these
        already_emailed = (
            select(Notification.user_id, Notification.topic_action, Notification.key)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered != None)
            .where(Notification.created > func.now() - timedelta(hours=24))
            .group_by(Notification.user_id, Notification.topic_action, Notification.key)
            .subquery()
        )

        # a notification belongs to an already-emailed group when all three parts match
        same_group = and_(
            already_emailed.c.user_id == Notification.user_id,
            already_emailed.c.topic_action == Notification.topic_action,
            already_emailed.c.key == Notification.key,
        )

        pending_query = (
            select(
                User,
                Notification.topic_action,
                Notification.key,
                func.min(Notification.id).label("notification_id"),
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(User, User.id == Notification.user_id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(Notification.created > func.now() - timedelta(hours=1))
            .group_by(User, Notification.user_id, Notification.topic_action, Notification.key)
            # anti-join: keep only groups with no match among the already-emailed ones
            .outerjoin(already_emailed, same_group)
            .where(already_emailed.c.key == None)
        )
        pending = session.execute(pending_query).all()

        for user, topic_action, key, notification_id, delivery_id in pending:
            topic, action = topic_action.unpack()
            logger.info(f"Sending notification id {notification_id} to {user.id} ({topic}/{action}/{key})")

            delivery_query = select(NotificationDelivery).where(NotificationDelivery.id == delivery_id)
            notification_delivery = session.execute(delivery_query).scalar_one()

            # sanity checks: the row picked by the aggregate query must be an unsent email
            # delivery belonging to the notification picked alongside it
            assert notification_delivery.delivery_type == NotificationDeliveryType.email
            assert not notification_delivery.delivered
            assert notification_delivery.notification_id == notification_id

            send_notification_email(notification_delivery.notification)
            notification_delivery.delivered = func.now()
            # persist the delivered stamp before moving on to the next email
            session.commit()

140 

141 

def handle_email_digests():
    """
    Sends out email digests

    Selects users whose last digest was sent more than 24 hours ago and who have at least
    one undelivered digest delivery. For each of them, collects the digest deliveries of
    notifications that have not been delivered through any delivery type yet, sends them
    in one digest email (oldest notification first), then stamps those deliveries and the
    user's last_digest_sent with the current DB time.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # .scalars() unwraps the single-entity rows so the loop below works on User objects
        # (attribute reads and the last_digest_sent update need the mapped entity, not a Row)
        users_to_send_digests_to = (
            session.execute(
                (
                    select(User)
                    .where(User.last_digest_sent < func.now() - timedelta(hours=24))
                    # todo: tz
                    .join(Notification, Notification.user_id == User.id)
                    .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                    .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                    .where(NotificationDelivery.delivered == None)
                    .group_by(User)
                )
            )
            .scalars()
            .all()
        )

        for user in users_to_send_digests_to:
            # already sent notifications
            # `delivered` is only ever set to func.now() or None in this module, so
            # "was delivered" is a not-null check rather than a comparison with True
            subquery = (
                select(
                    Notification.id.label("notification_id"),
                    func.min(NotificationDelivery.id).label("notification_delivery_id"),
                )
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivered != None)
                .where(Notification.user_id == user.id)
                .group_by(Notification)
                .subquery()
            )

            # notifications that haven't been delivered in any way yet
            notifications_and_deliveries = session.execute(
                (
                    select(Notification, NotificationDelivery)
                    .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                    .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                    # anti-join against the already-delivered notifications of this user
                    .outerjoin(subquery, subquery.c.notification_id == Notification.id)
                    .where(subquery.c.notification_delivery_id == None)
                    .where(Notification.user_id == user.id)
                    .order_by(Notification.created)
                )
            ).all()

            if notifications_and_deliveries:
                # split the (notification, delivery) pairs into two parallel tuples
                notifications, deliveries = zip(*notifications_and_deliveries)
                logger.info(f"Sending {user.id=} a digest with {len(notifications)} notifications")
                send_digest_email(notifications)
                for delivery in deliveries:
                    delivery.delivered = func.now()
                user.last_digest_sent = func.now()
                # persist the stamps for this user before handling the next one
                session.commit()

196 session.commit()