-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathworkflows.py
More file actions
362 lines (292 loc) · 12.2 KB
/
workflows.py
File metadata and controls
362 lines (292 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
"""Workflows service
This module provides the WorkflowsService class for workflow-related API operations.
"""
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Any
import httpx
from render_sdk.client.errors import RenderError, TaskRunError
from render_sdk.client.sse import parse_stream
from render_sdk.client.types import (
ListTaskRunsParams,
TaskData,
TaskIdentifier,
TaskRun,
TaskRunDetails,
TaskRunStatusValues,
)
from render_sdk.client.util import (
handle_http_error,
handle_http_errors,
handle_httpx_exception,
retry_with_backoff,
)
from render_sdk.public_api.api.workflow_tasks_ea import (
cancel_task_run,
create_task,
get_task_run,
list_task_runs,
)
from render_sdk.public_api.api.workflow_tasks_ea.stream_task_runs_events import (
_get_kwargs,
)
from render_sdk.public_api.models.error import Error
from render_sdk.public_api.models.run_task import RunTask
from render_sdk.public_api.models.task_data_type_1 import TaskDataType1
from render_sdk.public_api.types import UNSET, Response
from render_sdk.version import get_user_agent
if TYPE_CHECKING:
from render_sdk.client.client import Client
class AwaitableTaskRun:
"""TaskRun with awaitable functionality for waiting on completion.
This class wraps a TaskRun and makes it awaitable, so you can use:
`result = await task_run`
"""
def __init__(self, task_run: TaskRun, workflows_service: "WorkflowsService"):
self.task_run = task_run
self.workflows_service = workflows_service
self._details: TaskRunDetails | None = None
@property
def id(self) -> str:
"""Get the task run ID."""
return self.task_run.id
@property
def status(self) -> str:
"""Get the current task run status."""
return self.task_run.status.value
def is_terminal_status(self) -> bool:
"""Check if the task run is in a terminal state."""
return self.status in (
TaskRunStatusValues.COMPLETED,
TaskRunStatusValues.FAILED,
TaskRunStatusValues.CANCELED,
)
def __await__(self):
"""Make AwaitableTaskRun awaitable directly."""
return self._wait_for_completion().__await__()
async def _wait_for_completion(self) -> TaskRunDetails:
"""Internal method to wait for task completion.
Returns:
TaskRunDetails: The final task run details
Raises:
RenderError: If the task run completed with no event
TaskRunError: If the task run fails with an error
ClientError: For 4xx client errors when polling task status
ServerError: For 5xx server errors and network failures
TimeoutError: If requests time out while polling
"""
# If already completed, get current details and return
if self.is_terminal_status():
self._details = await self.workflows_service.get_task_run(self.id)
return self._details
return await retry_with_backoff(
self._task_run_completed_with_sse,
max_retries=5,
poll_interval=1.0,
backoff_factor=2.0,
exempted_exceptions=(TaskRunError,),
)
async def _task_run_completed_with_sse(self) -> TaskRunDetails:
async for event in self.workflows_service.task_run_events([self.id]):
if event and event.id == self.id:
# Update our internal state
self._details = event
if event.error:
raise TaskRunError(event.error)
return event
raise RenderError("Task run completed with no event")
class WorkflowsService:
"""Service for workflow-related API operations.
This class provides methods for running tasks, getting task run details,
canceling tasks, and listing task runs.
"""
def __init__(self, client: "Client"):
self.client = client
async def task_run_events(
self,
task_run_ids: list[str],
) -> AsyncIterator[TaskRunDetails]:
"""Stream task run events via SSE.
Args:
task_run_ids: List of task run IDs to stream events for
Yields:
TaskRunDetails: Task run event updates
Raises:
TimeoutError: For timeout-related errors
ClientError: For other client-side errors
ServerError: For connection errors that might indicate server issues
"""
kwargs = _get_kwargs(task_run_ids=task_run_ids)
timeout = httpx.Timeout(
connect=5.0, write=5.0, read=None, pool=None
) # These can be long lived
async with httpx.AsyncClient(timeout=timeout) as http_client:
headers = kwargs.get("headers", {})
headers.update(
{
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
"Authorization": f"Bearer {self.client.token}",
"User-Agent": get_user_agent(),
}
)
url = f"{self.client.internal._base_url}{kwargs['url']}"
try:
async with http_client.stream(
method=kwargs["method"],
url=url,
params=kwargs.get("params", {}),
headers=headers,
) as response:
handle_http_error(response, "SSE stream")
async for event in parse_stream(response.aiter_bytes()):
yield event
except httpx.RequestError as e:
handle_httpx_exception(e, "SSE connection")
async def start_task(
self,
task_identifier: TaskIdentifier,
input_data: TaskData,
) -> AwaitableTaskRun:
"""Start a task and return an awaitable task run without waiting for completion.
The returned AwaitableTaskRun provides the task run ID immediately.
To wait for the result, await it:
task_run = await workflows.start_task("my-workflow/task", [42])
print(task_run.id) # available immediately
result = await task_run # opens connection, waits for completion
For fire-and-forget, simply discard the task run.
This corresponds to POST /task-runs in the API.
Args:
task_identifier: The identifier of the task to run
input_data: The input data for the task. Can be either:
- A list for positional arguments: [arg1, arg2, arg3]
- A dict for named parameters: {"param1": value1, "param2": value2}
Returns:
AwaitableTaskRun: A task run that can be awaited for the result
Raises:
ClientError: For 4xx client errors (invalid task, malformed input, etc.)
ServerError: For 5xx server errors and network failures
TimeoutError: If the request times out
"""
response = (
await self._create_task_api_call(task_identifier, input_data)
).parsed
return AwaitableTaskRun(response, self)
async def run_task(
self,
task_identifier: TaskIdentifier,
input_data: TaskData,
) -> TaskRunDetails:
"""Start a task and wait for it to complete, returning the result.
This is a convenience wrapper around start_task() that additionally
waits for the task to complete.
Args:
task_identifier: The identifier of the task to run
input_data: The input data for the task. Can be either:
- A list for positional arguments: [arg1, arg2, arg3]
- A dict for named parameters: {"param1": value1, "param2": value2}
Returns:
TaskRunDetails: The completed task run details
Raises:
ClientError: For 4xx client errors (invalid task, malformed input, etc.)
ServerError: For 5xx server errors and network failures
TimeoutError: If the request times out
TaskRunError: If the task run fails with an error
"""
task_run = await self.start_task(task_identifier, input_data)
return await task_run
@handle_http_errors("create task")
async def _create_task_api_call(
self, task_identifier: TaskIdentifier, input_data: TaskData
) -> Response[Error | TaskRun]:
"""Internal method to make the create task API call."""
# Convert dict to TaskDataType1 for named parameters
task_data_input: TaskDataType1 | list[Any]
if isinstance(input_data, dict):
task_data_input = TaskDataType1.from_dict(input_data)
else:
task_data_input = input_data
# Create the request body
run_task = RunTask(
task=task_identifier,
input_=task_data_input,
)
# Make the API call
return await create_task.asyncio_detailed(
client=self.client.internal,
body=run_task,
)
async def get_task_run(self, task_run_id: str) -> TaskRunDetails:
"""Get details about a specific task run.
This corresponds to GET /task-runs/{taskRunId} in the API.
Args:
task_run_id: The ID of the task run to retrieve
Returns:
TaskRunDetails: The task run details
Raises:
ClientError: For 4xx client errors (task not found, invalid ID, etc.)
ServerError: For 5xx server errors and network failures
TimeoutError: If the request times out
"""
return (await self._get_task_run_api_call(task_run_id)).parsed
@handle_http_errors("get task run")
async def _get_task_run_api_call(
self, task_run_id: str
) -> Response[Error | TaskRunDetails]:
"""Internal method to make the get task run API call."""
return await get_task_run.asyncio_detailed(
client=self.client.internal,
task_run_id=task_run_id,
)
async def cancel_task_run(self, task_run_id: str) -> None:
"""Cancel a running task.
This corresponds to DELETE /task-runs/{taskRunId} in the API.
Args:
task_run_id: The ID of the task run to cancel
Raises:
ClientError: For 4xx client errors (task not found, already completed, etc.)
ServerError: For 5xx server errors and network failures
TimeoutError: If the request times out
"""
await self._cancel_task_run_api_call(task_run_id)
# cancel_task_run returns None on success (204 status)
# Error objects will be handled by the decorator
@handle_http_errors("cancel task run")
async def _cancel_task_run_api_call(
self, task_run_id: str
) -> Response[Any | Error]:
"""Internal method to make the cancel task run API call."""
return await cancel_task_run.asyncio_detailed(
client=self.client.internal,
task_run_id=task_run_id,
)
async def list_task_runs(
self,
params: ListTaskRunsParams | None = None,
) -> list[TaskRun]:
"""List task runs with optional filtering.
This corresponds to GET /task-runs in the API.
Args:
params: Optional parameters for filtering the results
Returns:
list[TaskRun]: List of task runs
Raises:
ClientError: For 4xx client errors (invalid parameters, unauthorized, etc.)
ServerError: For 5xx server errors and network failures
TimeoutError: If the request times out
"""
return (await self._list_task_runs_api_call(params)).parsed
@handle_http_errors("list task runs")
async def _list_task_runs_api_call(
self, params: ListTaskRunsParams | None = None
) -> Response[Error | list[TaskRun]]:
"""Internal method to make the list task runs API call."""
# Convert params to API parameters
limit = params.limit if params and params.limit is not None else UNSET
cursor = params.cursor if params and params.cursor is not None else UNSET
owner_id = params.owner_id if params and params.owner_id is not None else UNSET
return await list_task_runs.asyncio_detailed(
client=self.client.internal,
limit=limit,
cursor=cursor,
owner_id=owner_id,
)