Coverage for src/couchers/servicers/blocking.py: 95%

37 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1import grpc 

2from google.protobuf import empty_pb2 

3from sqlalchemy.sql import union 

4 

5from couchers import errors, urls 

6from couchers.models import Upload, User, UserBlock 

7from couchers.sql import couchers_select as select 

8from proto import blocking_pb2, blocking_pb2_grpc 

9 

10 

def are_blocked(session, user1_id, user2_id):
    """
    Return True if a UserBlock row exists between the two users in either direction.

    Two lookups are built, one for user1 blocking user2 and one for user2
    blocking user1, and combined with a UNION; the result is True when that
    combined query yields at least one row.
    """
    # Direction 1: user1 is the blocker, user2 is the blocked user.
    user1_blocks_user2 = (
        select(UserBlock.blocked_user_id)
        .where(UserBlock.blocking_user_id == user1_id)
        .where(UserBlock.blocked_user_id == user2_id)
    )
    # Direction 2: user2 is the blocker, user1 is the blocked user.
    user2_blocks_user1 = (
        select(UserBlock.blocking_user_id)
        .where(UserBlock.blocking_user_id == user2_id)
        .where(UserBlock.blocked_user_id == user1_id)
    )

    either_direction = union(user1_blocks_user2, user2_blocks_user1).subquery()
    # Only existence matters, so fetch at most one row.
    first_match = session.execute(select(either_direction).limit(1)).one_or_none()
    return first_match is not None

23 

24 

class Blocking(blocking_pb2_grpc.BlockingServicer):
    """Servicer exposing the block, unblock, and list-blocked-users RPCs."""

    def BlockUser(self, request, context, session):
        """
        Add a UserBlock from ``context.user_id`` to the user named ``request.username``.

        Aborts with NOT_FOUND if no visible user has that username, and with
        INVALID_ARGUMENT if the target is ``context.user_id`` itself or a block
        from ``context.user_id`` to the target already exists. Returns an empty
        message.
        """
        target_query = select(User).where(User.is_visible).where(User.username == request.username)
        blockee = session.execute(target_query).scalar_one_or_none()

        if not blockee:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        if context.user_id == blockee.id:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_BLOCK_SELF)

        existing_block_query = (
            select(UserBlock)
            .where(UserBlock.blocking_user_id == context.user_id)
            .where(UserBlock.blocked_user_id == blockee.id)
        )
        already_blocked = session.execute(existing_block_query).scalar_one_or_none()

        if already_blocked:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_ALREADY_BLOCKED)
        else:
            session.add(
                UserBlock(
                    blocking_user_id=context.user_id,
                    blocked_user_id=blockee.id,
                )
            )
            session.commit()

        return empty_pb2.Empty()

    def UnblockUser(self, request, context, session):
        """
        Delete the UserBlock from ``context.user_id`` to the user named ``request.username``.

        Aborts with NOT_FOUND if no visible user has that username, and with
        INVALID_ARGUMENT if no such block exists. Returns an empty message.
        """
        target_query = select(User).where(User.is_visible).where(User.username == request.username)
        blockee = session.execute(target_query).scalar_one_or_none()

        if not blockee:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        block_query = (
            select(UserBlock)
            .where(UserBlock.blocking_user_id == context.user_id)
            .where(UserBlock.blocked_user_id == blockee.id)
        )
        user_block = session.execute(block_query).scalar_one_or_none()

        if not user_block:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_BLOCKED)

        session.delete(user_block)
        session.commit()

        return empty_pb2.Empty()

    def GetBlockedUsers(self, request, context, session):
        """
        List the visible users that ``context.user_id`` has blocked.

        Each entry carries the username, name, and a thumbnail URL built from
        the joined Upload's filename; the URL is left as None when the row has
        no filename.
        """
        blocked_query = (
            select(User.username, User.name, Upload.filename)
            .join(UserBlock, UserBlock.blocked_user_id == User.id)
            # Outer join: users without a matching Upload row are still listed.
            .outerjoin(Upload, Upload.key == User.avatar_key)
            .where(User.is_visible)
            .where(UserBlock.blocking_user_id == context.user_id)
        )

        entries = []
        for row in session.execute(blocked_query).all():
            if row.filename:
                thumbnail_url = urls.media_url(filename=row.filename, size="thumbnail")
            else:
                thumbnail_url = None
            entries.append(
                blocking_pb2.BlockedUser(
                    username=row.username,
                    name=row.name,
                    avatar_thumbnail_url=thumbnail_url,
                )
            )

        return blocking_pb2.GetBlockedUsersRes(blocked_users=entries)