Skip to content

Commit 31e16d2

Browse files
committed
Version 1: State depends on crawler_id, but stats does not.
1 parent 88e0fb1 commit 31e16d2

4 files changed

Lines changed: 150 additions & 30 deletions

File tree

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]):
266266

267267
_CRAWLEE_STATE_KEY = 'CRAWLEE_STATE'
268268
_request_handler_timeout_text = 'Request handler timed out after'
269+
__next_id = 0
269270

270271
def __init__(
271272
self,
@@ -297,7 +298,7 @@ def __init__(
297298
status_message_logging_interval: timedelta = timedelta(seconds=10),
298299
status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]]
299300
| None = None,
300-
id: int | None = None,
301+
crawler_id: int | None = None,
301302
_context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
302303
_additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None,
303304
_logger: logging.Logger | None = None,
@@ -350,7 +351,7 @@ def __init__(
350351
status_message_logging_interval: Interval for logging the crawler status messages.
351352
status_message_callback: Allows overriding the default status message. The default status message is
352353
provided in the parameters. Returning `None` suppresses the status message.
353-
id: Id of the crawler used for state and statistics tracking. You can use same explicit id to share same
354+
crawler_id: Id of the crawler used for state and statistics tracking. You can use the same explicit id to share
354355
state and statistics between two crawlers. By default, each crawler will use its own state and statistics.
355356
_context_pipeline: Enables extending the request lifecycle and modifying the crawling context.
356357
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
@@ -359,6 +360,13 @@ def __init__(
359360
_logger: A logger instance, typically provided by a subclass, for consistent logging labels.
360361
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
361362
"""
363+
if crawler_id is None:
364+
# This could look into a set of already used ids, but let's not overengineer this.
365+
self.id = BasicCrawler.__next_id
366+
BasicCrawler.__next_id += 1
367+
else:
368+
self.id = crawler_id
369+
362370
implicit_event_manager_with_explicit_config = False
363371
if not configuration:
364372
configuration = service_locator.get_configuration()
@@ -834,7 +842,7 @@ async def _use_state(
834842
default_value: dict[str, JsonSerializable] | None = None,
835843
) -> dict[str, JsonSerializable]:
836844
kvs = await self.get_key_value_store()
837-
return await kvs.get_auto_saved_value(self._CRAWLEE_STATE_KEY, default_value)
845+
return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self.id}', default_value)
838846

839847
async def _save_crawler_state(self) -> None:
840848
store = await self.get_key_value_store()

tests/unit/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from uvicorn.config import Config
1313

1414
from crawlee import service_locator
15+
from crawlee.crawlers import BasicCrawler
1516
from crawlee.fingerprint_suite._browserforge_adapter import get_available_header_network
1617
from crawlee.http_clients import CurlImpersonateHttpClient, HttpxHttpClient, ImpitHttpClient
1718
from crawlee.proxy_configuration import ProxyInfo
@@ -74,6 +75,7 @@ def _prepare_test_env() -> None:
7475
# Reset global class variables to ensure test isolation.
7576
KeyValueStore._autosaved_values = {}
7677
Statistics._Statistics__next_id = 0 # type:ignore[attr-defined] # Mangled attribute
78+
BasicCrawler._BasicCrawler__next_id = 0 # type:ignore[attr-defined] # Mangled attribute
7779

7880
return _prepare_test_env
7981

tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ async def test_adaptive_crawling_result_use_state_isolation(
381381
crawler = AdaptivePlaywrightCrawler.with_beautifulsoup_static_parser(
382382
rendering_type_predictor=static_only_predictor_enforce_detection,
383383
)
384-
await key_value_store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0})
384+
await key_value_store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0})
385385
request_handler_calls = 0
386386

387387
@crawler.router.default_handler
@@ -398,7 +398,7 @@ async def request_handler(context: AdaptivePlaywrightCrawlingContext) -> None:
398398
# Request handler was called twice
399399
assert request_handler_calls == 2
400400
# Increment of global state happened only once
401-
assert (await key_value_store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 1
401+
assert (await key_value_store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 1
402402

403403

404404
async def test_adaptive_crawling_statistics(test_urls: list[str]) -> None:

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 135 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -810,11 +810,62 @@ async def handler(context: BasicCrawlingContext) -> None:
810810
await crawler.run(['https://hello.world'])
811811

812812
kvs = await crawler.get_key_value_store()
813-
value = await kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY)
813+
value = await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
814814

815815
assert value == {'hello': 'world'}
816816

817817

818+
async def test_context_use_state_crawlers_share_state() -> None:
819+
async def handler(context: BasicCrawlingContext) -> None:
820+
state = await context.use_state({'urls': []})
821+
assert isinstance(state['urls'], list)
822+
state['urls'].append(context.request.url)
823+
824+
crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler)
825+
crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler)
826+
827+
await crawler_1.run(['https://a.com'])
828+
await crawler_2.run(['https://b.com'])
829+
830+
kvs = await KeyValueStore.open()
831+
assert crawler_1.id == crawler_2.id == 0
832+
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1.id}') == {
833+
'urls': ['https://a.com', 'https://b.com']
834+
}
835+
836+
837+
async def test_crawlers_share_stats() -> None:
838+
async def handler(context: BasicCrawlingContext) -> None:
839+
await context.use_state({'urls': []})
840+
841+
crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler)
842+
crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler, statistics=crawler_1.statistics)
843+
844+
result1 = await crawler_1.run(['https://a.com'])
845+
result2 = await crawler_2.run(['https://b.com'])
846+
847+
assert crawler_1.statistics == crawler_2.statistics
848+
assert result1.requests_finished == 1
849+
assert result2.requests_finished == 2
850+
851+
852+
async def test_context_use_state_crawlers_own_state() -> None:
853+
async def handler(context: BasicCrawlingContext) -> None:
854+
state = await context.use_state({'urls': []})
855+
assert isinstance(state['urls'], list)
856+
state['urls'].append(context.request.url)
857+
858+
crawler_1 = BasicCrawler(request_handler=handler)
859+
crawler_2 = BasicCrawler(request_handler=handler)
860+
861+
await crawler_1.run(['https://a.com'])
862+
await crawler_2.run(['https://b.com'])
863+
864+
kvs = await KeyValueStore.open()
865+
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') == {'urls': ['https://a.com']}
866+
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') == {'urls': ['https://b.com']}
867+
868+
818869
async def test_context_handlers_use_state(key_value_store: KeyValueStore) -> None:
819870
state_in_handler_one: dict[str, JsonSerializable] = {}
820871
state_in_handler_two: dict[str, JsonSerializable] = {}
@@ -855,7 +906,7 @@ async def handler_three(context: BasicCrawlingContext) -> None:
855906
store = await crawler.get_key_value_store()
856907

857908
# The state in the KVS must match with the last set state
858-
assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY)) == {'hello': 'last_world'}
909+
assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')) == {'hello': 'last_world'}
859910

860911

861912
async def test_max_requests_per_crawl() -> None:
@@ -1283,7 +1334,7 @@ async def test_context_use_state_race_condition_in_handlers(key_value_store: Key
12831334

12841335
crawler = BasicCrawler()
12851336
store = await crawler.get_key_value_store()
1286-
await store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0})
1337+
await store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0})
12871338
handler_barrier = Barrier(2)
12881339

12891340
@crawler.router.default_handler
@@ -1298,7 +1349,7 @@ async def handler(context: BasicCrawlingContext) -> None:
12981349
store = await crawler.get_key_value_store()
12991350
# Ensure that local state is pushed back to kvs.
13001351
await store.persist_autosaved_values()
1301-
assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 2
1352+
assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 2
13021353

13031354

13041355
@pytest.mark.run_alone
@@ -1750,16 +1801,10 @@ async def handler(context: BasicCrawlingContext) -> None:
17501801
assert await unrelated_rq.fetch_next_request() == unrelated_request
17511802

17521803

1753-
async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
1804+
async def _run_crawler(crawler_id: int | None, requests: list[str], storage_dir: str) -> StatisticsState:
17541805
"""Run crawler and return its statistics state.
17551806
17561807
Must be defined like this to be pickable for ProcessPoolExecutor."""
1757-
service_locator.set_configuration(
1758-
Configuration(
1759-
storage_dir=storage_dir,
1760-
purge_on_start=False,
1761-
)
1762-
)
17631808

17641809
async def request_handler(context: BasicCrawlingContext) -> None:
17651810
context.log.info(f'Processing {context.request.url} ...')
@@ -1773,50 +1818,69 @@ async def request_handler(context: BasicCrawlingContext) -> None:
17731818
crawler = BasicCrawler(
17741819
request_handler=request_handler,
17751820
concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1),
1821+
configuration=Configuration(
1822+
storage_dir=storage_dir,
1823+
purge_on_start=False,
1824+
),
17761825
)
17771826

17781827
await crawler.run(requests)
17791828
return crawler.statistics.state
17801829

17811830

1782-
def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
1783-
return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))
1831+
@dataclass
1832+
class _CrawlerInput:
1833+
requests: list[str]
1834+
id: None | int = None
1835+
1836+
1837+
def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]:
1838+
return [
1839+
asyncio.run(_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir))
1840+
for crawler_input in crawler_inputs
1841+
]
17841842

17851843

17861844
async def test_crawler_state_persistence(tmp_path: Path) -> None:
1787-
"""Test that crawler statistics persist and are loaded correctly.
1845+
"""Test that crawler statistics and state persist and are loaded correctly.
17881846
17891847
This test simulates starting the crawler process twice, and checks that the statistics include the first run."""
17901848

1791-
state_kvs = await KeyValueStore.open(storage_client=FileSystemStorageClient(),
1792-
configuration=Configuration(storage_dir=str(tmp_path)))
1849+
state_kvs = await KeyValueStore.open(
1850+
storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path))
1851+
)
17931852

17941853
with ProcessPoolExecutor() as executor:
17951854
# Crawl 2 requests in the first run and automatically persist the state.
17961855
first_run_state = executor.submit(
1797-
_process_run_crawler,
1798-
requests=['https://a.placeholder.com', 'https://b.placeholder.com'],
1856+
_process_run_crawlers,
1857+
crawler_inputs=[_CrawlerInput(requests=['https://a.placeholder.com', 'https://b.placeholder.com'])],
17991858
storage_dir=str(tmp_path),
1800-
).result()
1859+
).result()[0]
18011860
# Expected state after first crawler run
18021861
assert first_run_state.requests_finished == 2
1803-
state = await state_kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY)
1862+
state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
18041863
assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com']
18051864

18061865
# Do not reuse the executor to simulate a fresh process to avoid modified class attributes.
18071866
with ProcessPoolExecutor() as executor:
18081867
# Crawl 1 additional request in the second run, but use previously automatically persisted state.
18091868
second_run_state = executor.submit(
1810-
_process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path)
1811-
).result()
1869+
_process_run_crawlers,
1870+
crawler_inputs=[_CrawlerInput(requests=['https://c.placeholder.com'])],
1871+
storage_dir=str(tmp_path),
1872+
).result()[0]
18121873

18131874
# Expected state after second crawler run
18141875
# 2 requests from first run and 1 request from second run.
18151876
assert second_run_state.requests_finished == 3
18161877

1817-
state = await state_kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY)
1818-
assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com',
1819-
'https://c.placeholder.com']
1878+
state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
1879+
assert state.get('urls') == [
1880+
'https://a.placeholder.com',
1881+
'https://b.placeholder.com',
1882+
'https://c.placeholder.com',
1883+
]
18201884

18211885
assert first_run_state.crawler_started_at == second_run_state.crawler_started_at
18221886
assert first_run_state.crawler_finished_at
@@ -1826,6 +1890,52 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None:
18261890
assert first_run_state.crawler_runtime < second_run_state.crawler_runtime
18271891

18281892

1893+
async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Path) -> None:
1894+
"""Test that crawler statistics and state persist and are loaded correctly.
1895+
1896+
This test simulates starting the crawler process twice, and checks that the statistics include first run.
1897+
Each time two distinct crawlers are running, and they should keep using their own statistics and state."""
1898+
state_kvs = await KeyValueStore.open(
1899+
storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path))
1900+
)
1901+
1902+
with ProcessPoolExecutor() as executor:
1903+
# Run 2 crawlers, each crawling 1 request, and automatically persist the state.
1904+
first_run_states = executor.submit(
1905+
_process_run_crawlers,
1906+
crawler_inputs=[
1907+
_CrawlerInput(requests=['https://a.placeholder.com']),
1908+
_CrawlerInput(requests=['https://c.placeholder.com']),
1909+
],
1910+
storage_dir=str(tmp_path),
1911+
).result()
1912+
# Expected state after first crawler run
1913+
assert first_run_states[0].requests_finished == 1
1914+
assert first_run_states[1].requests_finished == 1
1915+
state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
1916+
assert state_0.get('urls') == ['https://a.placeholder.com']
1917+
state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1')
1918+
assert state_1.get('urls') == ['https://c.placeholder.com']
1919+
1920+
with ProcessPoolExecutor() as executor:
1921+
# Run 2 crawler, each crawl 1 request in and automatically persist the state.
1922+
second_run_states = executor.submit(
1923+
_process_run_crawlers,
1924+
crawler_inputs=[
1925+
_CrawlerInput(requests=['https://b.placeholder.com']),
1926+
_CrawlerInput(requests=['https://d.placeholder.com']),
1927+
],
1928+
storage_dir=str(tmp_path),
1929+
).result()
1930+
# Expected state after second crawler run
1931+
assert second_run_states[0].requests_finished == 2
1932+
assert second_run_states[1].requests_finished == 2
1933+
state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
1934+
assert state_0.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com']
1935+
state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1')
1936+
assert state_1.get('urls') == ['https://c.placeholder.com', 'https://d.placeholder.com']
1937+
1938+
18291939
async def test_crawler_intermediate_statistics() -> None:
18301940
"""Test that crawler statistics are correctly updating total runtime on every calculate call."""
18311941
crawler = BasicCrawler()

0 commit comments

Comments
 (0)