Coverage for app/backend/src/couchers/email/smtp.py: 88%
91 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import re
2import smtplib
3from collections.abc import Sequence
4from email.headerregistry import Address
5from email.message import EmailMessage, MIMEPart
6from email.utils import make_msgid
7from pathlib import Path
8from typing import cast
10import couchers
11from couchers.config import config
12from couchers.crypto import EMAIL_SOURCE_DATA_KEY_NAME, random_hex, simple_hash_signature
13from couchers.models import Email
14from couchers.proto.internal import jobs_pb2
# Directory containing the v2 email templates (three levels above this module's directory).
template_base = Path(__file__).parent.joinpath("..", "..", "..", "templates", "v2")

# Base directory for relative EmailPart.data_file_path
email_related_part_data_path_base = Path(couchers.__file__).parents[3]  # /app/backend
def embed_html_relative_images(
    html: str, *, base_dir: Path, content_id_domain: str
) -> tuple[str, list[jobs_pb2.EmailPart]]:
    """Modifies HTML markup's image references such that they can be embedded in multipart/related MIME parts.

    Every ``src="..."`` attribute whose value contains no colon (so no URL scheme such as
    ``https:`` or ``cid:``) is treated as a path relative to ``base_dir`` and is replaced
    with a ``cid:`` reference to a freshly generated content id.

    Args:
        html: The HTML markup to rewrite.
        base_dir: Directory against which relative ``src`` values are resolved.
        content_id_domain: Domain used when generating the content ids.

    Returns:
        The rewritten HTML and one EmailPart per rewritten reference, in document order.
        The parts reference their file by a path relative to
        ``email_related_part_data_path_base`` rather than carrying the file data.

    Raises:
        FileNotFoundError: If a referenced file does not exist.
        ValueError: If a referenced file is not located under
            ``email_related_part_data_path_base`` (raised by ``Path.relative_to``).
    """
    # Imported locally so this function does not depend on a module-level import being added.
    import mimetypes

    related_parts: list[jobs_pb2.EmailPart] = []

    def process_relative_src_match(match: re.Match[str]) -> str:
        """Replaces a src="" attribute with a content id reference."""
        image_path = base_dir / str(match.group(1))
        if not image_path.exists():
            # The file is missing, so FileNotFoundError (not FileExistsError) is the matching error.
            raise FileNotFoundError(f"HTML references missing relative image: {image_path}")

        root_relative_path = image_path.relative_to(email_related_part_data_path_base)
        filename = image_path.name
        # Prefer the registered MIME type (e.g. image/jpeg for .jpg, image/svg+xml for .svg);
        # fall back to the suffix-derived type for extensions mimetypes does not know.
        guessed_type, _ = mimetypes.guess_type(filename)
        mime_type = guessed_type or f"image/{image_path.suffix.removeprefix('.')}"
        # make_msgid returns the id wrapped in angle brackets; the cid: URL needs it without them.
        bracketed_content_id = make_msgid(domain=content_id_domain)
        content_id = bracketed_content_id[1:-1]
        related_parts.append(
            jobs_pb2.EmailPart(
                data_file_path=str(root_relative_path),
                content_type=f'{mime_type}; name="{filename}"',
                content_disposition=f'inline; filename="{filename}"',
                content_id=bracketed_content_id,
            )
        )

        return f'src="cid:{content_id}"'

    html = re.sub(r'src="([^":]+)"', repl=process_relative_src_match, string=html)
    return html, related_parts
def email_proto_to_message(payload: jobs_pb2.SendEmailPayload, couchers_id: str) -> tuple[EmailMessage, str | None]:
    """Builds the MIME message for a SendEmailPayload.

    The message always gets a plain-text body. If the payload has HTML, it is added as an
    alternative with its related parts attached to it. Payload attachments are added to the
    top-level message.

    Returns the message together with the HTML body that was actually used, which differs
    from ``payload.html`` when the backcompat image embedding below rewrote it.
    """
    message = EmailMessage()
    message["Subject"] = payload.subject
    message["From"] = Address(payload.sender_name, addr_spec=payload.sender_email)
    message["To"] = Address(addr_spec=payload.recipient)
    message["X-Couchers-ID"] = couchers_id

    unsubscribe_header = payload.list_unsubscribe_header
    if unsubscribe_header:
        message["List-Unsubscribe"] = unsubscribe_header

    source_data = payload.source_data
    if source_data:
        message["X-Couchers-Source-Data"] = source_data
        message["X-Couchers-Source-Sig"] = simple_hash_signature(source_data, EMAIL_SOURCE_DATA_KEY_NAME)

    message.set_content(payload.plain)

    html: str | None = payload.html
    if html:
        inline_parts: Sequence[jobs_pb2.EmailPart] = payload.html_related_parts
        if not inline_parts:
            # Backcompat (2026-06): Embedding relative images used to be this function's responsibility,
            # and was moved to the payload builder. Keep this fallback in case we have queued payloads
            # that didn't do their own embedding.
            sender_domain = Address(addr_spec=payload.sender_email).domain
            html, inline_parts = embed_html_relative_images(
                html, base_dir=template_base, content_id_domain=sender_domain
            )

        message.add_alternative(html, subtype="html")
        # The alternative that was just added is the last subpart of the message.
        alternatives = cast(list[MIMEPart], message.get_payload())
        html_part = alternatives[-1]

        for inline_part in inline_parts:
            _add_email_part(html_part, inline_part, related=True)

    # Iterating an empty attachment list is a no-op, so no emptiness check is needed.
    for attachment in payload.attachments:
        _add_email_part(message, attachment, related=False)

    return message, html
def _add_email_part(msg: MIMEPart, part: jobs_pb2.EmailPart, *, related: bool) -> MIMEPart:
    """Adds ``part`` to ``msg`` as a related part or as an attachment and returns the new MIME part.

    The new part's Content-Type is always overwritten with ``part.content_type``.
    Content-Disposition and Content-ID are overwritten only when the part specifies them.
    """
    # The data is either part of the payload or must be loaded from a file
    content = part.data
    if not content and part.data_file_path:
        # Joining onto the base leaves an absolute data_file_path unchanged (pathlib discards
        # the left operand in that case), so only relative paths are resolved against the base.
        content = (email_related_part_data_path_base / part.data_file_path).read_bytes()

    # Create with generic Content-Type/Content-Disposition headers,
    # then overwrite them with the headers specified by the caller.
    add_part = msg.add_related if related else msg.add_attachment
    add_part(
        content,
        maintype="application",
        subtype="octet-stream",
        disposition="inline" if related else "attachment",
    )

    # The part that was just added is the last subpart of msg.
    subparts = cast(list[MIMEPart], msg.get_payload())
    new_part = subparts[-1]

    _replace_header_verbatim(new_part, "Content-Type", part.content_type)
    optional_headers = (
        ("Content-Disposition", part.content_disposition),
        ("Content-ID", part.content_id),
    )
    for header_name, header_value in optional_headers:
        if header_value:
            _replace_header_verbatim(new_part, header_name, header_value)

    return new_part
def send_smtp_email(payload: jobs_pb2.SendEmailPayload) -> Email:
    """
    Delivers the payload over SMTP, using the host, port and credentials from config.

    Outside of DEV mode the connection is upgraded with STARTTLS before logging in.

    Returns a models.Email object describing the sent email; it can be added to the
    database straight away.
    """
    email_id = random_hex()
    mime_message, final_html = email_proto_to_message(payload, email_id)

    with smtplib.SMTP(config.SMTP_HOST, config.SMTP_PORT) as smtp:
        smtp.ehlo()
        if not config.DEV:
            smtp.starttls()
            # smtplib docs recommend calling ehlo() before and after starttls()
            smtp.ehlo()
        smtp.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
        smtp.sendmail(payload.sender_email, payload.recipient, mime_message.as_string())

    # Store the HTML that was actually sent, which may differ from payload.html.
    record = Email(
        id=email_id,
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=final_html or "",
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    return record
152def _replace_header_verbatim(part: MIMEPart, name: str, value: str) -> None:
153 # MIMEPart.replace_header will parse the value and reformat it,
154 # resulting in additional quoting for an .ics "method=PUBLISH" parameter,
155 # which are not as backwards compatible with older email clients.
157 if hasattr(part, "_headers"): 157 ↛ 166line 157 didn't jump to line 166 because the condition on line 157 was always true
158 # Replace the header in the internal data structure to avoid reformatting.
159 header_index = next((i for i, val in enumerate(part._headers) if val[0] == name), None)
160 if isinstance(header_index, int):
161 part._headers[header_index] = (name, value)
162 else:
163 part._headers.append((name, value))
164 else:
165 # Non-verbatim fallback, in case the internals change
166 part.replace_header(name, value)