-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsplit_client_wrapper.py
More file actions
185 lines (149 loc) · 7.31 KB
/
split_client_wrapper.py
File metadata and controls
185 lines (149 loc) · 7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from splitio import get_factory, get_factory_async
from splitio.exceptions import TimeoutException
import logging
try:
from splitio.models.events import SdkEvent
except ImportError:
SdkEvent = None # type: ignore # Split < 10.6: no events API
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Event name handed to the registered receiver when `block_until_ready` raises
# TimeoutException during factory start-up. It is a wrapper-level sentinel
# string, not a member of Split's SdkEvent.
SPLIT_EVENT_BUR_TIMEOUT = "block_until_ready_timeout"
class SplitClientWrapper():
    """Own a Split SDK factory/client pair and relay SDK events to a receiver.

    The wrapper is configured through an ``initial_context`` dict with the
    following keys:

    * ``SdkKey`` (str): key used to build a factory. Required unless
      ``SplitClient`` is given.
    * ``SplitClient``: an already-built client; its ``_factory`` attribute is
      reused instead of building a new factory.
    * ``ConfigOptions`` (dict): passed as ``config`` to the factory builder.
    * ``ReadyBlockTime``: timeout handed to ``block_until_ready``
      (defaults to 10).
    * ``ThreadingMode``: when equal to ``"asyncio"`` the constructor does no
      SDK work and ``create()`` must be awaited afterwards.

    A receiver (see ``set_event_receiver``) is expected to expose
    ``_on_split_event(split_event, event_metadata)`` and, for asyncio use,
    ``_on_split_event_async(split_event, event_metadata)``.
    """

    def __init__(self, initial_context):
        """Validate ``initial_context`` and, in non-asyncio mode, start the SDK.

        Raises:
            AttributeError: if ``initial_context`` fails validation.
        """
        self.sdk_ready = False
        self.split_client = None
        self._event_receiver = None
        # Always define these so later methods can test them against None
        # instead of failing on a missing attribute.
        self._factory = None
        self._threading_mode = None
        self._initial_context = None

        if not self._validate_context(initial_context):
            raise AttributeError("SplitClientWrapper: invalid initial_context")

        self._api_key = initial_context.get("SdkKey")

        self._config = {}
        config_options = initial_context.get("ConfigOptions")
        if config_options is not None:
            self._config = config_options

        self._ready_block_time = 10
        ready_block_time = initial_context.get("ReadyBlockTime")
        if ready_block_time is not None:
            self._ready_block_time = ready_block_time

        threading_mode = initial_context.get("ThreadingMode")
        if threading_mode is not None:
            self._threading_mode = threading_mode
            if threading_mode == "asyncio":
                # Factory/client creation is deferred to `create()`.
                self._initial_context = initial_context
                return

        provided_client = initial_context.get("SplitClient")
        if provided_client is not None:
            self.split_client = provided_client
            self._factory = provided_client._factory
            return

        try:
            self._factory = get_factory(self._api_key, config=self._config)
            self._factory.block_until_ready(self._ready_block_time)
            self.sdk_ready = True
        except TimeoutException:
            _LOGGER.debug("Split SDK timed out")
            # NOTE: no receiver can have been set through
            # `set_event_receiver` yet, so this currently only logs.
            self._notify_receiver(SPLIT_EVENT_BUR_TIMEOUT, None)

        self.split_client = self._factory.client()

    async def create(self):
        """Finish initialisation in asyncio mode.

        Reuses the ``SplitClient`` from the initial context when present,
        otherwise builds an async factory and waits for it to become ready.
        In both cases it then registers for SDK events.

        Raises:
            AttributeError: if the wrapper was not constructed with
                ``ThreadingMode`` set to ``"asyncio"``.
        """
        if self._initial_context is None:
            raise AttributeError(
                "SplitClientWrapper.create() requires ThreadingMode 'asyncio'"
            )

        provided_client = self._initial_context.get("SplitClient")
        if provided_client is not None:
            self.split_client = provided_client
            self._factory = provided_client._factory
            await self._register_split_events_async()
            return

        try:
            self._factory = await get_factory_async(self._api_key, config=self._config)
            await self._factory.block_until_ready(self._ready_block_time)
            self.sdk_ready = True
        except TimeoutException:
            _LOGGER.debug("Split SDK timed out")
            await self._notify_receiver_async(SPLIT_EVENT_BUR_TIMEOUT, None)

        self.split_client = self._factory.client()
        await self._register_split_events_async()

    def is_sdk_ready(self):
        """Return True once the SDK is ready, probing the factory briefly if needed.

        The positive result is cached in ``sdk_ready``. Returns False when no
        factory exists yet (e.g. asyncio mode before ``create()``).
        """
        if self.sdk_ready:
            return True
        if self._factory is None:
            return False

        try:
            self._factory.block_until_ready(0.1)
            self.sdk_ready = True
        except TimeoutException:
            _LOGGER.debug("Split SDK timed out")

        return self.sdk_ready

    def set_event_receiver(self, receiver):
        """Set the receiver that will be notified of Split SDK events (e.g. the provider)."""
        self._event_receiver = receiver

    def register_for_split_events(self):
        """Register for Split SDK events (SDK_READY, SDK_UPDATE).

        Events are forwarded to the receiver set with ``set_event_receiver``;
        call that first, otherwise events are only logged.
        """
        self._register_split_events()

    def unregister_for_split_events(self):
        """Stop forwarding Split SDK events by dropping the current receiver."""
        self._event_receiver = None

    def _notify_receiver(self, split_event, event_metadata):
        """Forward an event to the receiver; callback errors are logged, not raised."""
        if self._event_receiver is None:
            _LOGGER.debug("Split event %s: no receiver registered", split_event)
            return

        try:
            self._event_receiver._on_split_event(split_event, event_metadata)
        except Exception as ex:
            # Best effort: a failing callback must not break the caller.
            _LOGGER.debug("Split event callback error: %s", ex)

    async def _notify_receiver_async(self, split_event, event_metadata):
        """Async version for use when the receiver is used in asyncio context (e.g. async event registration)."""
        if self._event_receiver is None:
            _LOGGER.debug("Split event %s: no receiver registered", split_event)
            return

        try:
            await self._event_receiver._on_split_event_async(split_event, event_metadata)
        except Exception as ex:
            # Best effort: a failing callback must not break the caller.
            _LOGGER.debug("Split event callback error: %s", ex)

    def _register_split_events(self):
        """Hook SDK_READY and SDK_UPDATE on the factory's events manager.

        Skips registration (with a log entry) when there is no factory, when
        ``SdkEvent`` could not be imported, or when the events manager has no
        ``register`` method. Any registration error is logged as a warning.
        """
        if self._factory is None:
            _LOGGER.warning("SplitClientWrapper: _factory is None, cannot register for SDK events")
            return
        if SdkEvent is None:
            _LOGGER.debug("SplitClientWrapper: SdkEvent not available (Split SDK < 10.6?), skipping event registration")
            return

        try:
            events_manager = self._factory._events_manager
            if not hasattr(events_manager, "register"):
                _LOGGER.warning("SplitClientWrapper: events_manager has no register method")
                return
            events_manager.register(
                SdkEvent.SDK_READY,
                lambda m: self._notify_receiver(SdkEvent.SDK_READY, m),
            )
            events_manager.register(
                SdkEvent.SDK_UPDATE,
                lambda m: self._notify_receiver(SdkEvent.SDK_UPDATE, m),
            )
            _LOGGER.info("SplitClientWrapper: registered for SDK_READY and SDK_UPDATE")
        except Exception as ex:
            _LOGGER.warning("Could not register Split events: %s", ex)

    def destroy(self, destroy_event=None):
        """Destroy the underlying factory, forwarding ``destroy_event`` to it."""
        self._factory.destroy(destroy_event)

    async def _register_split_events_async(self):
        """Async counterpart of ``_register_split_events``.

        Returns silently when there is no factory, ``SdkEvent`` is
        unavailable, or the events manager lacks ``register``. Registration
        errors are logged at debug level.
        """
        if self._factory is None or SdkEvent is None:
            return

        try:
            events_manager = self._factory._events_manager
            if hasattr(events_manager, "register"):
                async def handler_ready(m):
                    await self._notify_receiver_async(SdkEvent.SDK_READY, m)

                async def handler_update(m):
                    await self._notify_receiver_async(SdkEvent.SDK_UPDATE, m)

                await events_manager.register(SdkEvent.SDK_READY, handler_ready)
                await events_manager.register(SdkEvent.SDK_UPDATE, handler_update)
        except Exception as ex:
            _LOGGER.debug("Could not register Split events: %s", ex)

    async def destroy_async(self):
        """Destroy the underlying factory in asyncio mode."""
        await self._factory.destroy()

    async def is_sdk_ready_async(self):
        """Async version of ``is_sdk_ready``; awaits the factory's readiness probe.

        Returns False when no factory exists yet (``create()`` not awaited).
        """
        if self.sdk_ready:
            return True
        if self._factory is None:
            return False

        try:
            await self._factory.block_until_ready(0.1)
            self.sdk_ready = True
        except TimeoutException:
            _LOGGER.debug("Split SDK timed out")

        return self.sdk_ready

    def _validate_context(self, initial_context):
        """Return True when ``initial_context`` is a usable configuration dict.

        Checks, in order: the context is a dict; it has ``SplitClient`` or
        ``SdkKey``; ``SdkKey`` (if present) is a str; ``ConfigOptions`` (if
        present) is a dict. Each failure is logged as an error.
        """
        # Reject None as well: every check below relies on dict.get().
        if not isinstance(initial_context, dict):
            _LOGGER.error("SplitClientWrapper: initial_context must be of type `dict`")
            return False

        sdk_key = initial_context.get("SdkKey")
        if initial_context.get("SplitClient") is None and sdk_key is None:
            _LOGGER.error("SplitClientWrapper: initial_context must contain keys `SplitClient` or `SdkKey`")
            return False

        if sdk_key is not None and not isinstance(sdk_key, str):
            _LOGGER.error("SplitClientWrapper: key `SdkKey` must be of type `str`")
            return False

        config_options = initial_context.get("ConfigOptions")
        if config_options is not None and not isinstance(config_options, dict):
            _LOGGER.error("SplitClientWrapper: key `ConfigOptions` must be of type `dict`")
            return False

        return True