Skip to content

Commit 6677516

Browse files
committed
Accept matchmaking requests from rabbitmq
1 parent 9de70f9 commit 6677516

7 files changed

Lines changed: 465 additions & 29 deletions

File tree

server/ladder_service.py

Lines changed: 154 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from collections import defaultdict
99
from typing import Dict, List, Optional, Set, Tuple
1010

11+
import aio_pika
1112
import aiocron
1213
from sqlalchemy import and_, func, select, text, true
1314

@@ -30,9 +31,12 @@
3031
matchmaker_queue_map_pool
3132
)
3233
from .decorators import with_logger
34+
from .factions import Faction
3335
from .game_service import GameService
3436
from .games import Game, InitMode, LadderGame
3537
from .matchmaker import MapPool, MatchmakerQueue, OnMatchedCallback, Search
38+
from .message_queue_service import MessageQueueService
39+
from .player_service import PlayerService
3640
from .players import Player, PlayerState
3741
from .protocol import DisconnectedError
3842
from .types import GameLaunchOptions, Map, NeroxisGeneratedMap
@@ -55,17 +59,35 @@ def __init__(
5559
self,
5660
database: FAFDatabase,
5761
game_service: GameService,
62+
player_service: PlayerService,
63+
message_queue_service: MessageQueueService
5864
):
5965
self._db = database
6066
self._informed_players: Set[Player] = set()
6167
self.game_service = game_service
68+
self.player_service = player_service
69+
self.message_queue_service = message_queue_service
6270
self.queues = {}
71+
self._initialized = False
6372

6473
self._searches: Dict[Player, Dict[str, Search]] = defaultdict(dict)
6574

6675
async def initialize(self) -> None:
76+
if self._initialized:
77+
return
78+
6779
await self.update_data()
80+
await self.message_queue_service.declare_exchange(
81+
config.MQ_EXCHANGE_NAME
82+
)
83+
await self.message_queue_service.consume(
84+
config.MQ_EXCHANGE_NAME,
85+
"request.match.create",
86+
self.handle_mq_matchmaking_request
87+
)
88+
6889
self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data)
90+
self._initialized = True
6991

7092
async def update_data(self) -> None:
7193
async with self._db.acquire() as conn:
@@ -325,6 +347,137 @@ def write_rating_progress(self, player: Player, rating_type: str) -> None:
325347
)
326348
})
327349

350+
async def handle_mq_matchmaking_request(
351+
self,
352+
message: aio_pika.IncomingMessage
353+
):
354+
try:
355+
game = await self._handle_mq_matchmaking_request(message)
356+
except Exception as e:
357+
if isinstance(e, GameLaunchError):
358+
code = "launch_failed"
359+
args = [{"player_id": player.id} for player in e.players]
360+
elif isinstance(e, json.JSONDecodeError):
361+
code = "malformed_request"
362+
args = [{"message": str(e)}]
363+
elif isinstance(e, KeyError):
364+
code = "malformed_request"
365+
args = [{"message": f"missing {e.args[0]}"}]
366+
else:
367+
code = e.args[0]
368+
args = e.args[1:]
369+
370+
await self.message_queue_service.publish(
371+
config.MQ_EXCHANGE_NAME,
372+
"error.match.create",
373+
{"error_code": code, "args": args},
374+
correlation_id=message.correlation_id
375+
)
376+
else:
377+
await self.message_queue_service.publish(
378+
config.MQ_EXCHANGE_NAME,
379+
"success.match.create",
380+
{"game_id": game.id},
381+
correlation_id=message.correlation_id
382+
)
383+
384+
async def _handle_mq_matchmaking_request(
385+
self,
386+
message: aio_pika.IncomingMessage
387+
):
388+
self._logger.debug(
389+
"Got matchmaking request: %s", message.correlation_id
390+
)
391+
request = json.loads(message.body)
392+
# TODO: Use id instead of name?
393+
queue_name = request.get("matchmaker_queue")
394+
map_name = request["map_name"]
395+
game_name = request["game_name"]
396+
participants = request["participants"]
397+
if queue_name:
398+
featured_mod = request.get("featured_mod")
399+
else:
400+
featured_mod = request["featured_mod"]
401+
402+
if queue_name and queue_name not in self.queues:
403+
raise Exception("invalid_request", "invalid queue")
404+
405+
if not participants:
406+
raise Exception("invalid_request", "empty participants")
407+
408+
player_ids = [participant["player_id"] for participant in participants]
409+
missing_players = [
410+
id for id in player_ids if self.player_service[id] is None
411+
]
412+
if missing_players:
413+
raise Exception(
414+
"players_not_found",
415+
*[{"player_id": id} for id in missing_players]
416+
)
417+
418+
all_players = [
419+
self.player_service[player_id] for player_id in player_ids
420+
]
421+
non_idle_players = [
422+
player for player in all_players
423+
if player.state != PlayerState.IDLE
424+
]
425+
if non_idle_players:
426+
raise Exception(
427+
"invalid_state",
428+
[
429+
{"player_id": player.id, "state": player.state.name}
430+
for player in all_players
431+
]
432+
)
433+
434+
queue = self.queues[queue_name] if queue_name else None
435+
featured_mod = featured_mod or queue.featured_mod
436+
host = all_players[0]
437+
guests = all_players[1:]
438+
439+
for player in all_players:
440+
player.state = PlayerState.STARTING_AUTOMATCH
441+
442+
try:
443+
game = self.game_service.create_game(
444+
game_class=LadderGame,
445+
game_mode=featured_mod,
446+
host=host,
447+
name="Matchmaker Game",
448+
mapname=map_name,
449+
matchmaker_queue_id=queue.id if queue else None,
450+
rating_type=queue.rating_type if queue else None,
451+
max_players=len(participants)
452+
)
453+
game.init_mode = InitMode.AUTO_LOBBY
454+
game.set_name_unchecked(game_name)
455+
456+
for participant in participants:
457+
player_id = participant["player_id"]
458+
faction = Faction.from_value(participant["faction"])
459+
team = participant["team"]
460+
slot = participant["slot"]
461+
462+
game.set_player_option(player_id, "Faction", faction.value)
463+
game.set_player_option(player_id, "Team", team)
464+
game.set_player_option(player_id, "StartSpot", slot)
465+
game.set_player_option(player_id, "Army", slot)
466+
game.set_player_option(player_id, "Color", slot)
467+
468+
await self.launch_game(game, host, guests)
469+
470+
return game
471+
except Exception:
472+
self._logger.exception("")
473+
await game.on_game_end()
474+
475+
for player in all_players:
476+
if player.state == PlayerState.STARTING_AUTOMATCH:
477+
player.state = PlayerState.IDLE
478+
479+
raise
480+
328481
def on_match_found(
329482
self,
330483
s1: Search,
@@ -465,7 +618,7 @@ async def launch_game(
465618
def game_options(player: Player) -> GameLaunchOptions:
466619
return options._replace(
467620
team=game.get_player_option(player.id, "Team"),
468-
faction=player.faction,
621+
faction=game.get_player_option(player.id, "Faction"),
469622
map_position=game.get_player_option(player.id, "StartSpot")
470623
)
471624

server/message_queue_service.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import asyncio
66
import json
7-
from typing import Dict
7+
from typing import Callable, Dict, Optional
88

99
import aio_pika
1010
from aio_pika import DeliveryMode, ExchangeType
@@ -123,34 +123,68 @@ async def _shutdown(self) -> None:
123123
async def publish(
124124
self,
125125
exchange_name: str,
126-
routing: str,
126+
routing_key: str,
127127
payload: Dict,
128128
mandatory: bool = False,
129129
delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
130+
correlation_id: Optional[str] = None
130131
) -> None:
131132
if not self._is_ready:
132133
self._logger.warning(
133134
"Not connected to RabbitMQ, unable to publish message."
134135
)
135136
return
136137

137-
exchange = self._exchanges.get(exchange_name)
138-
if exchange is None:
139-
raise KeyError(f"Unknown exchange {exchange_name}.")
138+
exchange = self._get_exchange(exchange_name)
140139

141140
message = aio_pika.Message(
142-
json.dumps(payload).encode(), delivery_mode=delivery_mode
141+
json.dumps(payload).encode(),
142+
delivery_mode=delivery_mode,
143+
correlation_id=correlation_id,
143144
)
144145

145146
async with self._channel.transaction():
146147
await exchange.publish(
147148
message,
148-
routing_key=routing,
149-
mandatory=mandatory
149+
routing_key=routing_key,
150+
mandatory=mandatory,
150151
)
151152
self._logger.log(
152-
TRACE, "Published message %s to %s/%s", payload, exchange_name, routing
153+
TRACE, "Published message %s to %s/%s",
154+
payload,
155+
exchange_name,
156+
routing_key
157+
)
158+
159+
async def consume(
160+
self,
161+
exchange_name: str,
162+
routing_key: str,
163+
process_message: Callable[[aio_pika.IncomingMessage], None]
164+
) -> None:
165+
await self.initialize()
166+
if not self._is_ready:
167+
self._logger.warning(
168+
"Not connected to RabbitMQ, unable to declare queue."
153169
)
170+
return
171+
172+
exchange = self._get_exchange(exchange_name)
173+
queue = await self._channel.declare_queue(
174+
None,
175+
auto_delete=True,
176+
durable=False
177+
)
178+
179+
await queue.bind(exchange, routing_key)
180+
await queue.consume(process_message, exclusive=True)
181+
182+
def _get_exchange(self, exchange_name: str) -> aio_pika.Exchange:
183+
exchange = self._exchanges.get(exchange_name)
184+
if exchange is None:
185+
raise KeyError(f"Unknown exchange {exchange_name}.")
186+
187+
return exchange
154188

155189
@synchronizedmethod("initialization_lock")
156190
async def reconnect(self) -> None:

tests/integration_tests/conftest.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,20 @@ def mock_games():
3232

3333

3434
@pytest.fixture
35-
async def ladder_service(mocker, database, game_service):
35+
async def ladder_service(
36+
mocker,
37+
database,
38+
game_service,
39+
player_service,
40+
message_queue_service
41+
):
3642
mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1)
37-
ladder_service = LadderService(database, game_service)
43+
ladder_service = LadderService(
44+
database,
45+
game_service,
46+
player_service,
47+
message_queue_service
48+
)
3849
await ladder_service.initialize()
3950
yield ladder_service
4051
await ladder_service.shutdown()
@@ -303,17 +314,22 @@ async def channel():
303314
await connection.close()
304315

305316

306-
async def connect_mq_consumer(server, channel, routing_key):
307-
"""
308-
Returns a subclass of Protocol that yields messages read from a rabbitmq
309-
exchange.
310-
"""
317+
async def connect_mq_queue(channel, routing_key):
311318
exchange = await channel.declare_exchange(
312319
config.MQ_EXCHANGE_NAME,
313320
aio_pika.ExchangeType.TOPIC
314321
)
315-
queue = await channel.declare_queue("", exclusive=True)
322+
queue = await channel.declare_queue(None, exclusive=True)
316323
await queue.bind(exchange, routing_key=routing_key)
324+
return queue
325+
326+
327+
async def connect_mq_consumer(channel, routing_key):
328+
"""
329+
Returns a subclass of Protocol that yields messages read from a rabbitmq
330+
exchange.
331+
"""
332+
queue = await connect_mq_queue(channel, routing_key)
317333
proto = AioQueueProtocol(queue)
318334
await proto.consume()
319335

0 commit comments

Comments
 (0)