-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy path_channels.py
More file actions
61 lines (44 loc) · 1.67 KB
/
_channels.py
File metadata and controls
61 lines (44 loc) · 1.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
"""General purpose classes for use with channels."""
import abc
import logging
import typing
from frequenz.channels import Receiver
# Logger for this module, named after the module's import path.
_logger = logging.getLogger(__name__)
# Covariant type variable: the value type of the receivers a fetcher produces.
T_co = typing.TypeVar("T_co", covariant=True)
# Covariant type variable: the value type of receivers after a mapping step.
U_co = typing.TypeVar("U_co", covariant=True)
# Invariant type variable; not referenced by the definitions visible in this file.
T = typing.TypeVar("T")
class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
    """Structural interface for objects that can hand out new receivers.

    Any object providing a compatible `new_receiver` method satisfies this
    protocol.
    """

    @abc.abstractmethod
    def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
        """Create and return a new receiver.

        Args:
            limit: Maximum size of the receiver. Keyword-only; defaults to 50.

        Returns:
            A receiver instance.
        """
class MappingReceiverFetcher(typing.Generic[T_co, U_co]):
    """A fetcher wrapper that transforms every receiver before handing it out."""

    def __init__(
        self,
        fetcher: ReceiverFetcher[T_co],
        mapping_function: typing.Callable[[Receiver[T_co]], Receiver[U_co]],
    ) -> None:
        """Store the wrapped fetcher and the transformation to apply.

        Args:
            fetcher: The underlying fetcher that supplies the original
                receivers.
            mapping_function: Callable applied to each receiver obtained from
                `fetcher`; its result is what `new_receiver` returns.
        """
        self._fetcher = fetcher
        self._mapping_function = mapping_function

    def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
        """Fetch a receiver from the wrapped fetcher and return it transformed.

        Args:
            limit: Maximum size of the receiver, forwarded unchanged to the
                underlying fetcher. Keyword-only; defaults to 50.

        Returns:
            The receiver produced by applying the mapping function to the
            underlying fetcher's receiver.
        """
        # Look up the transformation first, then fetch, matching the
        # evaluation order of a single nested call expression.
        transform = self._mapping_function
        upstream = self._fetcher.new_receiver(limit=limit)
        return transform(upstream)