Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions tests/e2e/test_actor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,56 @@ async def test_event_listener_can_be_removed_successfully(
) -> None:
async def main() -> None:
import os
from typing import Any

from crawlee.events._types import Event

from apify._consts import ApifyEnvVars

os.environ[ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS] = '100'

counter = 0

def count_event(data: Any) -> None:
nonlocal counter
print(data)
counter += 1
# `removed_count` is bumped by the listener we later remove. `total_count` is bumped by a
# reference listener that stays subscribed the whole time and acts as a heartbeat, letting the
# test wait for real events instead of guessing wall-clock durations. Both listeners are async
# so they run on the event loop and can drive `heartbeat` safely.
removed_count = 0
total_count = 0
heartbeat = asyncio.Event()

async def removed_listener() -> None:
nonlocal removed_count
removed_count += 1

async def reference_listener() -> None:
nonlocal total_count
total_count += 1
heartbeat.set()

async def wait_for_events(n: int) -> None:
"""Block until the reference listener has observed at least `n` more events."""
target = total_count + n
while total_count < target:
heartbeat.clear()
if total_count >= target:
return
await heartbeat.wait()

async with Actor:
Actor.on(Event.PERSIST_STATE, count_event)
await asyncio.sleep(0.5)
assert counter > 1
last_count = counter
Actor.off(Event.PERSIST_STATE, count_event)
await asyncio.sleep(0.5)
assert counter == last_count
Actor.on(Event.PERSIST_STATE, removed_listener)
Actor.on(Event.PERSIST_STATE, reference_listener)

# Both listeners are live: wait until the one we will remove has demonstrably received events.
while removed_count < 2:
await wait_for_events(1)

Actor.off(Event.PERSIST_STATE, removed_listener)

# One more event cycle flushes any invocation dispatched just before `off`, then snapshot.
await wait_for_events(1)
count_after_off = removed_count

# Over several further event cycles, the removed listener must not be called again.
await wait_for_events(5)
assert removed_count == count_after_off

actor = await make_actor(label='actor-off-event', main_func=main)
run_result = await run_actor(actor)
Expand Down
Loading