Coverage for src/couchers/email/__init__.py: 100%

22 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import logging 

2from pathlib import Path 

3 

4import yaml 

5from jinja2 import Environment, FileSystemLoader 

6 

7from couchers.config import config 

8from couchers.jobs.enqueue import queue_job 

9from proto.internal import jobs_pb2 

10 

11logger = logging.getLogger(__name__) 

12 

13loader = FileSystemLoader(Path(__file__).parent / ".." / ".." / ".." / "templates") 

14env = Environment(loader=loader, trim_blocks=True) 

15 

16 

17def _queue_email( 

18 session, sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data 

19): 

20 payload = jobs_pb2.SendEmailPayload( 

21 sender_name=sender_name, 

22 sender_email=sender_email, 

23 recipient=recipient, 

24 subject=subject, 

25 plain=plain, 

26 html=html, 

27 list_unsubscribe_header=list_unsubscribe_header, 

28 source_data=source_data, 

29 ) 

30 queue_job( 

31 session, 

32 job_type="send_email", 

33 payload=payload, 

34 ) 

35 

36 

37def queue_email( 

38 session, sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header=None, source_data=None 

39): 

40 """ 

41 This indirection is so that this can be easily mocked. Not sure how to do it better :( 

42 """ 

43 _queue_email( 

44 session=session, 

45 sender_name=sender_name, 

46 sender_email=sender_email, 

47 recipient=recipient, 

48 subject=subject, 

49 plain=plain, 

50 html=html, 

51 list_unsubscribe_header=list_unsubscribe_header, 

52 source_data=source_data, 

53 ) 

54 

55 

56def enqueue_system_email(session, recipient, template_name, template_args): 

57 source, _, _ = loader.get_source(env, f"system/{template_name}.md") 

58 _, frontmatter_source, text_source = source.split("---", 2) 

59 

60 rendered_frontmatter = env.from_string(frontmatter_source).render(**template_args, plain=True, html=False) 

61 frontmatter = yaml.load(rendered_frontmatter, Loader=yaml.FullLoader) 

62 

63 plain = env.from_string(text_source.strip()).render( 

64 {**template_args, "frontmatter": frontmatter}, plain=True, html=False 

65 ) 

66 

67 queue_email( 

68 session, 

69 config["NOTIFICATION_EMAIL_SENDER"], 

70 config["NOTIFICATION_EMAIL_ADDRESS"], 

71 recipient, 

72 config["NOTIFICATION_PREFIX"] + frontmatter["subject"], 

73 plain, 

74 None, 

75 source_data=template_name, 

76 )