-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy path3_workflows.py
More file actions
67 lines (48 loc) · 2.35 KB
/
3_workflows.py
File metadata and controls
67 lines (48 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import uuid
import restate
from pydantic import BaseModel
from restate import Workflow, WorkflowContext, WorkflowSharedContext
from app.utils import create_user_entry, send_email_with_link
"""
Workflow are a special type of Virtual Object with a run handler that runs once per ID.
Workflows are stateful and can be interacted with via queries (getting data out of the workflow)
and signals (pushing data to the workflow).
Workflows are used to model long-running flows, such as user onboarding, order processing, etc.
Workflows have the following handlers:
- Main workflow in run() method
- Additional methods interact with the workflow.
Each workflow instance has a unique ID and runs only once (to success or failure).
"""
user_signup = Workflow("usersignup")
class User(BaseModel):
name: str
email: str
# --- The workflow logic ---
@user_signup.main()
async def run(ctx: WorkflowContext, user: User) -> bool:
# workflow ID = user ID; workflow runs once per user
user_id = ctx.key()
# Durably executed action; write to other system
await ctx.run_typed("create_user", create_user_entry, user=user)
# Send the email with the verification link
secret = str(ctx.uuid())
await ctx.run_typed("send_email", send_email_with_link, user_id=user_id, email=user.email, secret=secret)
# Wait until user clicked email verification link
# Promise gets resolved or rejected by the other handlers
click_secret = await ctx.promise("link_clicked").value()
return click_secret == secret
# --- Other handlers interact with the workflow via queries and signals ---
@user_signup.handler()
async def click(ctx: WorkflowSharedContext, secret: str):
# Send data to the workflow via a durable promise
await ctx.promise("link_clicked").resolve(secret)
# Define 'app' used by hypercorn (or other HTTP servers) to serve the SDK
app = restate.app(services=[user_signup])
"""
You can deploy this as a container, Lambda, etc. - Invoke it over HTTP via:
curl localhost:8080/usersignup/signup-userid1/run/send -H 'content-type: application/json' -d '{ "name": "Bob", "email": "bob@builder.com" }'
- Resolve the email link via:
curl localhost:8080/usersignup/signup-userid1/click -H 'content-type: application/json' -d '"<SECRET>"'
- Attach back to the workflow to get the result:
curl localhost:8080/restate/workflow/usersignup/signup-userid1/attach
"""