
Commit 68ecc0d

CSWYF3634076 authored and kesmeey committed
[BugFix]fix console log metrics waitting queue count (PaddlePaddle#6432)
* [BugFix]fix console log metrics waitting queue count
* [BugFix]fix console log metrics waitting queue count unittest
1 parent 3a71669 commit 68ecc0d

3 files changed: 945 additions & 52 deletions
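
In short: the console scheduler metrics derived their "waiting queue" count from `len(self.waiting)` alone, so requests still buffered inside the scheduler itself were invisible in the logged queue depth. A minimal before/after sketch of the corrected count (plain Python stand-ins for illustration, not FastDeploy API):

# Illustrative stand-ins for the real ResourceManager state.
waiting = ["req-3", "req-4"]           # already in the resource manager's waiting queue
scheduler_unhandled_request_num = 5    # still buffered on the scheduler side

# Before the fix: only the local waiting queue was counted.
queue_cnt_before = len(waiting)        # -> 2

# After the fix: the scheduler-side backlog is added, clamped to non-negative.
queue_cnt_after = len(waiting) + max(int(scheduler_unhandled_request_num or 0), 0)  # -> 7

print(queue_cnt_before, queue_cnt_after)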

File tree

fastdeploy/engine/common_engine.py
fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 16 additions & 0 deletions
@@ -996,6 +996,8 @@ def _fetch_request():
                 else:
                     raise

+            if hasattr(self.resource_manager, "scheduler_unhandled_request_num"):
+                self.resource_manager.scheduler_unhandled_request_num = self._get_scheduler_unhandled_request_num()
             # 2. Schedule requests
             tasks, error_tasks = self.resource_manager.schedule()

@@ -1063,6 +1065,20 @@ def _fetch_request():
                 err_msg = "Error happend while insert task to engine: {}, {}.".format(e, str(traceback.format_exc()))
                 self.llm_logger.error(err_msg)

+    def _get_scheduler_unhandled_request_num(self) -> int:
+        """
+        Get scheduler-level pending request count when supported.
+        """
+        get_unhandled = getattr(self.scheduler, "get_unhandled_request_num", None)
+        if not callable(get_unhandled):
+            return 0
+        try:
+            unhandled = int(get_unhandled())
+        except Exception as e:
+            self.llm_logger.debug(f"Failed to get scheduler unhandled request num: {e}")
+            return 0
+        return max(unhandled, 0)
+
     def start_zmq_service(self, api_server_pid=None):
         if api_server_pid is None:
             return
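
The new helper is deliberately defensive: not every scheduler exposes `get_unhandled_request_num`, and a metrics lookup must never break request fetching. A sketch of the three paths it can take (the stub scheduler classes below are hypothetical, and the logging is omitted):

class NoMetricScheduler:
    """Hypothetical scheduler without get_unhandled_request_num support."""

class BackloggedScheduler:
    def get_unhandled_request_num(self):
        return 5

class FlakyScheduler:
    def get_unhandled_request_num(self):
        raise RuntimeError("backend unavailable")

def unhandled_request_num(scheduler) -> int:
    # Same shape as _get_scheduler_unhandled_request_num above, minus the logger.
    get_unhandled = getattr(scheduler, "get_unhandled_request_num", None)
    if not callable(get_unhandled):
        return 0  # scheduler does not support the metric
    try:
        return max(int(get_unhandled()), 0)  # clamp negatives to 0
    except Exception:
        return 0  # backend errors degrade to 0 rather than raising

print(unhandled_request_num(NoMetricScheduler()))    # 0
print(unhandled_request_num(BackloggedScheduler()))  # 5
print(unhandled_request_num(FlakyScheduler()))       # 0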

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 56 additions & 50 deletions
@@ -214,6 +214,8 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
         self.current_reserve_output_block_num = self.init_reserve_output_block_num
         self.current_reserve_output_block_num_float = self.init_reserve_output_block_num
         self.can_relax_prefill_strategy = True
+        # Scheduler-side requests that have not been moved into resource manager waiting queue yet.
+        self.scheduler_unhandled_request_num = 0

     def allocated_slots(self, request: Request):
         return len(request.block_tables) * self.config.cache_config.block_size
@@ -957,56 +959,7 @@ def _allocate_decode_and_extend():
         if self.current_reserve_output_block_num == 0:
             self.can_relax_prefill_strategy = True

-        if (
-            hasattr(self, "scheduler_metrics_logger")
-            and self.scheduler_metrics_logger is not None
-            and envs.FD_CONSOLE_SCHEDULER_METRICS
-        ):
-            total_blocks = self.total_block_number()
-            free_blocks = self.available_block_num()
-            used_blocks = max(total_blocks - free_blocks, 0)
-            tokens_used = used_blocks * self.config.cache_config.block_size
-            token_usage = used_blocks / total_blocks if total_blocks > 0 else 0.0
-            running_cnt = len(self.running)
-            queue_cnt = len(self.waiting)
-
-            prefill_reqs = [
-                r for r in scheduled_reqs if isinstance(r, Request) and r.task_type == RequestType.PREFILL
-            ]
-            has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in scheduled_reqs)
-
-            self.scheduler_metrics_logger.log_prefill_batch(
-                prefill_reqs=prefill_reqs,
-                running_cnt=running_cnt,
-                queue_cnt=queue_cnt,
-                tokens_used=tokens_used,
-                token_usage=token_usage,
-            )
-            if has_decode:
-                has_prefill = len(prefill_reqs) > 0
-                graph_opt_cfg = self.config.graph_opt_config
-                use_cudagraph_cfg = bool(getattr(graph_opt_cfg, "use_cudagraph", False))
-                graph_opt_level = int(getattr(graph_opt_cfg, "graph_opt_level", 0) or 0)
-                full_cuda_graph = bool(getattr(graph_opt_cfg, "full_cuda_graph", True))
-                cudagraph_only_prefill = bool(getattr(graph_opt_cfg, "cudagraph_only_prefill", False))
-                use_decode_cudagraph = (
-                    has_decode
-                    and use_cudagraph_cfg
-                    and (
-                        # Reference PR https://github.com/PaddlePaddle/FastDeploy/pull/6196
-                        # Static split graph mode: Prefill+Mixed and Decode can use CUDAGraph.
-                        (graph_opt_level > 0 and not full_cuda_graph)
-                        # Dynamic / static-full modes: decode-only can use CUDAGraph.
-                        or (not has_prefill and not cudagraph_only_prefill)
-                    )
-                )
-                self.scheduler_metrics_logger.log_decode_batch(
-                    running_cnt=running_cnt,
-                    queue_cnt=queue_cnt,
-                    tokens_used=tokens_used,
-                    token_usage=token_usage,
-                    use_cudagraph=use_decode_cudagraph,
-                )
+        self._log_console_scheduler_metrics(scheduled_reqs)

         self.update_metrics()
@@ -1464,3 +1417,56 @@ def log_status(self):
             f"requests={self.requests}, "
             f")"
         )
+
+    def _log_console_scheduler_metrics(self, scheduled_reqs: list[Request | ScheduledDecodeTask]) -> None:
+        if not (
+            hasattr(self, "scheduler_metrics_logger")
+            and self.scheduler_metrics_logger is not None
+            and envs.FD_CONSOLE_SCHEDULER_METRICS
+        ):
+            return
+
+        total_blocks = self.total_block_number()
+        free_blocks = self.available_block_num()
+        used_blocks = max(total_blocks - free_blocks, 0)
+        tokens_used = used_blocks * self.config.cache_config.block_size
+        token_usage = used_blocks / total_blocks if total_blocks > 0 else 0.0
+        running_cnt = len(self.running)
+        scheduler_queue_cnt = max(int(getattr(self, "scheduler_unhandled_request_num", 0) or 0), 0)
+        queue_cnt = len(self.waiting) + scheduler_queue_cnt
+
+        prefill_reqs = [r for r in scheduled_reqs if isinstance(r, Request) and r.task_type == RequestType.PREFILL]
+        has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in scheduled_reqs)
+
+        self.scheduler_metrics_logger.log_prefill_batch(
+            prefill_reqs=prefill_reqs,
+            running_cnt=running_cnt,
+            queue_cnt=queue_cnt,
+            tokens_used=tokens_used,
+            token_usage=token_usage,
+        )
+        if has_decode:
+            has_prefill = len(prefill_reqs) > 0
+            graph_opt_cfg = self.config.graph_opt_config
+            use_cudagraph_cfg = bool(getattr(graph_opt_cfg, "use_cudagraph", False))
+            graph_opt_level = int(getattr(graph_opt_cfg, "graph_opt_level", 0) or 0)
+            full_cuda_graph = bool(getattr(graph_opt_cfg, "full_cuda_graph", True))
+            cudagraph_only_prefill = bool(getattr(graph_opt_cfg, "cudagraph_only_prefill", False))
+            use_decode_cudagraph = (
+                has_decode
+                and use_cudagraph_cfg
+                and (
+                    # Reference PR https://github.com/PaddlePaddle/FastDeploy/pull/6196
+                    # Static split graph mode: Prefill+Mixed and Decode can use CUDAGraph.
+                    (graph_opt_level > 0 and not full_cuda_graph)
+                    # Dynamic / static-full modes: decode-only can use CUDAGraph.
+                    or (not has_prefill and not cudagraph_only_prefill)
+                )
+            )
+            self.scheduler_metrics_logger.log_decode_batch(
+                running_cnt=running_cnt,
+                queue_cnt=queue_cnt,
+                tokens_used=tokens_used,
+                token_usage=token_usage,
+                use_cudagraph=use_decode_cudagraph,
+            )
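
The decode-batch CUDA graph flag is untouched by this commit, only moved into the new helper. Read as a pure function, the decision rule looks like this (a sketch that treats the graph-optimization config fields as plain values; `has_decode` is already true wherever this runs):

def decode_batch_uses_cudagraph(
    has_prefill: bool,
    use_cudagraph: bool,
    graph_opt_level: int,
    full_cuda_graph: bool,
    cudagraph_only_prefill: bool,
) -> bool:
    # Mirrors the use_decode_cudagraph expression in the hunk above.
    return use_cudagraph and (
        # Static split graph mode: Prefill+Mixed and Decode can use CUDAGraph.
        (graph_opt_level > 0 and not full_cuda_graph)
        # Dynamic / static-full modes: decode-only can use CUDAGraph.
        or (not has_prefill and not cudagraph_only_prefill)
    )

print(decode_batch_uses_cudagraph(False, True, 0, True, False))  # decode-only, dynamic mode -> True
print(decode_batch_uses_cudagraph(True, True, 1, False, False))  # mixed batch, static split graph -> True
print(decode_batch_uses_cudagraph(True, True, 0, True, False))   # mixed batch, full-graph mode -> False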
