Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion openfeature/provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,26 @@
from openfeature.hook import Hook

from .metadata import Metadata
from .multi_provider import (
EvaluationStrategy,
FirstMatchStrategy,
MultiProvider,
ProviderEntry,
)

if typing.TYPE_CHECKING:
from openfeature.flag_evaluation import FlagValueType

__all__ = ["AbstractProvider", "FeatureProvider", "Metadata", "ProviderStatus"]
__all__ = [
"AbstractProvider",
"EvaluationStrategy",
"FeatureProvider",
"FirstMatchStrategy",
"Metadata",
"MultiProvider",
"ProviderEntry",
"ProviderStatus",
]


class ProviderStatus(Enum):
Expand Down
352 changes: 352 additions & 0 deletions openfeature/provider/multi_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,352 @@
"""
Multi-Provider implementation for OpenFeature Python SDK.

This provider wraps multiple underlying providers, allowing a single client
to interact with multiple flag sources simultaneously.

See: https://openfeature.dev/specification/appendix-a/#multi-provider
"""

from __future__ import annotations

import asyncio
import typing
from collections.abc import Callable, Mapping, Sequence
from dataclasses import dataclass

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEvent, ProviderEventDetails
from openfeature.exception import ErrorCode, GeneralError
from openfeature.flag_evaluation import FlagResolutionDetails, FlagValueType, Reason
from openfeature.hook import Hook
from openfeature.provider import AbstractProvider, FeatureProvider, Metadata, ProviderStatus
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Several imports in this block are unused in the file, including asyncio, ProviderEvent, ProviderEventDetails, and ProviderStatus. Additionally, ErrorCode should be imported from openfeature.exception to support proper error reporting in the evaluation logic.

Suggested change
import asyncio
import typing
from collections.abc import Callable, Mapping, Sequence
from dataclasses import dataclass
from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEvent, ProviderEventDetails
from openfeature.exception import GeneralError
from openfeature.flag_evaluation import FlagResolutionDetails, FlagValueType, Reason
from openfeature.hook import Hook
from openfeature.provider import AbstractProvider, FeatureProvider, Metadata, ProviderStatus
import typing
from collections.abc import Callable, Mapping, Sequence
from dataclasses import dataclass
from openfeature.evaluation_context import EvaluationContext
from openfeature.exception import ErrorCode, GeneralError
from openfeature.flag_evaluation import FlagResolutionDetails, FlagValueType, Reason
from openfeature.hook import Hook
from openfeature.provider import AbstractProvider, FeatureProvider, Metadata


__all__ = ["MultiProvider", "ProviderEntry", "FirstMatchStrategy", "EvaluationStrategy"]


@dataclass
class ProviderEntry:
    """Configuration for a provider in the Multi-Provider.

    Pairs a provider instance with an optional explicit name used to
    identify it inside the MultiProvider; when ``name`` is None a unique
    name is derived from the provider's metadata during registration.
    """

    # The wrapped provider instance that evaluations are delegated to.
    provider: FeatureProvider
    # Optional explicit unique name; must not collide with other entries.
    name: str | None = None


class EvaluationStrategy(typing.Protocol):
    """
    Strategy interface for determining which provider's result to use.

    Strategies can be 'sequential' (evaluate one at a time, stop early) or
    'parallel' (evaluate all simultaneously).  This is a structural
    (duck-typed) protocol: any object with a matching ``run_mode`` attribute
    and ``should_use_result`` method satisfies it.
    """

    # "sequential": providers are tried one by one and evaluation stops at
    # the first accepted result; "parallel": all results are collected first.
    run_mode: typing.Literal["sequential", "parallel"]

    def should_use_result(
        self,
        flag_key: str,
        provider_name: str,
        result: FlagResolutionDetails,
    ) -> bool:
        """
        Determine if this result should be used (and stop evaluation if sequential).

        :param flag_key: The flag being evaluated
        :param provider_name: Name of the provider that returned this result
        :param result: The resolution details from the provider
        :return: True if this result should be used as the final result
        """
        ...


class FirstMatchStrategy:
    """
    Strategy that accepts the first non-error result, in provider order.

    In sequential mode, evaluation stops at the first non-error result.
    In parallel mode, the first successful entry of the ordered result
    list is chosen.
    """

    run_mode: typing.Literal["sequential", "parallel"] = "sequential"

    def should_use_result(
        self,
        flag_key: str,
        provider_name: str,
        result: FlagResolutionDetails,
    ) -> bool:
        """Accept any resolution whose reason is not an error."""
        is_error = result.reason == Reason.ERROR
        return not is_error


class MultiProvider(AbstractProvider):
    """
    A provider that aggregates multiple underlying providers.

    Evaluations are delegated to underlying providers based on the configured
    strategy (default: FirstMatchStrategy in sequential mode).

    NOTE(review): event forwarding from wrapped providers (``attach`` /
    ``detach``) is not implemented yet; the Multi-Provider specification
    requires it — confirm before relying on provider events.

    Example:
        provider_a = SomeProvider()
        provider_b = AnotherProvider()

        multi = MultiProvider([
            ProviderEntry(provider_a, name="primary"),
            ProviderEntry(provider_b, name="fallback")
        ])

        api.set_provider(multi)
    """

    def __init__(
        self,
        providers: list[ProviderEntry],
        strategy: EvaluationStrategy | None = None,
    ):
        """
        Initialize the Multi-Provider.

        :param providers: List of ProviderEntry objects defining the providers
        :param strategy: Evaluation strategy (defaults to FirstMatchStrategy)
        :raises ValueError: if ``providers`` is empty or an explicit provider
            name collides with an already assigned name
        """
        super().__init__()

        if not providers:
            raise ValueError("At least one provider must be provided")

        self.strategy = strategy or FirstMatchStrategy()
        self._registered_providers: list[tuple[str, FeatureProvider]] = []
        self._register_providers(providers)
        # The SDK client calls get_provider_hooks() on every evaluation, so
        # aggregate the (static) hook lists once up front instead of on each
        # call.  Assumes provider hooks do not change after construction —
        # TODO confirm against the SDK's provider contract.
        self._hooks: list[Hook] = [
            hook
            for _, provider in self._registered_providers
            for hook in provider.get_provider_hooks()
        ]

    def _register_providers(self, providers: list[ProviderEntry]) -> None:
        """
        Register providers with unique names.

        Names are determined by:
        1. Explicit name in ProviderEntry (must be unique)
        2. provider.get_metadata().name if unique among entries and unclaimed
        3. {metadata.name}_{index} otherwise, skipping indices already taken

        :raises ValueError: if an explicit name duplicates an assigned name
        """
        # Count providers by their metadata name to detect duplicates.
        name_counts: dict[str, int] = {}
        for entry in providers:
            metadata_name = entry.provider.get_metadata().name or "provider"
            name_counts[metadata_name] = name_counts.get(metadata_name, 0) + 1

        # Track used names to prevent conflicts.
        used_names: set[str] = set()
        name_indices: dict[str, int] = {}

        for entry in providers:
            metadata_name = entry.provider.get_metadata().name or "provider"

            if entry.name:
                # Explicit name provided: must be globally unique.
                if entry.name in used_names:
                    raise ValueError(f"Provider name '{entry.name}' is not unique")
                final_name = entry.name
            elif name_counts[metadata_name] == 1 and metadata_name not in used_names:
                # Metadata name is unique among entries and not yet claimed
                # by an explicit name.
                final_name = metadata_name
            else:
                # Multiple providers share this metadata name, or an explicit
                # name already claimed it: append an increasing index until a
                # free name is found (an explicit name may have taken an
                # indexed form too).
                while True:
                    name_indices[metadata_name] = name_indices.get(metadata_name, 0) + 1
                    final_name = f"{metadata_name}_{name_indices[metadata_name]}"
                    if final_name not in used_names:
                        break

            used_names.add(final_name)
            self._registered_providers.append((final_name, entry.provider))

    def get_metadata(self) -> Metadata:
        """Return metadata for the aggregate provider.

        Only a static name is returned: ``Metadata`` in this SDK carries
        just a name, so per-provider metadata cannot be embedded here.
        """
        return Metadata(name="MultiProvider")

    def get_provider_hooks(self) -> list[Hook]:
        """Return the hooks aggregated from all providers at construction."""
        return self._hooks

    def initialize(self, evaluation_context: EvaluationContext) -> None:
        """
        Initialize all providers sequentially.

        Every provider gets a chance to initialize even if an earlier one
        fails; failures are collected and reported together.  (Concurrent
        initialization is a planned enhancement.)

        :param evaluation_context: Context passed to each provider
        :raises GeneralError: if one or more providers failed to initialize,
            with all failure messages aggregated into one message
        """
        failures: list[str] = []

        for name, provider in self._registered_providers:
            try:
                provider.initialize(evaluation_context)
            except Exception as e:
                failures.append(f"Provider '{name}' initialization failed: {e}")

        if failures:
            error_msgs = "; ".join(failures)
            raise GeneralError(f"Multi-provider initialization failed: {error_msgs}")

    def shutdown(self) -> None:
        """Shut down all providers, best-effort."""
        for _, provider in self._registered_providers:
            try:
                provider.shutdown()
            except Exception:
                # Intentionally swallowed: one provider's failure must not
                # prevent the remaining providers from shutting down.
                pass

    def _evaluate_with_providers(
        self,
        flag_key: str,
        default_value: FlagValueType,
        evaluation_context: EvaluationContext | None,
        resolve_fn: Callable[
            [FeatureProvider, str, FlagValueType, EvaluationContext | None],
            FlagResolutionDetails,
        ],
    ) -> FlagResolutionDetails[FlagValueType]:
        """
        Core evaluation logic that delegates to providers based on strategy.

        Providers are always called one at a time, in registration order,
        regardless of ``strategy.run_mode``; in sequential mode the loop
        stops at the first result the strategy accepts.  True concurrent
        execution for parallel mode is a planned enhancement.

        :param flag_key: The flag key to evaluate
        :param default_value: Default value for the flag
        :param evaluation_context: Evaluation context
        :param resolve_fn: Function to call on each provider for resolution
        :return: Final resolution details
        """
        results: list[tuple[str, FlagResolutionDetails]] = []

        for provider_name, provider in self._registered_providers:
            try:
                result = resolve_fn(provider, flag_key, default_value, evaluation_context)
                results.append((provider_name, result))

                # In sequential mode, stop if strategy says to use this result.
                if (self.strategy.run_mode == "sequential" and
                        self.strategy.should_use_result(flag_key, provider_name, result)):
                    return result

            except Exception as e:
                # Record the failure and keep trying the remaining providers.
                # Note: FlagResolutionDetails takes no flag_key argument; the
                # error code marks this as a general evaluation error.
                error_result = FlagResolutionDetails(
                    value=default_value,
                    reason=Reason.ERROR,
                    error_code=ErrorCode.GENERAL_ERROR,
                    error_message=str(e),
                )
                results.append((provider_name, error_result))

        # Parallel mode (or sequential with no early acceptance): pick the
        # first result the strategy accepts, in registration order.
        for provider_name, result in results:
            if self.strategy.should_use_result(flag_key, provider_name, result):
                return result

        # No accepted result: fall back to the last recorded result.  At
        # least one entry exists because __init__ rejects an empty provider
        # list and the loop above records one result per provider.
        return results[-1][1]

    def resolve_boolean_details(
        self,
        flag_key: str,
        default_value: bool,
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[bool]:
        """Resolve a boolean flag via the configured strategy."""
        return self._evaluate_with_providers(
            flag_key,
            default_value,
            evaluation_context,
            lambda p, k, d, ctx: p.resolve_boolean_details(k, d, ctx),
        )

    async def resolve_boolean_details_async(
        self,
        flag_key: str,
        default_value: bool,
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[bool]:
        """Async boolean resolution.

        Currently delegates to the blocking sync implementation; it does not
        call the providers' ``*_async`` methods, so no non-blocking I/O
        benefit is gained yet (planned enhancement).
        """
        return self.resolve_boolean_details(flag_key, default_value, evaluation_context)

    def resolve_string_details(
        self,
        flag_key: str,
        default_value: str,
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[str]:
        """Resolve a string flag via the configured strategy."""
        return self._evaluate_with_providers(
            flag_key,
            default_value,
            evaluation_context,
            lambda p, k, d, ctx: p.resolve_string_details(k, d, ctx),
        )

    async def resolve_string_details_async(
        self,
        flag_key: str,
        default_value: str,
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[str]:
        """Async string resolution (delegates to the sync implementation)."""
        return self.resolve_string_details(flag_key, default_value, evaluation_context)

    def resolve_integer_details(
        self,
        flag_key: str,
        default_value: int,
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[int]:
        """Resolve an integer flag via the configured strategy."""
        return self._evaluate_with_providers(
            flag_key,
            default_value,
            evaluation_context,
            lambda p, k, d, ctx: p.resolve_integer_details(k, d, ctx),
        )

    async def resolve_integer_details_async(
        self,
        flag_key: str,
        default_value: int,
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[int]:
        """Async integer resolution (delegates to the sync implementation)."""
        return self.resolve_integer_details(flag_key, default_value, evaluation_context)

    def resolve_float_details(
        self,
        flag_key: str,
        default_value: float,
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[float]:
        """Resolve a float flag via the configured strategy."""
        return self._evaluate_with_providers(
            flag_key,
            default_value,
            evaluation_context,
            lambda p, k, d, ctx: p.resolve_float_details(k, d, ctx),
        )

    async def resolve_float_details_async(
        self,
        flag_key: str,
        default_value: float,
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[float]:
        """Async float resolution (delegates to the sync implementation)."""
        return self.resolve_float_details(flag_key, default_value, evaluation_context)

    def resolve_object_details(
        self,
        flag_key: str,
        default_value: Sequence[FlagValueType] | Mapping[str, FlagValueType],
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[Sequence[FlagValueType] | Mapping[str, FlagValueType]]:
        """Resolve an object (list/map) flag via the configured strategy."""
        return self._evaluate_with_providers(
            flag_key,
            default_value,
            evaluation_context,
            lambda p, k, d, ctx: p.resolve_object_details(k, d, ctx),
        )

    async def resolve_object_details_async(
        self,
        flag_key: str,
        default_value: Sequence[FlagValueType] | Mapping[str, FlagValueType],
        evaluation_context: EvaluationContext | None = None,
    ) -> FlagResolutionDetails[Sequence[FlagValueType] | Mapping[str, FlagValueType]]:
        """Async object resolution (delegates to the sync implementation)."""
        return self.resolve_object_details(flag_key, default_value, evaluation_context)
Loading