Coverage for src/couchers/servicers/blocking.py: 95%

41 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 17:19 +0000

1import grpc 

2from google.protobuf import empty_pb2 

3from sqlalchemy.sql import union 

4 

5from couchers import errors 

6from couchers.db import session_scope 

7from couchers.models import User, UserBlock 

8from couchers.sql import couchers_select as select 

9from proto import blocking_pb2, blocking_pb2_grpc 

10 

11 

def are_blocked(session, user1_id, user2_id):
    """Return whether a block exists between two users, in either direction.

    Two queries against ``UserBlock`` are built -- one matching rows where
    ``user1_id`` is the blocking user and ``user2_id`` the blocked user, and
    one matching the reverse -- and combined with ``union``.

    Args:
        session: Database session used to execute the combined query.
        user1_id: ID of the first user.
        user2_id: ID of the second user.

    Returns:
        True if the combined query yields at least one row, False otherwise.
    """
    # Rows where user1 is the blocker and user2 is the one being blocked.
    user1_blocks_user2 = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user1_id)
    user1_blocks_user2 = user1_blocks_user2.where(UserBlock.blocked_user_id == user2_id)

    # Rows where user2 is the blocker and user1 is the one being blocked.
    user2_blocks_user1 = select(UserBlock.blocking_user_id).where(UserBlock.blocking_user_id == user2_id)
    user2_blocks_user1 = user2_blocks_user1.where(UserBlock.blocked_user_id == user1_id)

    either_direction = union(user1_blocks_user2, user2_blocks_user1).subquery()
    first_match = session.execute(select(either_direction)).first()
    return first_match is not None

24 

25 

class Blocking(blocking_pb2_grpc.BlockingServicer):
    """gRPC servicer exposing block, unblock and list-blocked-users calls."""

    @staticmethod
    def _find_visible_user(session, username):
        """Return the ``User`` with this username that passes ``User.is_visible``, or None."""
        query = select(User).where(User.is_visible).where(User.username == username)
        return session.execute(query).scalar_one_or_none()

    @staticmethod
    def _find_block(session, blocking_user_id, blocked_user_id):
        """Return the ``UserBlock`` row for this (blocker, blocked) pair, or None."""
        query = (
            select(UserBlock)
            .where(UserBlock.blocking_user_id == blocking_user_id)
            .where(UserBlock.blocked_user_id == blocked_user_id)
        )
        return session.execute(query).scalar_one_or_none()

    def BlockUser(self, request, context):
        """Record that the calling user blocks the user named in the request.

        Calls ``context.abort`` with NOT_FOUND when no matching visible user
        exists, and with INVALID_ARGUMENT when the target is the caller or a
        block row for this pair already exists. Otherwise a new ``UserBlock``
        is added and committed.

        Returns:
            An empty protobuf message.
        """
        with session_scope() as session:
            target = Blocking._find_visible_user(session, request.username)

            if not target:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

            if context.user_id == target.id:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_BLOCK_SELF)

            existing_block = Blocking._find_block(session, context.user_id, target.id)
            if not existing_block:
                new_block = UserBlock(
                    blocking_user_id=context.user_id,
                    blocked_user_id=target.id,
                )
                session.add(new_block)
                session.commit()
            else:
                # Kept as an explicit branch so the insert above is never
                # reached once the duplicate has been reported.
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_ALREADY_BLOCKED)

        return empty_pb2.Empty()

    def UnblockUser(self, request, context):
        """Remove the calling user's block on the user named in the request.

        Calls ``context.abort`` with NOT_FOUND when no matching visible user
        exists, and with INVALID_ARGUMENT when the caller has no block row for
        that user. Otherwise the ``UserBlock`` row is deleted and committed.

        Returns:
            An empty protobuf message.
        """
        with session_scope() as session:
            target = Blocking._find_visible_user(session, request.username)

            if not target:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

            block_row = Blocking._find_block(session, context.user_id, target.id)
            if not block_row:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_BLOCKED)

            session.delete(block_row)
            session.commit()

        return empty_pb2.Empty()

    def GetBlockedUsers(self, request, context):
        """List the usernames of visible users blocked by the calling user.

        Returns:
            A ``GetBlockedUsersRes`` whose ``blocked_usernames`` holds one
            username per matching user.
        """
        with session_scope() as session:
            query = (
                select(User)
                .join(UserBlock, UserBlock.blocked_user_id == User.id)
                .where(User.is_visible)
                .where(UserBlock.blocking_user_id == context.user_id)
            )
            users = session.execute(query).scalars().all()

            # Usernames are read while the session is still open.
            usernames = [user.username for user in users]
            return blocking_pb2.GetBlockedUsersRes(
                blocked_usernames=usernames,
            )

92 )