Coverage for src/couchers/servicers/blocking.py: 95%
37 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-04 02:51 +0000
1import grpc
2from google.protobuf import empty_pb2
3from sqlalchemy.sql import union
5from couchers import errors
6from couchers.models import User, UserBlock
7from couchers.sql import couchers_select as select
8from proto import blocking_pb2, blocking_pb2_grpc
def are_blocked(session, user1_id, user2_id):
    """Return True if a block exists between the two users in either direction.

    Two single-direction lookups on UserBlock are built (user1 blocking user2,
    and user2 blocking user1), combined with a UNION, and the combined query is
    executed once. The result is True when that query yields at least one row.
    """
    # Direction 1: user1 has blocked user2.
    forward_lookup = select(UserBlock.blocked_user_id)
    forward_lookup = forward_lookup.where(UserBlock.blocking_user_id == user1_id)
    forward_lookup = forward_lookup.where(UserBlock.blocked_user_id == user2_id)

    # Direction 2: user2 has blocked user1.
    reverse_lookup = select(UserBlock.blocking_user_id)
    reverse_lookup = reverse_lookup.where(UserBlock.blocking_user_id == user2_id)
    reverse_lookup = reverse_lookup.where(UserBlock.blocked_user_id == user1_id)

    # Both lookups select the column that is constrained to user2_id, so the
    # UNION holds at most one row; only its presence matters.
    either_direction = union(forward_lookup, reverse_lookup).subquery()
    first_row = session.execute(select(either_direction)).first()
    return first_row is not None
class Blocking(blocking_pb2_grpc.BlockingServicer):
    """Servicer implementing the block, unblock and list-blocked-users RPCs.

    Every handler receives the request message, a context exposing `user_id`
    and `abort`, and a database session.
    """

    def BlockUser(self, request, context, session):
        """Record that the calling user blocks the user named in the request.

        Aborts with NOT_FOUND when no visible user has `request.username`, and
        with INVALID_ARGUMENT when the target is the caller or is already
        blocked by the caller. Returns an empty message.
        """
        target_query = select(User).where(User.is_visible).where(User.username == request.username)
        target = session.execute(target_query).scalar_one_or_none()

        # NOTE(review): context.abort is expected to raise; the code below
        # dereferences `target` unconditionally.
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        if context.user_id == target.id:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_BLOCK_SELF)

        existing_query = (
            select(UserBlock)
            .where(UserBlock.blocking_user_id == context.user_id)
            .where(UserBlock.blocked_user_id == target.id)
        )
        existing_block = session.execute(existing_query).scalar_one_or_none()

        if not existing_block:
            # No block from the caller to the target yet: create and persist it.
            session.add(UserBlock(blocking_user_id=context.user_id, blocked_user_id=target.id))
            session.commit()
        else:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_ALREADY_BLOCKED)

        return empty_pb2.Empty()

    def UnblockUser(self, request, context, session):
        """Remove the calling user's block on the user named in the request.

        Aborts with NOT_FOUND when no visible user has `request.username`, and
        with INVALID_ARGUMENT when the caller has not blocked that user.
        Returns an empty message.
        """
        target_query = select(User).where(User.is_visible).where(User.username == request.username)
        target = session.execute(target_query).scalar_one_or_none()

        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        block_query = (
            select(UserBlock)
            .where(UserBlock.blocking_user_id == context.user_id)
            .where(UserBlock.blocked_user_id == target.id)
        )
        existing_block = session.execute(block_query).scalar_one_or_none()

        if not existing_block:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_BLOCKED)

        session.delete(existing_block)
        session.commit()

        return empty_pb2.Empty()

    def GetBlockedUsers(self, request, context, session):
        """Return the usernames of visible users blocked by the calling user."""
        blocked_query = (
            select(User)
            .join(UserBlock, UserBlock.blocked_user_id == User.id)
            .where(User.is_visible)
            .where(UserBlock.blocking_user_id == context.user_id)
        )
        blocked_rows = session.execute(blocked_query).scalars().all()
        usernames = [row.username for row in blocked_rows]

        return blocking_pb2.GetBlockedUsersRes(
            blocked_usernames=usernames,
        )