Coverage for src/tests/test_blocking.py: 100%

74 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import grpc 

2import pytest 

3from google.protobuf import empty_pb2 

4 

5from couchers import errors 

6from couchers.models import UserBlock 

7from couchers.sql import couchers_select as select 

8from proto import blocking_pb2 

9from tests.test_fixtures import blocking_session, db, generate_user, make_user_block, session_scope, testconfig # noqa 

10 

11 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

15 

16 

def test_BlockUser(db):
    """BlockUser rejects self-blocks and duplicate blocks, and stores one row for a valid block."""
    blocker, blocker_token = generate_user()
    target, _target_token = generate_user()

    # All UserBlock rows created by the blocking user; reused before and after the RPCs.
    blocks_by_blocker = select(UserBlock).where(UserBlock.blocking_user_id == blocker.id)

    # Nothing is blocked before any RPC is made.
    with session_scope() as session:
        rows = session.execute(blocks_by_blocker).scalars().all()
        assert len(rows) == 0

    with blocking_session(blocker_token) as api:
        # Blocking yourself is refused.
        with pytest.raises(grpc.RpcError) as exc_info:
            api.BlockUser(blocking_pb2.BlockUserReq(username=blocker.username))
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == errors.CANT_BLOCK_SELF

        # First block of another user goes through.
        api.BlockUser(blocking_pb2.BlockUserReq(username=target.username))

        # Blocking the same user a second time is refused.
        with pytest.raises(grpc.RpcError) as exc_info:
            api.BlockUser(blocking_pb2.BlockUserReq(username=target.username))
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == errors.USER_ALREADY_BLOCKED

    # Exactly one row remains despite the rejected attempts.
    with session_scope() as session:
        rows = session.execute(blocks_by_blocker).scalars().all()
        assert len(rows) == 1

45 

46 

def test_make_user_block(db):
    """The ``make_user_block`` helper leaves exactly one UserBlock row for the blocking user."""
    blocker, _blocker_token = generate_user()
    target, _target_token = generate_user()

    make_user_block(blocker, target)

    blocks_by_blocker = select(UserBlock).where(UserBlock.blocking_user_id == blocker.id)
    with session_scope() as session:
        rows = session.execute(blocks_by_blocker).scalars().all()
        assert len(rows) == 1

58 

59 

def test_UnblockUser(db):
    """UnblockUser removes an existing block, rejects a second unblock, and allows re-blocking."""
    blocker, blocker_token = generate_user()
    target, _target_token = generate_user()
    make_user_block(blocker, target)

    # All UserBlock rows created by the blocking user; reused for each check below.
    blocks_by_blocker = select(UserBlock).where(UserBlock.blocking_user_id == blocker.id)

    with blocking_session(blocker_token) as api:
        api.UnblockUser(blocking_pb2.UnblockUserReq(username=target.username))

    # The block created above is gone.
    with session_scope() as session:
        rows = session.execute(blocks_by_blocker).scalars().all()
        assert len(rows) == 0

    with blocking_session(blocker_token) as api:
        # Unblocking a user who is not blocked is refused.
        with pytest.raises(grpc.RpcError) as exc_info:
            api.UnblockUser(blocking_pb2.UnblockUserReq(username=target.username))
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == errors.USER_NOT_BLOCKED

        # Test re-blocking
        api.BlockUser(blocking_pb2.BlockUserReq(username=target.username))

    # The re-block is stored as a single row.
    with session_scope() as session:
        rows = session.execute(blocks_by_blocker).scalars().all()
        assert len(rows) == 1

84 

85 

def test_GetBlockedUsers(db):
    """GetBlockedUsers lists nothing at first and two entries after two blocks are made."""
    blocker, blocker_token = generate_user()
    first_target, _first_token = generate_user()
    second_target, _second_token = generate_user()

    request = empty_pb2.Empty()

    with blocking_session(blocker_token) as api:
        # Check no blocked users to start
        response = api.GetBlockedUsers(request)
        assert len(response.blocked_usernames) == 0

        # Create two blocks directly, then list again through the API.
        for target in (first_target, second_target):
            make_user_block(blocker, target)
        response = api.GetBlockedUsers(request)
        assert len(response.blocked_usernames) == 2

100 

101 

def test_relationships_userblock_dot_user(db):
    """The ``blocking_user`` and ``blocked_user`` relationships on UserBlock resolve to the right users."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    make_user_block(user1, user2)

    with session_scope() as session:
        block = session.execute(
            select(UserBlock).where((UserBlock.blocking_user_id == user1.id) & (UserBlock.blocked_user_id == user2.id))
        ).scalar_one_or_none()
        # scalar_one_or_none() returns None when no row matches; fail with a clear
        # message instead of an AttributeError on the relationship access below.
        assert block is not None, "expected a UserBlock row from user1 to user2"
        assert block.blocking_user.username == user1.username
        assert block.blocked_user.username == user2.username