-
Notifications
You must be signed in to change notification settings - Fork 91
Expand file tree
/
Copy pathinline.py
More file actions
214 lines (186 loc) · 7.6 KB
/
inline.py
File metadata and controls
214 lines (186 loc) · 7.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""Execute a notebook inline."""
from __future__ import annotations
import asyncio
from datetime import datetime
import re
import shutil
from tempfile import mkdtemp
import time
import traceback
from typing import TYPE_CHECKING
from nbclient.client import (
CellControlSignal,
CellExecutionError,
CellTimeoutError,
DeadKernelError,
NotebookClient,
ensure_async,
run_sync,
)
import nbformat
from nbformat import NotebookNode
from myst_nb.ext.glue import extract_glue_data_cell
from .base import EvalNameError, ExecutionError, NotebookClientBase
if TYPE_CHECKING:
from markdown_it.tree import SyntaxTreeNode
class NotebookClientInline(NotebookClientBase):
    """A notebook client that executes the notebook inline,
    i.e. during the render.

    This allows for the client to be called in-between code cell executions,
    in order to extract variable state.
    """

    def start_client(self):
        """Create the execution client, start a kernel and reset execution state.

        Execution takes place in a fresh temporary directory when
        ``nb_config.execution_in_temp`` is set, otherwise in the parent directory
        of ``self.path``. The kernel's ``language_info`` (when it can be
        retrieved) is stored in the notebook metadata.

        :raises ValueError: if ``execution_in_temp`` is false and ``self.path``
            is ``None``
        """
        # Execution state is initialised before anything can fail, so that these
        # attributes are always defined once start-up has been attempted.
        self._last_cell_executed: int = -1
        self._cell_error: None | Exception = None
        self._exc_string: None | str = None
        self._tmp_path = None

        if self.nb_config.execution_in_temp:
            self._tmp_path = mkdtemp()
            resources = {"metadata": {"path": self._tmp_path}}
        else:
            if self.path is None:
                raise ValueError(
                    "Input source must exist as file, if execution_in_temp=False"
                )
            resources = {"metadata": {"path": str(self.path.parent)}}

        try:
            self.logger.info("Starting inline execution client")
            self._time_start = time.perf_counter()
            self._client = ModifiedNotebookClient(
                self.notebook,
                record_timing=False,
                resources=resources,
                allow_errors=self.nb_config.execution_allow_errors,
                timeout=self.nb_config.execution_timeout,
            )

            self._client.reset_execution_trackers()
            if self._client.km is None:
                self._client.km = self._client.create_kernel_manager()
            if not self._client.km.has_kernel:
                self._client.start_new_kernel()
                self._client.start_new_kernel_client()

            # retrieve the language_info from the kernel
            assert self._client.kc is not None
            msg_id = self._client.kc.kernel_info()
            info_msg = self._client.wait_for_reply(msg_id)
            if info_msg is not None and "language_info" in info_msg["content"]:
                self.notebook.metadata["language_info"] = info_msg["content"][
                    "language_info"
                ]
            else:
                self.logger.warning("Failed to retrieve language info from kernel")
        except BaseException:
            # mkdtemp() leaves removal of the directory to its caller; since
            # close_client may never be called when start-up fails, remove the
            # directory here rather than leaking it.
            if self._tmp_path:
                shutil.rmtree(self._tmp_path, ignore_errors=True)
                self._tmp_path = None
            raise

    def finalise_client(self):
        """Set the widgets metadata on the notebook (best effort).

        Any failure is logged as a warning rather than raised.
        """
        try:
            self._client.set_widgets_metadata()
        except Exception as exc:
            self.logger.warning(f"Failed to set widgets metadata: {exc}")

    def close_client(self, exc_type, exc_val, exc_tb):
        """Stop the kernel, record the execution metadata and clean up.

        ``exc_type``, ``exc_val`` and ``exc_tb`` are accepted but not used here
        (presumably the context-manager exit arguments; confirm against the
        base class).
        """
        self.logger.info("Stopping inline execution client")
        try:
            # only shut the kernel down if this client is the one that owns it
            if self._client.owns_km:
                self._client._cleanup_kernel()
            del self._client

            _exec_time = time.perf_counter() - self._time_start
            self.exec_metadata = {
                "mtime": datetime.now().timestamp(),
                "runtime": _exec_time,
                "method": self.nb_config.execution_mode,
                "succeeded": False if self._cell_error else True,
                "error": f"{self._cell_error.__class__.__name__}"
                if self._cell_error
                else None,
                "traceback": self._exc_string,
            }
            if not self._cell_error:
                self.logger.info(f"Executed notebook in {_exec_time:.2f} seconds")
            else:
                msg = (
                    f"Executing notebook failed: {self._cell_error.__class__.__name__}"
                )
                if self.nb_config.execution_show_tb:
                    msg += f"\n{self._exc_string}"
                self.logger.warning(msg, subtype="exec")
        finally:
            # Always remove the temporary working directory, even if the kernel
            # clean-up or one of the logging calls above raised.
            if self._tmp_path:
                shutil.rmtree(self._tmp_path, ignore_errors=True)

    def code_cell_outputs(
        self, cell_index: int
    ) -> tuple[int | None, list[NotebookNode]]:
        """Return the execution count and outputs of the cell at ``cell_index``.

        Cells that have not been executed yet, up to and including the requested
        one, are executed first, in order. Once a cell error has been recorded
        no further cells are executed. Glue data found in each executed cell is
        stored in ``self._glue_data``; a duplicate key is logged as a warning
        and the later value replaces the earlier one.

        :raises ExecutionError: if a cell fails or times out and
            ``nb_config.execution_raise_on_error`` is set
        :raises IndexError: if ``cell_index`` is beyond the last cell
        """
        cells = self.notebook.get("cells", [])

        # ensure all cells up to and including the requested cell have been executed
        while (not self._cell_error) and cell_index > self._last_cell_executed:
            self._last_cell_executed += 1
            try:
                next_cell = cells[self._last_cell_executed]
            except IndexError:
                # ran past the end of the notebook; nothing left to execute
                break

            try:
                self._client.execute_cell(
                    next_cell,
                    self._last_cell_executed,
                    execution_count=self._client.code_cells_executed + 1,
                )
            except (CellExecutionError, CellTimeoutError) as err:
                if self.nb_config.execution_raise_on_error:
                    raise ExecutionError(str(self.path)) from err
                # remember the failure: it stops any further execution (see the
                # loop condition) and is reported by close_client
                self._cell_error = err
                self._exc_string = "".join(traceback.format_exc())

            # glue data is extracted even when the cell has just failed
            for key, cell_data in extract_glue_data_cell(next_cell):
                if key in self._glue_data:
                    self.logger.warning(
                        f"glue key {key!r} duplicate",
                        subtype="glue",
                        line=self.cell_line(self._last_cell_executed),
                    )
                self._glue_data[key] = cell_data

        cell = cells[cell_index]
        return cell.get("execution_count", None), cell.get("outputs", [])

    def eval_variable(
        self, name: str, current_cell: SyntaxTreeNode
    ) -> list[NotebookNode]:
        """Evaluate ``name`` in the kernel and return the resulting outputs.

        ``name`` is first checked against ``nb_config.eval_name_regex`` with
        ``re.match``, i.e. the pattern is anchored at the start of ``name`` only.
        ``current_cell`` is not used by this implementation.

        :raises EvalNameError: if ``name`` does not match the regex
        """
        if not re.match(self.nb_config.eval_name_regex, name):
            raise EvalNameError(name)
        return self._client.eval_expression(name)
class ModifiedNotebookClient(NotebookClient):
    """A ``NotebookClient`` that can additionally evaluate a single expression."""

    async def async_eval_expression(self, name: str) -> list[NotebookNode]:
        """Run ``name`` in the kernel and return the outputs it produced.

        This is a modified version of `async_execute_cell`: instead of executing
        a cell of the notebook, a throwaway code cell with ``name`` as its source
        is used to collect the outputs, and the request is sent with
        ``store_history=False`` and ``stop_on_error=False``.

        :raises DeadKernelError: if waiting for the reply is cancelled
        """
        assert self.kc is not None
        self.log.debug(f"Evaluating expression: {name}")

        source = str(name)
        msg_id = await ensure_async(
            self.kc.execute(source, store_history=False, stop_on_error=False)
        )

        # scratch cell that only exists to receive the outputs of this request
        scratch_cell = nbformat.v4.new_code_cell(source=source)
        timeout = self._get_timeout(scratch_cell)
        scratch_index = -1  # the scratch cell has no position in the notebook
        self.clear_before_next_output = False

        alive_task = asyncio.ensure_future(self._async_poll_kernel_alive())
        output_task = asyncio.ensure_future(
            self._async_poll_output_msg(msg_id, scratch_cell, scratch_index)
        )
        self.task_poll_for_reply = asyncio.ensure_future(
            self._async_poll_for_reply(
                msg_id, scratch_cell, timeout, output_task, alive_task
            )
        )

        try:
            await self.task_poll_for_reply
        except asyncio.CancelledError:
            # can only be cancelled by the kernel-alive poller when the kernel is dead
            output_task.cancel()
            raise DeadKernelError("Kernel died")
        except Exception as err:
            # Best effort to cancel the request if it hasn't been resolved,
            # unless the output poller is the one doing the raising for us
            if not isinstance(err, CellControlSignal):
                output_task.cancel()
            raise

        return scratch_cell.outputs

    eval_expression = run_sync(async_eval_expression)