Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 63 additions & 17 deletions python/semantic_kernel/core_plugins/http_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,85 @@ class HttpPlugin(KernelBaseModel):
"""A plugin that provides HTTP functionality.

Usage:
kernel.add_plugin(HttpPlugin(), "http")

# With allowed domains for security:
# With allowed domains (recommended):
kernel.add_plugin(HttpPlugin(allowed_domains=["example.com", "api.example.com"]), "http")

# Explicitly allow all domains (opt-in, less secure):
kernel.add_plugin(HttpPlugin(allow_all_domains=True), "http")

Examples:
{{http.getAsync $url}}
{{http.postAsync $url}}
{{http.putAsync $url}}
{{http.deleteAsync $url}}

Security:
- By default, all requests are blocked unless ``allowed_domains`` is provided
or ``allow_all_domains`` is set to True.
- When ``allowed_domains`` is set and ``allow_all_domains`` is False, HTTP
redirects are disabled to prevent redirect-based domain bypass (SSRF).
- When ``allow_all_domains`` is True, redirects are allowed regardless of
whether ``allowed_domains`` is also set.
- Only ``http`` and ``https`` URL schemes are permitted.
"""

allowed_domains: set[str] | None = None
"""List of allowed domains to send requests to. If None, all domains are allowed."""
"""Set of allowed domains to send requests to."""

allow_all_domains: bool = False
"""When True, requests to any domain are allowed. Must be explicitly set."""

_ALLOWED_SCHEMES: frozenset[str] = frozenset({"http", "https"})

@property
def _allow_redirects(self) -> bool:
"""Whether HTTP redirects should be followed.

Redirects are only allowed when ``allow_all_domains`` is True.
When domain restrictions are configured, redirects are disabled
to prevent redirect-based SSRF bypass.
"""
return self.allow_all_domains

def _is_uri_allowed(self, url: str) -> bool:
"""Check if the URL's host is in the allowed domains list.
"""Check if the URL's host and scheme are permitted.

Args:
url: The URL to check.

Returns:
True if the URL is allowed, False otherwise.
"""
if self.allowed_domains is None:
return True

parsed = urlparse(url)

# Validate scheme
if parsed.scheme.lower() not in self._ALLOWED_SCHEMES:
return False

host = parsed.hostname
if host is None:
if not host:
return False

# Case-insensitive comparison
return host.lower() in {domain.lower() for domain in self.allowed_domains}
# If allow_all_domains is set, skip domain check
if self.allow_all_domains:
return True

# If allowed_domains is set, check against it
if self.allowed_domains is not None:
return host.lower() in {domain.lower() for domain in self.allowed_domains}

# Default: deny all
return False

def _validate_url(self, url: str) -> None:
"""Validate the URL, checking if it's not empty and is in the allowed domains.
"""Validate the URL, checking scheme, emptiness, and allowed domains.

Args:
url: The URL to validate.

Raises:
FunctionExecutionException: If the URL is empty or not in the allowed domains.
FunctionExecutionException: If the URL is empty, uses a disallowed scheme,
or targets a domain that is not allowed.
"""
if not url:
raise FunctionExecutionException("url cannot be `None` or empty")
Expand All @@ -77,7 +113,10 @@ async def get(self, url: Annotated[str, "The URL to send the request to."]) -> s
"""
self._validate_url(url)

async with aiohttp.ClientSession() as session, session.get(url, raise_for_status=True) as response:
async with (
aiohttp.ClientSession() as session,
session.get(url, raise_for_status=True, allow_redirects=self._allow_redirects) as response,
):
return await response.text()

@kernel_function(description="Makes a POST request to a uri", name="postAsync")
Expand All @@ -100,7 +139,9 @@ async def post(
data = json.dumps(body) if body is not None else None
async with (
aiohttp.ClientSession() as session,
session.post(url, headers=headers, data=data, raise_for_status=True) as response,
session.post(
url, headers=headers, data=data, raise_for_status=True, allow_redirects=self._allow_redirects
) as response,
):
return await response.text()

Expand All @@ -125,7 +166,9 @@ async def put(
data = json.dumps(body) if body is not None else None
async with (
aiohttp.ClientSession() as session,
session.put(url, headers=headers, data=data, raise_for_status=True) as response,
session.put(
url, headers=headers, data=data, raise_for_status=True, allow_redirects=self._allow_redirects
) as response,
):
return await response.text()

Expand All @@ -141,5 +184,8 @@ async def delete(self, url: Annotated[str, "The URI to send the request to."]) -
"""
self._validate_url(url)

async with aiohttp.ClientSession() as session, session.delete(url, raise_for_status=True) as response:
async with (
aiohttp.ClientSession() as session,
session.delete(url, raise_for_status=True, allow_redirects=self._allow_redirects) as response,
):
return await response.text()
Loading
Loading