Skip to content

fix: async-safe embeddings and resilient drain_writes#5702

Open
MatthiasHowellYopp wants to merge 1 commit into
crewAIInc:mainfrom
MatthiasHowellYopp:feat/valkey-3-embedding-safety
Open

fix: async-safe embeddings and resilient drain_writes#5702
MatthiasHowellYopp wants to merge 1 commit into
crewAIInc:mainfrom
MatthiasHowellYopp:feat/valkey-3-embedding-safety

Conversation

@MatthiasHowellYopp
Copy link
Copy Markdown
Contributor

@MatthiasHowellYopp MatthiasHowellYopp commented May 4, 2026

Description:

Part 3/4 of adding Valkey as a storage backend for CrewAI. This PR makes the embedding and memory persistence paths robust enough to work with async storage backends like Valkey.

What changed:

types.py — Added a field_validator on MemoryRecord.embedding that converts bytes to list[float] via numpy. Valkey stores vectors as raw bytes, so this ensures embeddings are always in the expected format regardless of storage backend. Also added a thread pool to embed_texts() so it doesn't block the event loop when called from an async context.

encoding_flow.py — Added a matching field_validator on ItemState.similar_records and result_record to handle the same bytes→float conversion during the consolidation flow.

unified_memory.py — drain_writes() now accepts a timeout_per_save parameter (default 60s) and logs warnings on timeout or failure instead of raising. This prevents a single slow or failed save from blocking crew completion. Added structured debug/warning/error logging throughout the drain cycle.

Testing:

test_embedding_safety.py (15 tests) — Covers bytes→float conversion, empty bytes, numpy arrays, int-to-float coercion, sync and async embed_texts behavior, empty/whitespace input handling.

Summary by CodeRabbit

  • Bug Fixes

    • More robust embedding normalization (bytes, numeric arrays, empty values handled consistently).
    • Embedding calls are safe in async contexts and preserve positions for skipped/empty inputs.
    • Background memory saves now use per-save timeouts with progress logging and no indefinite blocking (defaults to 60s).
  • Tests

    • Added comprehensive tests covering embedding validation, sync/async embedding behavior, and timeout/save handling.

Review Change Stack

@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-3-embedding-safety branch 3 times, most recently from d425472 to c1a3805 Compare May 7, 2026 20:00
@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-3-embedding-safety branch from c1a3805 to 6df71c1 Compare May 11, 2026 19:53
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

The PR normalizes embeddings (including bytes and NumPy arrays), makes batch embedding calls safe when an asyncio loop is active by offloading to a thread pool with a 30s timeout, and replaces indefinite background-write blocking with per-save timeouts and logging. Tests covering embedding normalization and async-safe embedding behavior were added.

Changes

Embedding Normalization and Async-Safe Embedding

Layer / File(s) Summary
Embedding validation and normalization
lib/crewai/src/crewai/memory/types.py, lib/crewai/src/crewai/memory/encoding_flow.py
MemoryRecord.validate_embedding converts embedding inputs into list[float] | None, handling bytes (via NumPy) and empty bytes as None. ItemState field validator normalizes MemoryRecord.embedding entries in similar_records and result_record before parsing.
Async-safe embedding execution
lib/crewai/src/crewai/memory/types.py
Module-level _EMBED_POOL dispatches batch embeddings to a thread pool when an asyncio loop is running; embed_texts detects the loop and submits work with a 30s timeout, or calls the embedder directly when no loop is active. Empty strings are skipped and positions preserved.
Timeout-aware write draining
lib/crewai/src/crewai/memory/unified_memory.py
Memory.drain_writes(timeout_per_save=60.0) waits per-save with timeouts, logging progress and failures, and counts exceptions instead of blocking indefinitely or raising.
Tests for embedding safety
lib/crewai/tests/memory/test_embedding_safety.py
Tests verify embedding normalization (bytes/NumPy/int to float list), empty string handling, embed_texts sync/async behavior with position preservation, and NumPy result conversion.

🎯 3 (Moderate) | ⏱️ ~25 minutes

🐰 Embeddings now flow safe,
Async loops get thread relief,
Timeouts won't block long—
bytes become lists of light,
and writes drain with grace! 🌿

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 39.13% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly captures the main changes: async-safe embeddings and resilient drain_writes timeout handling, matching the PR's core objectives.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
lib/crewai/src/crewai/memory/unified_memory.py (1)

323-383: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

drain_writes timeout resilience is undermined by close() shutdown/close sequencing.

After a per-save timeout, writes can still be in-flight, but close() still does storage.close() and shutdown(wait=True). That can (1) race writers against a closed storage backend and (2) still block crew shutdown indefinitely.

Suggested direction (bounded shutdown path)
 def close(self) -> None:
     """Drain pending saves, flush storage, and shut down the background thread pool."""
     self.drain_writes()
-    if hasattr(self._storage, "close"):
-        self._storage.close()
-    self._save_pool.shutdown(wait=True)
+    with self._pending_lock:
+        has_inflight = any(not f.done() for f in self._pending_saves)
+
+    if has_inflight:
+        _logger.warning(
+            "[CLOSE] In-flight saves remain after drain timeout; "
+            "skipping blocking shutdown path to avoid hanging crew completion."
+        )
+        self._save_pool.shutdown(wait=False, cancel_futures=True)
+        return
+
+    self._save_pool.shutdown(wait=True, cancel_futures=True)
+    if hasattr(self._storage, "close"):
+        self._storage.close()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/memory/unified_memory.py` around lines 323 - 383, The
close() sequence currently races with in-flight background saves because it
calls storage.close() and then shuts down the pool unconditionally; change
close() to first call drain_writes(...) with a bounded total timeout (e.g.,
timeout_per_save * max(1, len(pending)) or a configurable overall timeout) to
allow background saves to finish, then stop blocking shutdown: after that
attempt a non-blocking shutdown of the save pool (use
_save_pool.shutdown(wait=False) or otherwise avoid indefinite blocking), attempt
to cancel any remaining futures in self._pending_saves, and only call
_storage.close() after pending futures were awaited/cancelled (or after the
non-blocking shutdown) to avoid racing writers against a closed storage backend;
refer to drain_writes, close, _save_pool, _storage, and self._pending_saves when
making the changes.
🧹 Nitpick comments (1)
lib/crewai/src/crewai/memory/unified_memory.py (1)

365-367: ⚡ Quick win

Include traceback on save failure logs for actionable diagnostics.

The error log only prints str(e). Add exc_info=True so failures in drain mode are debuggable.

Small logging improvement
-                _logger.error(
-                    "[DRAIN_WRITES] Save %d/%d failed: %s", i + 1, len(pending), e
-                )
+                _logger.error(
+                    "[DRAIN_WRITES] Save %d/%d failed: %s",
+                    i + 1,
+                    len(pending),
+                    e,
+                    exc_info=True,
+                )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/memory/unified_memory.py` around lines 365 - 367, The
error log in UnifiedMemory's drain routine uses _logger.error("[DRAIN_WRITES]
Save %d/%d failed: %s", i + 1, len(pending), e) which only prints e; update the
call in unified_memory.py (inside the drain/save loop in the UnifiedMemory
class/function) to pass exc_info=True to the logger so the full traceback is
included (i.e., call _logger.error(..., exc_info=True)) ensuring failures during
drain mode produce actionable stack traces.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/crewai/src/crewai/memory/encoding_flow.py`:
- Around line 71-94: The validator ensure_embedding_is_list should stop
re-implementing bytes→list[float] coercion and instead delegate to
MemoryRecord's canonical normalization (MemoryRecord.validate_embedding) for
both single records and lists; update ensure_embedding_is_list to iterate over
similar_records when it's a list and replace each record.embedding with the
result of MemoryRecord.validate_embedding (or call the shared normalization
utility) rather than mutating raw bytes in-place, and for the single-record path
call the same MemoryRecord normalization so empty bytes behavior (b"" → None)
remains consistent with MemoryRecord.

---

Outside diff comments:
In `@lib/crewai/src/crewai/memory/unified_memory.py`:
- Around line 323-383: The close() sequence currently races with in-flight
background saves because it calls storage.close() and then shuts down the pool
unconditionally; change close() to first call drain_writes(...) with a bounded
total timeout (e.g., timeout_per_save * max(1, len(pending)) or a configurable
overall timeout) to allow background saves to finish, then stop blocking
shutdown: after that attempt a non-blocking shutdown of the save pool (use
_save_pool.shutdown(wait=False) or otherwise avoid indefinite blocking), attempt
to cancel any remaining futures in self._pending_saves, and only call
_storage.close() after pending futures were awaited/cancelled (or after the
non-blocking shutdown) to avoid racing writers against a closed storage backend;
refer to drain_writes, close, _save_pool, _storage, and self._pending_saves when
making the changes.

---

Nitpick comments:
In `@lib/crewai/src/crewai/memory/unified_memory.py`:
- Around line 365-367: The error log in UnifiedMemory's drain routine uses
_logger.error("[DRAIN_WRITES] Save %d/%d failed: %s", i + 1, len(pending), e)
which only prints e; update the call in unified_memory.py (inside the drain/save
loop in the UnifiedMemory class/function) to pass exc_info=True to the logger so
the full traceback is included (i.e., call _logger.error(..., exc_info=True))
ensuring failures during drain mode produce actionable stack traces.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: e68de661-3cc4-4216-96e5-f6486226094e

📥 Commits

Reviewing files that changed from the base of the PR and between 63a9e7e and 6df71c1.

📒 Files selected for processing (4)
  • lib/crewai/src/crewai/memory/encoding_flow.py
  • lib/crewai/src/crewai/memory/types.py
  • lib/crewai/src/crewai/memory/unified_memory.py
  • lib/crewai/tests/memory/test_embedding_safety.py

Comment thread lib/crewai/src/crewai/memory/encoding_flow.py
Comment thread lib/crewai/src/crewai/memory/types.py Outdated
@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-3-embedding-safety branch from 6df71c1 to a4332f6 Compare May 11, 2026 20:37
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
lib/crewai/src/crewai/memory/types.py (1)

316-340: 💤 Low value

Consider applying the same async-context handling as embed_texts.

embed_texts offloads to the thread pool when called from an async context (lines 386-404), but embed_text calls the embedder directly. If embed_text is ever called from an async context, it will block the event loop.

Since the comment acknowledges this is deferred tech debt, consider at minimum adding embed_text as a wrapper that delegates to embed_texts for consistency:

♻️ Optional refactor
 def embed_text(embedder: Any, text: str) -> list[float]:
     ...
     if not text or not text.strip():
         return []
-
-    # Just call the embedder directly - the blocking issue needs to be fixed
-    # at a higher level (making Memory.recall() async)
-    result = embedder([text])
-
-    if not result:
-        return []
-    first = result[0]
-    if hasattr(first, "tolist"):
-        return list(first.tolist())
-    if isinstance(first, list):
-        return [float(x) for x in first]
-    return list(first)
+    results = embed_texts(embedder, [text])
+    return results[0] if results else []
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/memory/types.py` around lines 316 - 340, embed_text
currently calls the embedder directly and can block the event loop unlike
embed_texts which offloads to a thread pool; update embed_text to delegate to
embed_texts for consistent async-aware behavior by calling embed_texts(embedder,
[text]) (or its synchronous wrapper that runs embed_texts via the same
ThreadPoolExecutor path used in embed_texts), then extract the first embedding,
handle empty results, and convert types the same way as the current embed_text
logic (preserve checks for .tolist and list). Also update the docstring to note
that embed_text delegates to embed_texts to avoid blocking in async contexts.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@lib/crewai/src/crewai/memory/types.py`:
- Around line 316-340: embed_text currently calls the embedder directly and can
block the event loop unlike embed_texts which offloads to a thread pool; update
embed_text to delegate to embed_texts for consistent async-aware behavior by
calling embed_texts(embedder, [text]) (or its synchronous wrapper that runs
embed_texts via the same ThreadPoolExecutor path used in embed_texts), then
extract the first embedding, handle empty results, and convert types the same
way as the current embed_text logic (preserve checks for .tolist and list). Also
update the docstring to note that embed_text delegates to embed_texts to avoid
blocking in async contexts.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: df324f71-c9e3-457c-b693-918b702a38e6

📥 Commits

Reviewing files that changed from the base of the PR and between 6df71c1 and a4332f6.

📒 Files selected for processing (4)
  • lib/crewai/src/crewai/memory/encoding_flow.py
  • lib/crewai/src/crewai/memory/types.py
  • lib/crewai/src/crewai/memory/unified_memory.py
  • lib/crewai/tests/memory/test_embedding_safety.py
✅ Files skipped from review due to trivial changes (1)
  • lib/crewai/src/crewai/memory/unified_memory.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • lib/crewai/src/crewai/memory/encoding_flow.py
  • lib/crewai/tests/memory/test_embedding_safety.py

@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-3-embedding-safety branch 3 times, most recently from 8b8a533 to bbbe31e Compare May 13, 2026 14:48
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
lib/crewai/src/crewai/memory/types.py (1)

343-360: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

This path still blocks the event loop.

Line 393 waits with Future.result(timeout=30) on the caller thread. If embed_texts() is reached from a coroutine, that caller thread is the event-loop thread, so the loop stays frozen until the embedder finishes or times out. Timed-out workers also keep running, so two hung calls can still exhaust _EMBED_POOL and turn later async reads into the same empty-embedding fallback. Please move the async case to an actual async helper (await loop.run_in_executor(...) / asyncio.to_thread(...) + asyncio.wait_for(...)) and keep this function sync-only.

In Python asyncio, if synchronous code running on the event-loop thread calls concurrent.futures.Future.result(timeout=30) on a ThreadPoolExecutor future, does that block the event loop until completion or timeout? If the timeout expires, does the worker thread continue running, and what is the recommended pattern for offloading blocking work from async code?

Also applies to: 384-399

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/memory/types.py` around lines 343 - 360, The current
embed_texts function offloads to _EMBED_POOL but blocks the event-loop by
calling Future.result(timeout=30) on the event-loop thread; instead make
embed_texts strictly synchronous and move the async-path into a new async helper
(e.g., async_embed_texts) that uses await loop.run_in_executor(...) or
asyncio.to_thread(...) combined with asyncio.wait_for(..., timeout=30) so the
event loop is not blocked and timed-out worker threads continue running without
freezing the loop; update callers so coroutines call async_embed_texts while
sync callers keep calling embed_texts, and reference _EMBED_POOL, embed_texts,
and the new async_embed_texts (or chosen helper name) when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@lib/crewai/src/crewai/memory/types.py`:
- Around line 343-360: The current embed_texts function offloads to _EMBED_POOL
but blocks the event-loop by calling Future.result(timeout=30) on the event-loop
thread; instead make embed_texts strictly synchronous and move the async-path
into a new async helper (e.g., async_embed_texts) that uses await
loop.run_in_executor(...) or asyncio.to_thread(...) combined with
asyncio.wait_for(..., timeout=30) so the event loop is not blocked and timed-out
worker threads continue running without freezing the loop; update callers so
coroutines call async_embed_texts while sync callers keep calling embed_texts,
and reference _EMBED_POOL, embed_texts, and the new async_embed_texts (or chosen
helper name) when making the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 2eb45bad-c2d6-4985-9379-a115365a1142

📥 Commits

Reviewing files that changed from the base of the PR and between 8b8a533 and bbbe31e.

📒 Files selected for processing (4)
  • lib/crewai/src/crewai/memory/encoding_flow.py
  • lib/crewai/src/crewai/memory/types.py
  • lib/crewai/src/crewai/memory/unified_memory.py
  • lib/crewai/tests/memory/test_embedding_safety.py
🚧 Files skipped from review as they are similar to previous changes (3)
  • lib/crewai/src/crewai/memory/unified_memory.py
  • lib/crewai/src/crewai/memory/encoding_flow.py
  • lib/crewai/tests/memory/test_embedding_safety.py

@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-3-embedding-safety branch 2 times, most recently from f9a7f08 to 2ddeaaf Compare May 19, 2026 18:17
@MatthiasHowellYopp
Copy link
Copy Markdown
Contributor Author

@greysonlalonde hoping I can get a review on this.

@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-3-embedding-safety branch 8 times, most recently from d2af721 to e17a45d Compare May 26, 2026 13:58
@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-3-embedding-safety branch 2 times, most recently from 4575a77 to bb76f77 Compare May 28, 2026 20:20
Add bytes→float validators on MemoryRecord and ItemState to handle
Valkey returning embeddings as raw bytes. Make embed_texts() safe when
called from an async context by using a thread pool. Improve
drain_writes() with per-save timeouts and error logging instead of
raising on failure.

Part 3/4 of Valkey storage implementation.
@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-3-embedding-safety branch from bb76f77 to c5dcef6 Compare June 1, 2026 13:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant