fix: async-safe embeddings and resilient drain_writes #5702
MatthiasHowellYopp wants to merge 1 commit into
📝 Walkthrough
The PR normalizes embeddings (including bytes and NumPy arrays), makes batch embedding calls safe when an asyncio loop is active by offloading to a thread pool with a 30s timeout, and replaces indefinite background-write blocking with per-save timeouts and logging. Tests covering embedding normalization and async-safe embedding behavior were added.
Changes
Embedding Normalization and Async-Safe Embedding
🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/crewai/src/crewai/memory/unified_memory.py (1)
323-383:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
`drain_writes` timeout resilience is undermined by `close()` shutdown/close sequencing.

After a per-save timeout, writes can still be in-flight, but `close()` still does `storage.close()` and `shutdown(wait=True)`. That can (1) race writers against a closed storage backend and (2) still block crew shutdown indefinitely.

Suggested direction (bounded shutdown path)

     def close(self) -> None:
         """Drain pending saves, flush storage, and shut down the background thread pool."""
         self.drain_writes()
    -    if hasattr(self._storage, "close"):
    -        self._storage.close()
    -    self._save_pool.shutdown(wait=True)
    +    with self._pending_lock:
    +        has_inflight = any(not f.done() for f in self._pending_saves)
    +
    +    if has_inflight:
    +        _logger.warning(
    +            "[CLOSE] In-flight saves remain after drain timeout; "
    +            "skipping blocking shutdown path to avoid hanging crew completion."
    +        )
    +        self._save_pool.shutdown(wait=False, cancel_futures=True)
    +        return
    +
    +    self._save_pool.shutdown(wait=True, cancel_futures=True)
    +    if hasattr(self._storage, "close"):
    +        self._storage.close()

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/unified_memory.py` around lines 323 - 383, The close() sequence currently races with in-flight background saves because it calls storage.close() and then shuts down the pool unconditionally; change close() to first call drain_writes(...) with a bounded total timeout (e.g., timeout_per_save * max(1, len(pending)) or a configurable overall timeout) to allow background saves to finish, then stop blocking shutdown: after that attempt a non-blocking shutdown of the save pool (use _save_pool.shutdown(wait=False) or otherwise avoid indefinite blocking), attempt to cancel any remaining futures in self._pending_saves, and only call _storage.close() after pending futures were awaited/cancelled (or after the non-blocking shutdown) to avoid racing writers against a closed storage backend; refer to drain_writes, close, _save_pool, _storage, and self._pending_saves when making the changes.
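The ordering described in this comment can be sketched in a self-contained form. This is only an illustration under stated assumptions: `BoundedCloser` and `FakeStorage` are hypothetical stand-ins (not CrewAI classes), the attribute names simply mirror those in the review, and the single overall `drain_timeout` is one possible way to bound the wait.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor, wait


class FakeStorage:
    """Hypothetical stand-in for a storage backend that exposes close()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class BoundedCloser:
    """Illustrative only: mirrors the names used in the review, not the real class."""

    def __init__(self, storage: FakeStorage) -> None:
        self._storage = storage
        self._save_pool = ThreadPoolExecutor(max_workers=1)
        self._pending_lock = threading.Lock()
        self._pending_saves: list[Future] = []

    def submit(self, fn, *args) -> None:
        future = self._save_pool.submit(fn, *args)
        with self._pending_lock:
            self._pending_saves.append(future)

    def close(self, drain_timeout: float = 0.2) -> bool:
        """Return True if storage was closed, False if saves were still in flight."""
        with self._pending_lock:
            pending = list(self._pending_saves)
        # Bounded drain: wait at most drain_timeout seconds in total.
        wait(pending, timeout=drain_timeout)
        if any(not f.done() for f in pending):
            # Never close storage underneath a running writer. Queued (not yet
            # started) saves are cancelled; a running save is left to finish.
            self._save_pool.shutdown(wait=False, cancel_futures=True)
            return False
        self._save_pool.shutdown(wait=True)
        self._storage.close()
        return True
```

In this sketch the storage is deliberately left open when a save is still running, so the caller has to decide what to do with it afterwards. `cancel_futures` requires Python 3.9 or newer.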
🧹 Nitpick comments (1)
lib/crewai/src/crewai/memory/unified_memory.py (1)
365-367: ⚡ Quick win

Include traceback on save failure logs for actionable diagnostics.

The error log only prints `str(e)`. Add `exc_info=True` so failures in drain mode are debuggable.

Small logging improvement

    -    _logger.error(
    -        "[DRAIN_WRITES] Save %d/%d failed: %s", i + 1, len(pending), e
    -    )
    +    _logger.error(
    +        "[DRAIN_WRITES] Save %d/%d failed: %s",
    +        i + 1,
    +        len(pending),
    +        e,
    +        exc_info=True,
    +    )

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/unified_memory.py` around lines 365 - 367, The error log in UnifiedMemory's drain routine uses _logger.error("[DRAIN_WRITES] Save %d/%d failed: %s", i + 1, len(pending), e) which only prints e; update the call in unified_memory.py (inside the drain/save loop in the UnifiedMemory class/function) to pass exc_info=True to the logger so the full traceback is included (i.e., call _logger.error(..., exc_info=True)) ensuring failures during drain mode produce actionable stack traces.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/memory/encoding_flow.py`:
- Around line 71-94: The validator ensure_embedding_is_list should stop
re-implementing bytes→list[float] coercion and instead delegate to
MemoryRecord's canonical normalization (MemoryRecord.validate_embedding) for
both single records and lists; update ensure_embedding_is_list to iterate over
similar_records when it's a list and replace each record.embedding with the
result of MemoryRecord.validate_embedding (or call the shared normalization
utility) rather than mutating raw bytes in-place, and for the single-record path
call the same MemoryRecord normalization so empty bytes behavior (b"" → None)
remains consistent with MemoryRecord.
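One way to picture the single shared normalization path this comment asks for is the sketch below. It is not the PR's code: `normalize_embedding` is a hypothetical helper, it uses the standard-library `array` module instead of NumPy, and it assumes raw bytes hold packed native-endian float32 values, which the PR text does not confirm.

```python
from array import array
from typing import Any, Optional


def normalize_embedding(value: Any) -> Optional[list[float]]:
    """Hypothetical shared helper: coerce an embedding to list[float] or None.

    Assumption: bytes contain packed native-endian float32 values.
    """
    if value is None:
        return None
    if isinstance(value, (bytes, bytearray, memoryview)):
        raw = bytes(value)
        if not raw:
            return None  # b"" -> None, the behavior the review wants kept consistent
        floats = array("f")
        floats.frombytes(raw)  # ValueError if len(raw) is not a multiple of the item size
        return floats.tolist()
    if hasattr(value, "tolist"):  # e.g. a NumPy array
        value = value.tolist()
    return [float(x) for x in value]


def normalize_records(embeddings: list[Any]) -> list[Optional[list[float]]]:
    """Apply the same helper to every item, so list and single paths cannot drift."""
    return [normalize_embedding(e) for e in embeddings]
```

Routing both the single-record and the list path through one function is the point of the suggestion: the empty-bytes rule then lives in exactly one place.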
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: e68de661-3cc4-4216-96e5-f6486226094e
📒 Files selected for processing (4)
- lib/crewai/src/crewai/memory/encoding_flow.py
- lib/crewai/src/crewai/memory/types.py
- lib/crewai/src/crewai/memory/unified_memory.py
- lib/crewai/tests/memory/test_embedding_safety.py
🧹 Nitpick comments (1)
lib/crewai/src/crewai/memory/types.py (1)
316-340: 💤 Low value

Consider applying the same async-context handling as `embed_texts`.

`embed_texts` offloads to the thread pool when called from an async context (lines 386-404), but `embed_text` calls the embedder directly. If `embed_text` is ever called from an async context, it will block the event loop.

Since the comment acknowledges this is deferred tech debt, consider at minimum making `embed_text` a wrapper that delegates to `embed_texts` for consistency:

♻️ Optional refactor

     def embed_text(embedder: Any, text: str) -> list[float]:
         ...
         if not text or not text.strip():
             return []
    -
    -    # Just call the embedder directly - the blocking issue needs to be fixed
    -    # at a higher level (making Memory.recall() async)
    -    result = embedder([text])
    -
    -    if not result:
    -        return []
    -    first = result[0]
    -    if hasattr(first, "tolist"):
    -        return list(first.tolist())
    -    if isinstance(first, list):
    -        return [float(x) for x in first]
    -    return list(first)
    +    results = embed_texts(embedder, [text])
    +    return results[0] if results else []

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/types.py` around lines 316 - 340, embed_text currently calls the embedder directly and can block the event loop unlike embed_texts which offloads to a thread pool; update embed_text to delegate to embed_texts for consistent async-aware behavior by calling embed_texts(embedder, [text]) (or its synchronous wrapper that runs embed_texts via the same ThreadPoolExecutor path used in embed_texts), then extract the first embedding, handle empty results, and convert types the same way as the current embed_text logic (preserve checks for .tolist and list). Also update the docstring to note that embed_text delegates to embed_texts to avoid blocking in async contexts.
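The delegation suggested here can be tried in isolation with the sketch below. The `embed_texts` shown is a deliberately simplified stand-in (it omits the thread-pool handling the real function is described as having), and the embedder is assumed to be any callable that maps a list of strings to a sequence of vectors.

```python
from typing import Any, Callable, Sequence

Embedder = Callable[[list[str]], Sequence[Any]]  # assumed embedder shape


def embed_texts(embedder: Embedder, texts: list[str]) -> list[list[float]]:
    """Simplified stand-in for the batch function; no async handling here."""
    if not texts:
        return []
    rows = embedder(texts)
    out: list[list[float]] = []
    for row in rows:
        if hasattr(row, "tolist"):  # e.g. a NumPy array
            row = row.tolist()
        out.append([float(x) for x in row])
    return out


def embed_text(embedder: Embedder, text: str) -> list[float]:
    """Single-text wrapper that reuses the batch path for type conversion."""
    if not text or not text.strip():
        return []
    results = embed_texts(embedder, [text])
    return results[0] if results else []
```

With this shape, any later fix to the batch path (for example a different offloading strategy) would apply to the single-text path without a second implementation to keep in sync.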
✅ Files skipped from review due to trivial changes (1)
- lib/crewai/src/crewai/memory/unified_memory.py
🚧 Files skipped from review as they are similar to previous changes (2)
- lib/crewai/src/crewai/memory/encoding_flow.py
- lib/crewai/tests/memory/test_embedding_safety.py
♻️ Duplicate comments (1)
lib/crewai/src/crewai/memory/types.py (1)
343-360: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

This path still blocks the event loop.

Line 393 waits with `Future.result(timeout=30)` on the caller thread. If `embed_texts()` is reached from a coroutine, that caller thread is the event-loop thread, so the loop stays frozen until the embedder finishes or times out. Timed-out workers also keep running, so two hung calls can still exhaust `_EMBED_POOL` and turn later async reads into the same empty-embedding fallback. Please move the async case to an actual async helper (`await loop.run_in_executor(...)` / `asyncio.to_thread(...)` + `asyncio.wait_for(...)`) and keep this function sync-only.

In Python asyncio, if synchronous code running on the event-loop thread calls concurrent.futures.Future.result(timeout=30) on a ThreadPoolExecutor future, does that block the event loop until completion or timeout? If the timeout expires, does the worker thread continue running, and what is the recommended pattern for offloading blocking work from async code?

Also applies to: 384-399
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/types.py` around lines 343 - 360, The current embed_texts function offloads to _EMBED_POOL but blocks the event-loop by calling Future.result(timeout=30) on the event-loop thread; instead make embed_texts strictly synchronous and move the async-path into a new async helper (e.g., async_embed_texts) that uses await loop.run_in_executor(...) or asyncio.to_thread(...) combined with asyncio.wait_for(..., timeout=30) so the event loop is not blocked and timed-out worker threads continue running without freezing the loop; update callers so coroutines call async_embed_texts while sync callers keep calling embed_texts, and reference _EMBED_POOL, embed_texts, and the new async_embed_texts (or chosen helper name) when making the change.
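A rough sketch of the async helper this comment describes follows. The name `async_embed_texts` comes from the review's own suggestion; everything else is an assumption, including the choice to return an empty list on timeout (modelled on the "empty-embedding fallback" mentioned above) and the use of the default executor via `asyncio.to_thread` rather than `_EMBED_POOL`.

```python
import asyncio
from typing import Any, Callable, Sequence

Embedder = Callable[[list[str]], Sequence[Any]]  # assumed embedder shape


async def async_embed_texts(
    embedder: Embedder, texts: list[str], timeout: float = 30.0
) -> list[list[float]]:
    """Run a blocking embedder off the event loop, with a bounded await."""
    if not texts:
        return []
    try:
        # to_thread runs the call in a worker thread; awaiting it yields control,
        # so other coroutines keep running while the embedder works.
        rows = await asyncio.wait_for(
            asyncio.to_thread(embedder, texts), timeout=timeout
        )
    except asyncio.TimeoutError:
        # Only the await is abandoned; the worker thread itself keeps running.
        return []
    return [[float(x) for x in row] for row in rows]
```

Note that the timeout bounds how long the coroutine waits, not how long the worker thread lives, so a hung embedder can still occupy a pool thread. `asyncio.to_thread` requires Python 3.9 or newer.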
🚧 Files skipped from review as they are similar to previous changes (3)
- lib/crewai/src/crewai/memory/unified_memory.py
- lib/crewai/src/crewai/memory/encoding_flow.py
- lib/crewai/tests/memory/test_embedding_safety.py
@greysonlalonde hoping I can get a review on this.
Add bytes→float validators on MemoryRecord and ItemState to handle Valkey returning embeddings as raw bytes. Make embed_texts() safe when called from an async context by using a thread pool. Improve drain_writes() with per-save timeouts and error logging instead of raising on failure. Part 3/4 of Valkey storage implementation.
Description:
Part 3/4 of adding Valkey as a storage backend for CrewAI. This PR makes the embedding and memory persistence paths robust enough to work with async storage backends like Valkey.
What changed:
- `types.py`: Added a `field_validator` on `MemoryRecord.embedding` that converts bytes to `list[float]` via NumPy. Valkey stores vectors as raw bytes, so this ensures embeddings are always in the expected format regardless of storage backend. Also added a thread pool to `embed_texts()` so it doesn't block the event loop when called from an async context.
- `encoding_flow.py`: Added a matching `field_validator` on `ItemState.similar_records` and `result_record` to handle the same bytes→float conversion during the consolidation flow.
- `unified_memory.py`: `drain_writes()` now accepts a `timeout_per_save` parameter (default 60s) and logs warnings on timeout or failure instead of raising. This prevents a single slow or failed save from blocking crew completion. Added structured debug/warning/error logging throughout the drain cycle.
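The per-save timeout behavior described for `drain_writes()` could look roughly like the following. This is a sketch, not the PR's implementation: it takes the pending futures as an argument and returns a small tally purely so the behavior is easy to observe, and the logger name is invented.

```python
import logging
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FutureTimeoutError

_logger = logging.getLogger("drain_demo")  # hypothetical logger name


def drain_writes(pending: list[Future], timeout_per_save: float = 60.0) -> dict[str, int]:
    """Wait for each pending save in turn; log problems instead of raising."""
    counts = {"ok": 0, "timeout": 0, "failed": 0}
    total = len(pending)
    for i, future in enumerate(pending):
        try:
            future.result(timeout=timeout_per_save)
            counts["ok"] += 1
        except FutureTimeoutError:
            # The save may still be running; we only stop waiting for it.
            _logger.warning(
                "[DRAIN_WRITES] Save %d/%d timed out after %.1fs",
                i + 1, total, timeout_per_save,
            )
            counts["timeout"] += 1
        except Exception as e:
            _logger.error(
                "[DRAIN_WRITES] Save %d/%d failed: %s", i + 1, total, e, exc_info=True
            )
            counts["failed"] += 1
    return counts
```

Because the timeout applies to each save separately, the worst-case total wait in this sketch is `timeout_per_save` multiplied by the number of pending saves, which is worth keeping in mind when choosing the default.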
Testing:
- `test_embedding_safety.py` (15 tests): covers bytes→float conversion, empty bytes, NumPy arrays, int-to-float coercion, sync and async `embed_texts` behavior, and empty/whitespace input handling.