Coverage for src/couchers/email/__init__.py: 100%

22 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-04-16 15:13 +0000

1import logging 

2from pathlib import Path 

3 

4import yaml 

5from jinja2 import Environment, FileSystemLoader 

6 

7from couchers.config import config 

8from couchers.jobs.enqueue import queue_job 

9from proto.internal import jobs_pb2 

10 

# Logger named after this module.
logger = logging.getLogger(__name__)

# Templates are loaded from the "templates" directory reached by going three
# levels up from the directory containing this file.
loader = FileSystemLoader(Path(__file__).parent.joinpath("..", "..", "..", "templates"))

# Shared Jinja2 environment used to render email templates.
env = Environment(trim_blocks=True, loader=loader)

15 

16 

def _queue_email(
    session, sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
):
    """
    Packs the given email fields into a ``SendEmailPayload`` and queues a ``send_email`` job for it.

    All arguments other than ``session`` are copied one-to-one into the payload field of the same name;
    ``session`` is handed to ``queue_job``.
    """
    email_fields = {
        "sender_name": sender_name,
        "sender_email": sender_email,
        "recipient": recipient,
        "subject": subject,
        "plain": plain,
        "html": html,
        "list_unsubscribe_header": list_unsubscribe_header,
        "source_data": source_data,
    }
    queue_job(
        session,
        job_type="send_email",
        payload=jobs_pb2.SendEmailPayload(**email_fields),
        priority=5,
    )

36 

37 

def queue_email(
    session, sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header=None, source_data=None
):
    """
    Public entry point for queueing an email; forwards every argument to ``_queue_email`` by keyword.

    The extra layer exists so that ``_queue_email`` can be swapped out for a mock easily.
    """
    forwarded = {
        "session": session,
        "sender_name": sender_name,
        "sender_email": sender_email,
        "recipient": recipient,
        "subject": subject,
        "plain": plain,
        "html": html,
        "list_unsubscribe_header": list_unsubscribe_header,
        "source_data": source_data,
    }
    # Keyword-only forwarding: a mocked _queue_email sees the same call either way.
    _queue_email(**forwarded)

55 

56 

def enqueue_system_email(session, recipient, template_name, template_args):
    """
    Renders the template ``system/{template_name}.md`` and queues the result as a plain-text email.

    The template source is split on ``---`` into a YAML frontmatter section and a body. Both parts are
    rendered with ``template_args`` plus ``plain=True`` and ``html=False``; the body additionally receives
    the parsed frontmatter under the name ``frontmatter``. The subject is the frontmatter's ``subject``
    entry prefixed with ``config["NOTIFICATION_PREFIX"]``, no HTML part is sent, and ``template_name`` is
    passed along as ``source_data``.
    """
    template_source, _filename, _uptodate = loader.get_source(env, f"system/{template_name}.md")
    # Anything before the first "---" is dropped; maxsplit=2 keeps later "---" inside the body.
    _, raw_frontmatter, raw_body = template_source.split("---", 2)

    frontmatter_yaml = env.from_string(raw_frontmatter).render(**template_args, plain=True, html=False)
    # NOTE(review): this parses *rendered* output with yaml.FullLoader. If template_args can carry
    # untrusted text, consider whether yaml.safe_load would be sufficient here -- TODO confirm.
    frontmatter = yaml.load(frontmatter_yaml, Loader=yaml.FullLoader)

    body_template = env.from_string(raw_body.strip())
    body_context = {**template_args, "frontmatter": frontmatter}
    plain = body_template.render(body_context, plain=True, html=False)

    sender_name = config["NOTIFICATION_EMAIL_SENDER"]
    sender_email = config["NOTIFICATION_EMAIL_ADDRESS"]
    subject = config["NOTIFICATION_PREFIX"] + frontmatter["subject"]

    queue_email(
        session,
        sender_name,
        sender_email,
        recipient,
        subject,
        plain,
        None,  # no HTML version for system emails
        source_data=template_name,
    )