Coverage for src/couchers/servicers/blocking.py: 95%

37 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-11-21 04:21 +0000

1import grpc 

2from google.protobuf import empty_pb2 

3from sqlalchemy.sql import union 

4 

5from couchers import errors 

6from couchers.models import User, UserBlock 

7from couchers.sql import couchers_select as select 

8from proto import blocking_pb2, blocking_pb2_grpc 

9 

10 

11def are_blocked(session, user1_id, user2_id): 

12 blocked_users = ( 

13 select(UserBlock.blocked_user_id) 

14 .where(UserBlock.blocking_user_id == user1_id) 

15 .where(UserBlock.blocked_user_id == user2_id) 

16 ) 

17 blocking_users = ( 

18 select(UserBlock.blocking_user_id) 

19 .where(UserBlock.blocking_user_id == user2_id) 

20 .where(UserBlock.blocked_user_id == user1_id) 

21 ) 

22 return session.execute(select(union(blocked_users, blocking_users).subquery())).first() is not None 

23 

24 

25class Blocking(blocking_pb2_grpc.BlockingServicer): 

26 def BlockUser(self, request, context, session): 

27 blockee = session.execute( 

28 select(User).where(User.is_visible).where(User.username == request.username) 

29 ).scalar_one_or_none() 

30 

31 if not blockee: 

32 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

33 

34 if context.user_id == blockee.id: 

35 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_BLOCK_SELF) 

36 

37 if session.execute( 

38 select(UserBlock) 

39 .where(UserBlock.blocking_user_id == context.user_id) 

40 .where(UserBlock.blocked_user_id == blockee.id) 

41 ).scalar_one_or_none(): 

42 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_ALREADY_BLOCKED) 

43 else: 

44 user_block = UserBlock( 

45 blocking_user_id=context.user_id, 

46 blocked_user_id=blockee.id, 

47 ) 

48 session.add(user_block) 

49 session.commit() 

50 

51 return empty_pb2.Empty() 

52 

53 def UnblockUser(self, request, context, session): 

54 blockee = session.execute( 

55 select(User).where(User.is_visible).where(User.username == request.username) 

56 ).scalar_one_or_none() 

57 

58 if not blockee: 

59 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

60 

61 user_block = session.execute( 

62 select(UserBlock) 

63 .where(UserBlock.blocking_user_id == context.user_id) 

64 .where(UserBlock.blocked_user_id == blockee.id) 

65 ).scalar_one_or_none() 

66 if not user_block: 

67 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_BLOCKED) 

68 

69 session.delete(user_block) 

70 session.commit() 

71 

72 return empty_pb2.Empty() 

73 

74 def GetBlockedUsers(self, request, context, session): 

75 blocked_users = ( 

76 session.execute( 

77 select(User) 

78 .join(UserBlock, UserBlock.blocked_user_id == User.id) 

79 .where(User.is_visible) 

80 .where(UserBlock.blocking_user_id == context.user_id) 

81 ) 

82 .scalars() 

83 .all() 

84 ) 

85 

86 return blocking_pb2.GetBlockedUsersRes( 

87 blocked_usernames=[blocked_user.username for blocked_user in blocked_users], 

88 )