Coverage for app / backend / src / couchers / servicers / blocking.py: 93%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import grpc 

2from google.protobuf import empty_pb2 

3from sqlalchemy import exists, select 

4from sqlalchemy.orm import Session 

5from sqlalchemy.sql import not_, or_, union 

6 

7from couchers import urls 

8from couchers.context import CouchersContext 

9from couchers.models import Upload, User, UserBlock 

10from couchers.models.uploads import get_avatar_photo_subquery 

11from couchers.proto import blocking_pb2, blocking_pb2_grpc 

12 

13 

14def is_not_visible(session: Session, user1_id: int | None, user2_id: int | None) -> bool: 

15 """ 

16 Check if users are not visible to each other (due to block or because either account is deleted/banned). 

17 """ 

18 hidden_users = select(User.id).where(or_(User.id == user1_id, User.id == user2_id)).where(not_(User.is_visible)) 

19 # if either user_id is empty, just check if either user is hidden (as they can't block each other) 

20 if not user1_id or not user2_id: 

21 return session.execute(select(union(hidden_users).subquery()).limit(1)).one_or_none() is not None 

22 else: 

23 blocked_users = ( 

24 select(UserBlock.blocked_user_id) 

25 .where(UserBlock.blocking_user_id == user1_id) 

26 .where(UserBlock.blocked_user_id == user2_id) 

27 ) 

28 blocking_users = ( 

29 select(UserBlock.blocking_user_id) 

30 .where(UserBlock.blocking_user_id == user2_id) 

31 .where(UserBlock.blocked_user_id == user1_id) 

32 ) 

33 return ( 

34 session.execute( 

35 select(union(blocked_users, blocking_users, hidden_users).subquery()).limit(1) 

36 ).one_or_none() 

37 is not None 

38 ) 

39 

40 

41class Blocking(blocking_pb2_grpc.BlockingServicer): 

42 def BlockUser( 

43 self, request: blocking_pb2.BlockUserReq, context: CouchersContext, session: Session 

44 ) -> empty_pb2.Empty: 

45 blockee = session.execute( 

46 select(User).where(User.is_visible).where(User.username == request.username) 

47 ).scalar_one_or_none() 

48 

49 if not blockee: 49 ↛ 50line 49 didn't jump to line 50 because the condition on line 49 was never true

50 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

51 

52 if context.user_id == blockee.id: 

53 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_block_self") 

54 

55 if session.execute( 

56 select( 

57 exists() 

58 .where(UserBlock.blocking_user_id == context.user_id) 

59 .where(UserBlock.blocked_user_id == blockee.id) 

60 ) 

61 ).scalar_one(): 

62 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_already_blocked") 

63 else: 

64 user_block = UserBlock( 

65 blocking_user_id=context.user_id, 

66 blocked_user_id=blockee.id, 

67 ) 

68 session.add(user_block) 

69 session.commit() 

70 

71 return empty_pb2.Empty() 

72 

73 def UnblockUser( 

74 self, request: blocking_pb2.UnblockUserReq, context: CouchersContext, session: Session 

75 ) -> empty_pb2.Empty: 

76 blockee = session.execute( 

77 select(User).where(User.is_visible).where(User.username == request.username) 

78 ).scalar_one_or_none() 

79 

80 if not blockee: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

82 

83 user_block = session.execute( 

84 select(UserBlock) 

85 .where(UserBlock.blocking_user_id == context.user_id) 

86 .where(UserBlock.blocked_user_id == blockee.id) 

87 ).scalar_one_or_none() 

88 if not user_block: 

89 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_blocked") 

90 

91 session.delete(user_block) 

92 session.commit() 

93 

94 return empty_pb2.Empty() 

95 

96 def GetBlockedUsers( 

97 self, request: empty_pb2.Empty, context: CouchersContext, session: Session 

98 ) -> blocking_pb2.GetBlockedUsersRes: 

99 avatar_photo_subquery = get_avatar_photo_subquery() 

100 

101 blocked_users = session.execute( 

102 select(User.username, User.name, Upload.filename) 

103 .join(UserBlock, UserBlock.blocked_user_id == User.id) 

104 .outerjoin( 

105 avatar_photo_subquery, 

106 avatar_photo_subquery.c.gallery_id == User.profile_gallery_id, 

107 ) 

108 .outerjoin(Upload, Upload.key == avatar_photo_subquery.c.upload_key) 

109 .where(User.is_visible) 

110 .where(UserBlock.blocking_user_id == context.user_id) 

111 ).all() 

112 

113 return blocking_pb2.GetBlockedUsersRes( 

114 blocked_users=[ 

115 blocking_pb2.BlockedUser( 

116 username=blocked_user.username, 

117 name=blocked_user.name, 

118 avatar_thumbnail_url=urls.media_url(filename=blocked_user.filename, size="thumbnail") 

119 if blocked_user.filename 

120 else None, 

121 ) 

122 for blocked_user in blocked_users 

123 ] 

124 )