Coverage for src / couchers / servicers / blocking.py: 93%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-05 01:33 +0000

1import grpc 

2from google.protobuf import empty_pb2 

3from sqlalchemy import exists, select 

4from sqlalchemy.orm import Session 

5from sqlalchemy.sql import not_, or_, union 

6 

7from couchers import urls 

8from couchers.context import CouchersContext 

9from couchers.models import Upload, User, UserBlock 

10from couchers.proto import blocking_pb2, blocking_pb2_grpc 

11 

12 

13def is_not_visible(session: Session, user1_id: int | None, user2_id: int | None) -> bool: 

14 """ 

15 Check if users are not visible to each other (due to block or because either account is deleted/banned). 

16 """ 

17 hidden_users = select(User.id).where(or_(User.id == user1_id, User.id == user2_id)).where(not_(User.is_visible)) 

18 # if either user_id is empty, just check if either user is hidden (as they can't block each other) 

19 if not user1_id or not user2_id: 

20 return session.execute(select(union(hidden_users).subquery()).limit(1)).one_or_none() is not None 

21 else: 

22 blocked_users = ( 

23 select(UserBlock.blocked_user_id) 

24 .where(UserBlock.blocking_user_id == user1_id) 

25 .where(UserBlock.blocked_user_id == user2_id) 

26 ) 

27 blocking_users = ( 

28 select(UserBlock.blocking_user_id) 

29 .where(UserBlock.blocking_user_id == user2_id) 

30 .where(UserBlock.blocked_user_id == user1_id) 

31 ) 

32 return ( 

33 session.execute( 

34 select(union(blocked_users, blocking_users, hidden_users).subquery()).limit(1) 

35 ).one_or_none() 

36 is not None 

37 ) 

38 

39 

40class Blocking(blocking_pb2_grpc.BlockingServicer): 

41 def BlockUser( 

42 self, request: blocking_pb2.BlockUserReq, context: CouchersContext, session: Session 

43 ) -> empty_pb2.Empty: 

44 blockee = session.execute( 

45 select(User).where(User.is_visible).where(User.username == request.username) 

46 ).scalar_one_or_none() 

47 

48 if not blockee: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

50 

51 if context.user_id == blockee.id: 

52 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_block_self") 

53 

54 if session.execute( 

55 select( 

56 exists() 

57 .where(UserBlock.blocking_user_id == context.user_id) 

58 .where(UserBlock.blocked_user_id == blockee.id) 

59 ) 

60 ).scalar_one(): 

61 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_already_blocked") 

62 else: 

63 user_block = UserBlock( 

64 blocking_user_id=context.user_id, 

65 blocked_user_id=blockee.id, 

66 ) 

67 session.add(user_block) 

68 session.commit() 

69 

70 return empty_pb2.Empty() 

71 

72 def UnblockUser( 

73 self, request: blocking_pb2.UnblockUserReq, context: CouchersContext, session: Session 

74 ) -> empty_pb2.Empty: 

75 blockee = session.execute( 

76 select(User).where(User.is_visible).where(User.username == request.username) 

77 ).scalar_one_or_none() 

78 

79 if not blockee: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true

80 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

81 

82 user_block = session.execute( 

83 select(UserBlock) 

84 .where(UserBlock.blocking_user_id == context.user_id) 

85 .where(UserBlock.blocked_user_id == blockee.id) 

86 ).scalar_one_or_none() 

87 if not user_block: 

88 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_blocked") 

89 

90 session.delete(user_block) 

91 session.commit() 

92 

93 return empty_pb2.Empty() 

94 

95 def GetBlockedUsers( 

96 self, request: empty_pb2.Empty, context: CouchersContext, session: Session 

97 ) -> blocking_pb2.GetBlockedUsersRes: 

98 blocked_users = session.execute( 

99 select(User.username, User.name, Upload.filename) 

100 .join(UserBlock, UserBlock.blocked_user_id == User.id) 

101 .outerjoin(Upload, Upload.key == User.avatar_key) 

102 .where(User.is_visible) 

103 .where(UserBlock.blocking_user_id == context.user_id) 

104 ).all() 

105 

106 return blocking_pb2.GetBlockedUsersRes( 

107 blocked_users=[ 

108 blocking_pb2.BlockedUser( 

109 username=blocked_user.username, 

110 name=blocked_user.name, 

111 avatar_thumbnail_url=urls.media_url(filename=blocked_user.filename, size="thumbnail") 

112 if blocked_user.filename 

113 else None, 

114 ) 

115 for blocked_user in blocked_users 

116 ] 

117 )