Coverage for src/couchers/servicers/blocking.py: 95%

38 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-23 04:49 +0000

1import grpc 

2from google.protobuf import empty_pb2 

3from sqlalchemy.sql import not_, or_, union 

4 

5from couchers import errors, urls 

6from couchers.models import Upload, User, UserBlock 

7from couchers.sql import couchers_select as select 

8from proto import blocking_pb2, blocking_pb2_grpc 

9 

10 

11def is_not_visible(session, user1_id, user2_id) -> bool: 

12 """ 

13 Check if users are not visible to each other (due to block or because either account is deleted/banned). 

14 """ 

15 blocked_users = ( 

16 select(UserBlock.blocked_user_id) 

17 .where(UserBlock.blocking_user_id == user1_id) 

18 .where(UserBlock.blocked_user_id == user2_id) 

19 ) 

20 blocking_users = ( 

21 select(UserBlock.blocking_user_id) 

22 .where(UserBlock.blocking_user_id == user2_id) 

23 .where(UserBlock.blocked_user_id == user1_id) 

24 ) 

25 hidden_users = select(User.id).where(or_(User.id == user1_id, User.id == user2_id)).where(not_(User.is_visible)) 

26 return ( 

27 session.execute(select(union(blocked_users, blocking_users, hidden_users).subquery()).limit(1)).one_or_none() 

28 is not None 

29 ) 

30 

31 

32class Blocking(blocking_pb2_grpc.BlockingServicer): 

33 def BlockUser(self, request, context, session): 

34 blockee = session.execute( 

35 select(User).where(User.is_visible).where(User.username == request.username) 

36 ).scalar_one_or_none() 

37 

38 if not blockee: 

39 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

40 

41 if context.user_id == blockee.id: 

42 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_BLOCK_SELF) 

43 

44 if session.execute( 

45 select(UserBlock) 

46 .where(UserBlock.blocking_user_id == context.user_id) 

47 .where(UserBlock.blocked_user_id == blockee.id) 

48 ).scalar_one_or_none(): 

49 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_ALREADY_BLOCKED) 

50 else: 

51 user_block = UserBlock( 

52 blocking_user_id=context.user_id, 

53 blocked_user_id=blockee.id, 

54 ) 

55 session.add(user_block) 

56 session.commit() 

57 

58 return empty_pb2.Empty() 

59 

60 def UnblockUser(self, request, context, session): 

61 blockee = session.execute( 

62 select(User).where(User.is_visible).where(User.username == request.username) 

63 ).scalar_one_or_none() 

64 

65 if not blockee: 

66 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

67 

68 user_block = session.execute( 

69 select(UserBlock) 

70 .where(UserBlock.blocking_user_id == context.user_id) 

71 .where(UserBlock.blocked_user_id == blockee.id) 

72 ).scalar_one_or_none() 

73 if not user_block: 

74 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_BLOCKED) 

75 

76 session.delete(user_block) 

77 session.commit() 

78 

79 return empty_pb2.Empty() 

80 

81 def GetBlockedUsers(self, request, context, session): 

82 blocked_users = session.execute( 

83 select(User.username, User.name, Upload.filename) 

84 .join(UserBlock, UserBlock.blocked_user_id == User.id) 

85 .outerjoin(Upload, Upload.key == User.avatar_key) 

86 .where(User.is_visible) 

87 .where(UserBlock.blocking_user_id == context.user_id) 

88 ).all() 

89 

90 return blocking_pb2.GetBlockedUsersRes( 

91 blocked_users=[ 

92 blocking_pb2.BlockedUser( 

93 username=blocked_user.username, 

94 name=blocked_user.name, 

95 avatar_thumbnail_url=urls.media_url(filename=blocked_user.filename, size="thumbnail") 

96 if blocked_user.filename 

97 else None, 

98 ) 

99 for blocked_user in blocked_users 

100 ] 

101 )