-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy path_background_service.py
More file actions
292 lines (229 loc) · 10.1 KB
/
_background_service.py
File metadata and controls
292 lines (229 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# License: MIT
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
"""Background service implementation."""
import abc
import asyncio
import collections.abc
import logging
from types import TracebackType
from typing import Any, Self
_logger = logging.getLogger(__name__)
class BackgroundService(abc.ABC):
"""A background service that can be started and stopped.
A background service is a service that runs in the background spawning one or more
tasks. The service can be [started][frequenz.sdk.actor.BackgroundService.start]
and [stopped][frequenz.sdk.actor.BackgroundService.stop] and can work as an
async context manager to provide deterministic cleanup.
To implement a background service, subclasses must implement the
[`start()`][frequenz.sdk.actor.BackgroundService.start] method, which should
start the background tasks needed by the service, and add them to the `_tasks`
protected attribute.
If you need to collect results or handle exceptions of the tasks when stopping the
service, then you need to also override the
[`stop()`][frequenz.sdk.actor.BackgroundService.stop] method, as the base
implementation does not collect any results and re-raises all exceptions.
!!! warning
As background services manage [`asyncio.Task`][] objects, a reference to them
must be held for as long as the background service is expected to be running,
otherwise its tasks will be cancelled and the service will stop. For more
information, please refer to the [Python `asyncio`
documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).
Example: Simple background service example
```python
from typing_extensions import override
import datetime
import asyncio
class Clock(BackgroundService):
def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
super().__init__(name=name)
self._resolution_s = resolution_s
@override
def start(self) -> None:
self._tasks.add(asyncio.create_task(self._tick()))
async def _tick(self) -> None:
while True:
await asyncio.sleep(self._resolution_s)
print(datetime.datetime.now())
async def main() -> None:
# As an async context manager
async with Clock(resolution_s=1):
await asyncio.sleep(5)
# Manual start/stop (only use if necessary, as cleanup is more complicated)
clock = Clock(resolution_s=1)
clock.start()
await asyncio.sleep(5)
await clock.stop()
asyncio.run(main())
```
Example: Background service example using custom stopping logic
If you need to implement custom stopping logic, you can override the
[`cancel()`][frequenz.sdk.actor.BackgroundService.cancel] and
[`wait()`][frequenz.sdk.actor.BackgroundService.wait] methods, and the
[`is_running`][frequenz.sdk.actor.BackgroundService.is_running] property.
For example, if you are using an external library that uses tasks internally and
you don't have access to them.
```python
from typing_extensions import override
import asyncio
class SomeService(BackgroundService):
def __init__(self, somelib, *, name: str | None = None) -> None:
self.somelib = somelib
super().__init__(name=name)
@override
def start(self) -> None:
self.somelib.start()
@property
@override
def is_running(self) -> bool:
return self.somelib.is_running()
@override
def cancel(self, msg: str | None = None) -> None:
self.somelib.cancel()
@override
async def wait(self) -> None:
try:
await self.somelib.wait()
except BaseExceptionGroup as exc:
raise BaseExceptionGroup("Error while stopping SomeService", [exc]) from exc
```
"""
def __init__(self, *, name: str | None = None) -> None:
"""Initialize this BackgroundService.
Args:
name: The name of this background service. If `None`, `str(id(self))` will
be used. This is used mostly for debugging purposes.
"""
self._name: str = str(id(self)) if name is None else name
self._tasks: set[asyncio.Task[Any]] = set()
@abc.abstractmethod
def start(self) -> None:
"""Start this background service."""
@property
def name(self) -> str:
"""The name of this background service.
Returns:
The name of this background service.
"""
return self._name
@property
def tasks(self) -> collections.abc.Set[asyncio.Task[Any]]:
"""Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use
them for informational purposes.
!!! danger
Changing the returned tasks may lead to unexpected behavior, don't do it
unless the class explicitly documents it is safe to do so.
Returns:
The set of running tasks spawned by this background service.
"""
return self._tasks
@property
def is_running(self) -> bool:
"""Return whether this background service is running.
A service is considered running when at least one task is running.
Returns:
Whether this background service is running.
"""
return any(not task.done() for task in self._tasks)
def cancel(self, msg: str | None = None) -> None:
"""Cancel all running tasks spawned by this background service.
Args:
msg: The message to be passed to the tasks being cancelled.
"""
_logger.debug(
"Service %s cancelled%s", self, f": {msg}" if msg is not None else ""
)
for task in self._tasks:
task.cancel(msg)
# We need the noqa because pydoclint can't figure out `rest` is
# a `BaseExceptionGroup` instance.
async def stop(self, msg: str | None = None) -> None: # noqa: DOC503
"""Stop this background service.
This method cancels all running tasks spawned by this service and waits for them
to finish.
Args:
msg: The message to be passed to the tasks being cancelled.
Raises:
BaseExceptionGroup: If any of the tasks spawned by this service raised an
exception.
"""
self.cancel(msg)
try:
await self.wait()
except BaseExceptionGroup as exc_group:
# We want to ignore CancelledError here as we explicitly cancelled all the
# tasks.
_, rest = exc_group.split(asyncio.CancelledError)
if rest is not None:
# We are filtering out from an exception group, we really don't want to
# add the exceptions we just filtered by adding a from clause here.
raise rest # pylint: disable=raise-missing-from
async def __aenter__(self) -> Self:
"""Enter an async context.
Start this background service.
Returns:
This background service.
"""
self.start()
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit an async context.
Stop this background service.
Args:
exc_type: The type of the exception raised, if any.
exc_val: The exception raised, if any.
exc_tb: The traceback of the exception raised, if any.
"""
await self.stop()
async def wait(self) -> None:
"""Wait this background service to finish.
Wait until all background service tasks are finished.
Raises:
BaseExceptionGroup: If any of the tasks spawned by this service raised an
exception (`CancelError` is not considered an error and not returned in
the exception group).
"""
# We need to account for tasks that were created between when we started
# awaiting and we finished awaiting.
while self._tasks:
done, pending = await asyncio.wait(self._tasks)
assert not pending
# We remove the done tasks, but there might be new ones created after we
# started waiting.
self._tasks = self._tasks - done
exceptions: list[BaseException] = []
for task in done:
try:
# This will raise a CancelledError if the task was cancelled or any
# other exception if the task raised one.
_ = task.result()
except BaseException as error: # pylint: disable=broad-except
exceptions.append(error)
if exceptions:
raise BaseExceptionGroup(
f"Error while stopping background service {self}", exceptions
)
def __await__(self) -> collections.abc.Generator[None, None, None]:
"""Await this background service.
An awaited background service will wait for all its tasks to finish.
Returns:
An implementation-specific generator for the awaitable.
"""
return self.wait().__await__()
def __repr__(self) -> str:
"""Return a string representation of this instance.
Returns:
A string representation of this instance.
"""
return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
def __str__(self) -> str:
"""Return a string representation of this instance.
Returns:
A string representation of this instance.
"""
return f"{type(self).__name__}[{self._name}]"