Coverage for src/couchers/email/__init__.py: 100%
24 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import logging
2from pathlib import Path
4import yaml
5from jinja2 import Environment, FileSystemLoader
7from couchers.config import config
8from couchers.jobs.enqueue import queue_job
9from couchers.metrics import emails_counter
10from proto.internal import jobs_pb2
12logger = logging.getLogger(__name__)
14loader = FileSystemLoader(Path(__file__).parent / ".." / ".." / ".." / "templates")
15env = Environment(loader=loader, trim_blocks=True)
18def _queue_email(
19 session, sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
20):
21 payload = jobs_pb2.SendEmailPayload(
22 sender_name=sender_name,
23 sender_email=sender_email,
24 recipient=recipient,
25 subject=subject,
26 plain=plain,
27 html=html,
28 list_unsubscribe_header=list_unsubscribe_header,
29 source_data=source_data,
30 )
31 queue_job(
32 session,
33 job_type="send_email",
34 payload=payload,
35 priority=5,
36 )
39def queue_email(
40 session, sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header=None, source_data=None
41):
42 """
43 This indirection is so that this can be easily mocked. Not sure how to do it better :(
44 """
45 _queue_email(
46 session=session,
47 sender_name=sender_name,
48 sender_email=sender_email,
49 recipient=recipient,
50 subject=subject,
51 plain=plain,
52 html=html,
53 list_unsubscribe_header=list_unsubscribe_header,
54 source_data=source_data,
55 )
58def enqueue_system_email(session, recipient, template_name, template_args):
59 source, _, _ = loader.get_source(env, f"system/{template_name}.md")
60 _, frontmatter_source, text_source = source.split("---", 2)
62 rendered_frontmatter = env.from_string(frontmatter_source).render(**template_args, plain=True, html=False)
63 frontmatter = yaml.load(rendered_frontmatter, Loader=yaml.FullLoader)
65 plain = env.from_string(text_source.strip()).render(
66 {**template_args, "frontmatter": frontmatter}, plain=True, html=False
67 )
69 queue_email(
70 session,
71 config["NOTIFICATION_EMAIL_SENDER"],
72 config["NOTIFICATION_EMAIL_ADDRESS"],
73 recipient,
74 config["NOTIFICATION_PREFIX"] + frontmatter["subject"],
75 plain,
76 None,
77 source_data=template_name,
78 )
80 emails_counter.inc()