forked from apify/apify-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_actor_lifecycle.py
More file actions
143 lines (101 loc) · 5.01 KB
/
test_actor_lifecycle.py
File metadata and controls
143 lines (101 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from __future__ import annotations
from typing import TYPE_CHECKING
from apify import Actor
if TYPE_CHECKING:
from .conftest import MakeActorFunction, RunActorFunction
async def test_actor_exit_with_different_exit_codes(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
async def main() -> None:
async with Actor:
input = await Actor.get_input() # noqa: A001
await Actor.exit(**input)
actor = await make_actor(label='actor-exit', main_func=main)
for exit_code in [0, 1, 101]:
run_result = await run_actor(actor, run_input={'exit_code': exit_code})
assert run_result.exit_code == exit_code
assert run_result.status == 'FAILED' if exit_code > 0 else 'SUCCEEDED'
async def test_actor_fail_with_custom_exit_codes_and_status_messages(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
async def main() -> None:
async with Actor:
input = await Actor.get_input() # noqa: A001
await Actor.fail(**input) if input else await Actor.fail()
actor = await make_actor(label='actor-fail', main_func=main)
run_result = await run_actor(actor)
assert run_result.exit_code == 1
assert run_result.status == 'FAILED'
for exit_code in [1, 10, 100]:
run_result = await run_actor(actor, run_input={'exit_code': exit_code})
assert run_result.exit_code == exit_code
assert run_result.status == 'FAILED'
# Fail with a status message.
run_result = await run_actor(actor, run_input={'status_message': 'This is a test message'})
assert run_result.status == 'FAILED'
assert run_result.status_message == 'This is a test message'
async def test_actor_fails_correctly_with_exception(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
async def main() -> None:
async with Actor:
raise Exception('This is a test exception') # noqa: TRY002
actor = await make_actor(label='with-actor-fail', main_func=main)
run_result = await run_actor(actor)
assert run_result.exit_code == 91
assert run_result.status == 'FAILED'
async def test_actor_with_crawler_reboot(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None:
"""Test that crawler in actor works as expected after reboot.
Handle two requests. Reboot in between the two requests. The second run should include statistics of the first run.
"""
async def main() -> None:
from crawlee._types import BasicCrawlingContext, ConcurrencySettings
from crawlee.crawlers import BasicCrawler
from apify import Actor
async with Actor:
crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1))
requests = ['https://example.com/1', 'https://example.com/2']
run = await Actor.apify_client.run(Actor.configuration.actor_run_id or '').get()
assert run
first_run = run.get('stats', {}).get('rebootCount', 0) == 0
@crawler.router.default_handler
async def default_handler(context: BasicCrawlingContext) -> None:
context.log.info(f'Processing {context.request.url} ...')
# Simulate migration through reboot
if context.request.url == requests[1] and first_run:
context.log.info(f'Reclaiming {context.request.url} ...')
rq = await crawler.get_request_manager()
await rq.reclaim_request(context.request)
await Actor.reboot()
await crawler.run(requests)
# Each time one request is finished.
expected_requests_finished = 1 if first_run else 2
assert crawler.statistics.state.requests_finished == expected_requests_finished
actor = await make_actor(label='migration', main_func=main)
run_result = await run_actor(actor)
assert run_result.status == 'SUCCEEDED'
async def test_actor_sequential_contexts(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None:
"""Test that Actor and Actor() can be used in sequential async context manager blocks."""
async def main() -> None:
async with Actor as actor:
actor._exit_process = False
assert actor._is_initialized is True
# Actor after Actor.
async with Actor as actor:
actor._exit_process = False
assert actor._is_initialized is True
# Actor() after Actor.
async with Actor(exit_process=False) as actor:
assert actor._is_initialized is True
# Actor() after Actor().
async with Actor(exit_process=False) as actor:
assert actor._is_initialized is True
# Actor after Actor().
async with Actor as actor:
assert actor._is_initialized is True
actor = await make_actor(label='actor-sequential-contexts', main_func=main)
run_result = await run_actor(actor)
assert run_result.status == 'SUCCEEDED'