Coverage for src/couchers/servicers/blocking.py: 95%

41 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1import grpc 

2from google.protobuf import empty_pb2 

3from sqlalchemy import exists 

4from sqlalchemy.sql import not_, or_, union 

5 

6from couchers import urls 

7from couchers.models import Upload, User, UserBlock 

8from couchers.proto import blocking_pb2, blocking_pb2_grpc 

9from couchers.sql import couchers_select as select 

10 

11 

def is_not_visible(session, user1_id: int | None, user2_id: int | None) -> bool:
    """
    Report whether the two users are hidden from each other.

    Returns True when either user row is flagged as not visible (the original
    docstring attributes this to deleted/banned accounts), or, when both ids
    are given, when a UserBlock row exists in either direction between them.
    """
    # Rows for either of the two users that fail the visibility flag.
    invisible = (
        select(User.id)
        .where(or_(User.id == user1_id, User.id == user2_id))
        .where(not_(User.is_visible))
    )

    if user1_id and user2_id:
        # Both ids present: a block in either direction also hides the pair.
        blocked_by_first = (
            select(UserBlock.blocked_user_id)
            .where(UserBlock.blocking_user_id == user1_id)
            .where(UserBlock.blocked_user_id == user2_id)
        )
        blocked_by_second = (
            select(UserBlock.blocking_user_id)
            .where(UserBlock.blocking_user_id == user2_id)
            .where(UserBlock.blocked_user_id == user1_id)
        )
        candidates = union(blocked_by_first, blocked_by_second, invisible)
    else:
        # A missing id cannot take part in a block, so only the hidden check applies.
        candidates = union(invisible)

    # Any single matching row is enough to answer the question.
    first_hit = session.execute(select(candidates.subquery()).limit(1)).one_or_none()
    return first_hit is not None

37 

38 

class Blocking(blocking_pb2_grpc.BlockingServicer):
    """Servicer exposing the block, unblock and list-blocked-users RPCs."""

    def BlockUser(self, request, context, session):
        """
        Record that the calling user blocks the user named in the request.

        Aborts with NOT_FOUND ("user_not_found") when no visible user has the
        requested username, and with INVALID_ARGUMENT when the target is the
        caller ("cant_block_self") or is already blocked by the caller
        ("user_already_blocked"). Returns an empty message.
        """
        # NOTE(review): the code below reads target.id right after the abort
        # call, so abort_with_error_code presumably raises - confirm.
        target_query = select(User).where(User.is_visible).where(User.username == request.username)
        target = session.execute(target_query).scalar_one_or_none()

        if not target:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        if context.user_id == target.id:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_block_self")

        # Does a block row from the caller to the target already exist?
        block_exists = (
            exists()
            .where(UserBlock.blocking_user_id == context.user_id)
            .where(UserBlock.blocked_user_id == target.id)
        )
        already_blocked = session.execute(select(block_exists)).scalar_one()

        if already_blocked:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_already_blocked")
        else:
            new_block = UserBlock(
                blocking_user_id=context.user_id,
                blocked_user_id=target.id,
            )
            session.add(new_block)
            session.commit()

        return empty_pb2.Empty()

    def UnblockUser(self, request, context, session):
        """
        Remove the calling user's block on the user named in the request.

        Aborts with NOT_FOUND ("user_not_found") when no visible user has the
        requested username, and with INVALID_ARGUMENT ("user_not_blocked")
        when the caller has no block on that user. Returns an empty message.
        """
        target_query = select(User).where(User.is_visible).where(User.username == request.username)
        target = session.execute(target_query).scalar_one_or_none()

        if not target:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        # Look up the caller's existing block row for this target, if any.
        block_query = (
            select(UserBlock)
            .where(UserBlock.blocking_user_id == context.user_id)
            .where(UserBlock.blocked_user_id == target.id)
        )
        existing_block = session.execute(block_query).scalar_one_or_none()

        if not existing_block:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_blocked")

        session.delete(existing_block)
        session.commit()

        return empty_pb2.Empty()

    def GetBlockedUsers(self, request, context, session):
        """
        List the visible users that the calling user has blocked.

        Each entry carries the username, the name and, when the user has an
        avatar upload, a thumbnail URL for it.
        """
        # Outer join on Upload so users without an avatar are still returned.
        listing_query = (
            select(User.username, User.name, Upload.filename)
            .join(UserBlock, UserBlock.blocked_user_id == User.id)
            .outerjoin(Upload, Upload.key == User.avatar_key)
            .where(User.is_visible)
            .where(UserBlock.blocking_user_id == context.user_id)
        )
        rows = session.execute(listing_query).all()

        entries = []
        for row in rows:
            if row.filename:
                thumbnail = urls.media_url(filename=row.filename, size="thumbnail")
            else:
                thumbnail = None
            entries.append(
                blocking_pb2.BlockedUser(
                    username=row.username,
                    name=row.name,
                    avatar_thumbnail_url=thumbnail,
                )
            )

        return blocking_pb2.GetBlockedUsersRes(blocked_users=entries)

110 )