diff --git a/python/semantic_kernel/core_plugins/http_plugin.py b/python/semantic_kernel/core_plugins/http_plugin.py index 40269a5c49c8..8a554410fb44 100644 --- a/python/semantic_kernel/core_plugins/http_plugin.py +++ b/python/semantic_kernel/core_plugins/http_plugin.py @@ -15,23 +15,48 @@ class HttpPlugin(KernelBaseModel): """A plugin that provides HTTP functionality. Usage: - kernel.add_plugin(HttpPlugin(), "http") - - # With allowed domains for security: + # With allowed domains (recommended): kernel.add_plugin(HttpPlugin(allowed_domains=["example.com", "api.example.com"]), "http") + # Explicitly allow all domains (opt-in, less secure): + kernel.add_plugin(HttpPlugin(allow_all_domains=True), "http") + Examples: {{http.getAsync $url}} {{http.postAsync $url}} {{http.putAsync $url}} {{http.deleteAsync $url}} + + Security: + - By default, all requests are blocked unless ``allowed_domains`` is provided + or ``allow_all_domains`` is set to True. + - When ``allowed_domains`` is set and ``allow_all_domains`` is False, HTTP + redirects are disabled to prevent redirect-based domain bypass (SSRF). + - When ``allow_all_domains`` is True, redirects are allowed regardless of + whether ``allowed_domains`` is also set. + - Only ``http`` and ``https`` URL schemes are permitted. """ allowed_domains: set[str] | None = None - """List of allowed domains to send requests to. If None, all domains are allowed.""" + """Set of allowed domains to send requests to.""" + + allow_all_domains: bool = False + """When True, requests to any domain are allowed. Must be explicitly set.""" + + _ALLOWED_SCHEMES: frozenset[str] = frozenset({"http", "https"}) + + @property + def _allow_redirects(self) -> bool: + """Whether HTTP redirects should be followed. + + Redirects are only allowed when ``allow_all_domains`` is True. + When domain restrictions are configured, redirects are disabled + to prevent redirect-based SSRF bypass. + """ + return self.allow_all_domains def _is_uri_allowed(self, url: str) -> bool: - """Check if the URL's host is in the allowed domains list. + """Check if the URL's host and scheme are permitted. Args: url: The URL to check. @@ -39,25 +64,36 @@ def _is_uri_allowed(self, url: str) -> bool: Returns: True if the URL is allowed, False otherwise. """ - if self.allowed_domains is None: - return True - parsed = urlparse(url) + + # Validate scheme + if parsed.scheme.lower() not in self._ALLOWED_SCHEMES: + return False + host = parsed.hostname - if host is None: + if not host: return False - # Case-insensitive comparison - return host.lower() in {domain.lower() for domain in self.allowed_domains} + # If allow_all_domains is set, skip domain check + if self.allow_all_domains: + return True + + # If allowed_domains is set, check against it + if self.allowed_domains is not None: + return host.lower() in {domain.lower() for domain in self.allowed_domains} + + # Default: deny all + return False def _validate_url(self, url: str) -> None: - """Validate the URL, checking if it's not empty and is in the allowed domains. + """Validate the URL, checking scheme, emptiness, and allowed domains. Args: url: The URL to validate. Raises: - FunctionExecutionException: If the URL is empty or not in the allowed domains. + FunctionExecutionException: If the URL is empty, uses a disallowed scheme, + or targets a domain that is not allowed. """ if not url: raise FunctionExecutionException("url cannot be `None` or empty") @@ -77,7 +113,10 @@ async def get(self, url: Annotated[str, "The URL to send the request to."]) -> s """ self._validate_url(url) - async with aiohttp.ClientSession() as session, session.get(url, raise_for_status=True) as response: + async with ( + aiohttp.ClientSession() as session, + session.get(url, raise_for_status=True, allow_redirects=self._allow_redirects) as response, + ): return await response.text() @kernel_function(description="Makes a POST request to a uri", name="postAsync") @@ -100,7 +139,9 @@ async def post( data = json.dumps(body) if body is not None else None async with ( aiohttp.ClientSession() as session, - session.post(url, headers=headers, data=data, raise_for_status=True) as response, + session.post( + url, headers=headers, data=data, raise_for_status=True, allow_redirects=self._allow_redirects + ) as response, ): return await response.text() @@ -125,7 +166,9 @@ async def put( data = json.dumps(body) if body is not None else None async with ( aiohttp.ClientSession() as session, - session.put(url, headers=headers, data=data, raise_for_status=True) as response, + session.put( + url, headers=headers, data=data, raise_for_status=True, allow_redirects=self._allow_redirects + ) as response, ): return await response.text() @@ -141,5 +184,8 @@ async def delete(self, url: Annotated[str, "The URI to send the request to."]) - """ self._validate_url(url) - async with aiohttp.ClientSession() as session, session.delete(url, raise_for_status=True) as response: + async with ( + aiohttp.ClientSession() as session, + session.delete(url, raise_for_status=True, allow_redirects=self._allow_redirects) as response, + ): return await response.text() diff --git a/python/tests/unit/core_plugins/test_http_plugin.py b/python/tests/unit/core_plugins/test_http_plugin.py index 4472211c76c7..216967ffaee9 100644 --- a/python/tests/unit/core_plugins/test_http_plugin.py +++ b/python/tests/unit/core_plugins/test_http_plugin.py @@ -11,7 +11,7 @@ async def test_it_can_be_instantiated(): - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) assert plugin is not None @@ -23,7 +23,7 @@ async def test_it_can_be_instantiated_with_allowed_domains(): async def test_it_can_be_imported(): kernel = Kernel() - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) kernel.add_plugin(plugin, "http") assert kernel.get_plugin(plugin_name="http") is not None assert kernel.get_plugin(plugin_name="http").name == "http" @@ -36,20 +36,20 @@ async def test_get(mock_get): mock_get.return_value.__aenter__.return_value.text.return_value = "Hello" mock_get.return_value.__aenter__.return_value.status = 200 - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) response = await plugin.get("https://example.org/get") assert response == "Hello" @pytest.mark.parametrize("method", ["get", "post", "put", "delete"]) async def test_fail_no_url(method): - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) with pytest.raises(FunctionExecutionException): await getattr(plugin, method)(url="") async def test_get_none_url(): - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) with pytest.raises(FunctionExecutionException): await plugin.get(None) @@ -59,7 +59,7 @@ async def test_post(mock_post): mock_post.return_value.__aenter__.return_value.text.return_value = "Hello World !" mock_post.return_value.__aenter__.return_value.status = 200 - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) arguments = KernelArguments(url="https://example.org/post", body="{message: 'Hello, world!'}") response = await plugin.post(**arguments) assert response == "Hello World !" @@ -70,7 +70,7 @@ async def test_post_nobody(mock_post): mock_post.return_value.__aenter__.return_value.text.return_value = "Hello World !" mock_post.return_value.__aenter__.return_value.status = 200 - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) arguments = KernelArguments(url="https://example.org/post") response = await plugin.post(**arguments) assert response == "Hello World !" @@ -81,7 +81,7 @@ async def test_put(mock_put): mock_put.return_value.__aenter__.return_value.text.return_value = "Hello World !" mock_put.return_value.__aenter__.return_value.status = 200 - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) arguments = KernelArguments(url="https://example.org/put", body="{message: 'Hello, world!'}") response = await plugin.put(**arguments) assert response == "Hello World !" @@ -92,7 +92,7 @@ async def test_put_nobody(mock_put): mock_put.return_value.__aenter__.return_value.text.return_value = "Hello World !" mock_put.return_value.__aenter__.return_value.status = 200 - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) arguments = KernelArguments(url="https://example.org/put") response = await plugin.put(**arguments) assert response == "Hello World !" @@ -103,7 +103,7 @@ async def test_delete(mock_delete): mock_delete.return_value.__aenter__.return_value.text.return_value = "Hello World !" mock_delete.return_value.__aenter__.return_value.status = 200 - plugin = HttpPlugin() + plugin = HttpPlugin(allow_all_domains=True) arguments = KernelArguments(url="https://example.org/delete") response = await plugin.delete(**arguments) assert response == "Hello World !" @@ -178,9 +178,9 @@ async def test_allowed_domains_case_insensitive(): assert plugin._is_uri_allowed("https://Example.Com/path") is True -async def test_allowed_domains_none_allows_all(): - """Test that when allowed_domains is None, all domains are allowed.""" - plugin = HttpPlugin() # allowed_domains defaults to None +async def test_allow_all_domains_allows_all(): + """Test that when allow_all_domains is True, all domains are allowed.""" + plugin = HttpPlugin(allow_all_domains=True) assert plugin._is_uri_allowed("https://any-domain.com/path") is True assert plugin._is_uri_allowed("https://another-domain.org/path") is True @@ -214,3 +214,178 @@ async def test_allowed_domains_exact_subdomain_match(): assert plugin._is_uri_allowed("https://sub.example.com/path") is True assert plugin._is_uri_allowed("https://example.com/path") is False assert plugin._is_uri_allowed("https://other.example.com/path") is False + + +# Security regression tests + + +async def test_default_constructor_denies_all(): + """Test that default HttpPlugin() denies all requests (issue 115285).""" + plugin = HttpPlugin() + assert plugin._is_uri_allowed("https://example.com/path") is False + assert plugin._is_uri_allowed("https://any-domain.com/path") is False + + +@pytest.mark.parametrize("method", ["get", "post", "put", "delete"]) +async def test_default_constructor_blocks_requests(method): + """Test that default HttpPlugin() blocks all HTTP methods (issue 115285).""" + plugin = HttpPlugin() + with pytest.raises(FunctionExecutionException, match="Sending requests to the provided location is not allowed"): + if method in ["post", "put"]: + await getattr(plugin, method)(url="https://example.com/path", body={"key": "value"}) + else: + await getattr(plugin, method)(url="https://example.com/path") + + +@patch("aiohttp.ClientSession.get") +async def test_allow_all_domains_flag(mock_get): + """Test that allow_all_domains=True permits requests to any domain.""" + mock_get.return_value.__aenter__.return_value.text.return_value = "OK" + mock_get.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allow_all_domains=True) + response = await plugin.get("https://any-domain.com/path") + assert response == "OK" + + +@patch("aiohttp.ClientSession.get") +async def test_redirects_disabled_with_allowed_domains(mock_get): + """Test that redirects are disabled when allowed_domains is set (issue 115048).""" + mock_get.return_value.__aenter__.return_value.text.return_value = "OK" + mock_get.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allowed_domains={"example.com"}) + await plugin.get("https://example.com/path") + + _, kwargs = mock_get.call_args + assert kwargs["allow_redirects"] is False + + +@patch("aiohttp.ClientSession.post") +async def test_redirects_disabled_for_post_with_allowed_domains(mock_post): + """Test that redirects are disabled for POST when allowed_domains is set.""" + mock_post.return_value.__aenter__.return_value.text.return_value = "OK" + mock_post.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allowed_domains={"example.com"}) + await plugin.post("https://example.com/path", body={"key": "value"}) + + _, kwargs = mock_post.call_args + assert kwargs["allow_redirects"] is False + + +@patch("aiohttp.ClientSession.put") +async def test_redirects_disabled_for_put_with_allowed_domains(mock_put): + """Test that redirects are disabled for PUT when allowed_domains is set.""" + mock_put.return_value.__aenter__.return_value.text.return_value = "OK" + mock_put.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allowed_domains={"example.com"}) + await plugin.put("https://example.com/path", body={"key": "value"}) + + _, kwargs = mock_put.call_args + assert kwargs["allow_redirects"] is False + + +@patch("aiohttp.ClientSession.delete") +async def test_redirects_disabled_for_delete_with_allowed_domains(mock_delete): + """Test that redirects are disabled for DELETE when allowed_domains is set.""" + mock_delete.return_value.__aenter__.return_value.text.return_value = "OK" + mock_delete.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allowed_domains={"example.com"}) + await plugin.delete("https://example.com/path") + + _, kwargs = mock_delete.call_args + assert kwargs["allow_redirects"] is False + + +@patch("aiohttp.ClientSession.get") +async def test_redirects_allowed_with_allow_all_domains(mock_get): + """Test that redirects are still allowed when allow_all_domains is True.""" + mock_get.return_value.__aenter__.return_value.text.return_value = "OK" + mock_get.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allow_all_domains=True) + await plugin.get("https://example.com/path") + + _, kwargs = mock_get.call_args + assert kwargs["allow_redirects"] is True + + +@patch("aiohttp.ClientSession.post") +async def test_redirects_allowed_for_post_with_allow_all_domains(mock_post): + """Test that redirects are allowed for POST when allow_all_domains is True.""" + mock_post.return_value.__aenter__.return_value.text.return_value = "OK" + mock_post.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allow_all_domains=True) + await plugin.post("https://example.com/path", body={"key": "value"}) + + _, kwargs = mock_post.call_args + assert kwargs["allow_redirects"] is True + + +@patch("aiohttp.ClientSession.put") +async def test_redirects_allowed_for_put_with_allow_all_domains(mock_put): + """Test that redirects are allowed for PUT when allow_all_domains is True.""" + mock_put.return_value.__aenter__.return_value.text.return_value = "OK" + mock_put.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allow_all_domains=True) + await plugin.put("https://example.com/path", body={"key": "value"}) + + _, kwargs = mock_put.call_args + assert kwargs["allow_redirects"] is True + + +@patch("aiohttp.ClientSession.delete") +async def test_redirects_allowed_for_delete_with_allow_all_domains(mock_delete): + """Test that redirects are allowed for DELETE when allow_all_domains is True.""" + mock_delete.return_value.__aenter__.return_value.text.return_value = "OK" + mock_delete.return_value.__aenter__.return_value.status = 200 + + plugin = HttpPlugin(allow_all_domains=True) + await plugin.delete("https://example.com/path") + + _, kwargs = mock_delete.call_args + assert kwargs["allow_redirects"] is True + + +@pytest.mark.parametrize("scheme", ["file", "ftp", "gopher", "data"]) +async def test_disallowed_schemes_blocked(scheme): + """Test that non-HTTP schemes are blocked.""" + plugin = HttpPlugin(allow_all_domains=True) + assert plugin._is_uri_allowed(f"{scheme}://example.com/path") is False + + +@pytest.mark.parametrize("scheme", ["file", "ftp", "gopher"]) +@pytest.mark.parametrize("method", ["get", "post", "put", "delete"]) +async def test_disallowed_schemes_blocked_all_methods(scheme, method): + """Test that non-HTTP schemes are blocked for all HTTP methods.""" + plugin = HttpPlugin(allow_all_domains=True) + with pytest.raises(FunctionExecutionException, match="Sending requests to the provided location is not allowed"): + if method in ["post", "put"]: + await getattr(plugin, method)(url=f"{scheme}://example.com/path", body={"key": "value"}) + else: + await getattr(plugin, method)(url=f"{scheme}://example.com/path") + + +async def test_http_scheme_allowed(): + """Test that both http and https schemes are allowed.""" + plugin = HttpPlugin(allow_all_domains=True) + assert plugin._is_uri_allowed("http://example.com/path") is True + assert plugin._is_uri_allowed("https://example.com/path") is True + + +async def test_empty_hostname_rejected(): + """Test that URLs with empty hostnames are rejected.""" + plugin = HttpPlugin(allow_all_domains=True) + assert plugin._is_uri_allowed("http://") is False + assert plugin._is_uri_allowed("https://") is False + + +async def test_allow_all_domains_with_allowed_domains_allows_redirects(): + """Test that redirects are allowed when both allow_all_domains and allowed_domains are set.""" + plugin = HttpPlugin(allowed_domains={"example.com"}, allow_all_domains=True) + assert plugin._is_uri_allowed("https://any-domain.com/path") is True