diff --git a/openml/_api/__init__.py b/openml/_api/__init__.py new file mode 100644 index 000000000..881f40671 --- /dev/null +++ b/openml/_api/__init__.py @@ -0,0 +1,8 @@ +from openml._api.runtime.core import APIContext + + +def set_api_version(version: str, *, strict: bool = False) -> None: + api_context.set_version(version=version, strict=strict) + + +api_context = APIContext() diff --git a/openml/_api/config.py b/openml/_api/config.py new file mode 100644 index 000000000..848fe8da1 --- /dev/null +++ b/openml/_api/config.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Literal + +DelayMethod = Literal["human", "robot"] + + +@dataclass +class APIConfig: + server: str + base_url: str + key: str + timeout: int = 10 # seconds + + +@dataclass +class APISettings: + v1: APIConfig + v2: APIConfig + + +@dataclass +class ConnectionConfig: + retries: int = 3 + delay_method: DelayMethod = "human" + delay_time: int = 1 # seconds + + def __post_init__(self) -> None: + if self.delay_method not in ("human", "robot"): + raise ValueError(f"delay_method must be 'human' or 'robot', got {self.delay_method}") + + +@dataclass +class CacheConfig: + dir: str = "~/.openml/cache" + ttl: int = 60 * 60 * 24 * 7 # one week + + +@dataclass +class Settings: + api: APISettings + connection: ConnectionConfig + cache: CacheConfig + + +settings = Settings( + api=APISettings( + v1=APIConfig( + server="https://www.openml.org/", + base_url="api/v1/xml/", + key="...", + ), + v2=APIConfig( + server="http://127.0.0.1:8001/", + base_url="", + key="...", + ), + ), + connection=ConnectionConfig(), + cache=CacheConfig(), +) diff --git a/openml/_api/http/__init__.py b/openml/_api/http/__init__.py new file mode 100644 index 000000000..8e6d1e4ce --- /dev/null +++ b/openml/_api/http/__init__.py @@ -0,0 +1,3 @@ +from openml._api.http.client import HTTPClient + +__all__ = ["HTTPClient"] diff --git a/openml/_api/http/client.py 
b/openml/_api/http/client.py new file mode 100644 index 000000000..a90e93933 --- /dev/null +++ b/openml/_api/http/client.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Any +from urllib.parse import urlencode, urljoin, urlparse + +import requests +from requests import Response + +from openml.__version__ import __version__ +from openml._api.config import settings + +if TYPE_CHECKING: + from openml._api.config import APIConfig + + +class CacheMixin: + @property + def dir(self) -> str: + return settings.cache.dir + + @property + def ttl(self) -> int: + return settings.cache.ttl + + def _get_cache_dir(self, url: str, params: dict[str, Any]) -> Path: + parsed_url = urlparse(url) + netloc_parts = parsed_url.netloc.split(".")[::-1] # reverse domain + path_parts = parsed_url.path.strip("/").split("/") + + # remove api_key and serialize params if any + filtered_params = {k: v for k, v in params.items() if k != "api_key"} + params_part = [urlencode(filtered_params)] if filtered_params else [] + + return Path(self.dir).joinpath(*netloc_parts, *path_parts, *params_part) + + def _get_cache_response(self, cache_dir: Path) -> Response: # noqa: ARG002 + return Response() + + def _set_cache_response(self, cache_dir: Path, response: Response) -> None: # noqa: ARG002 + return None + + +class HTTPClient(CacheMixin): + def __init__(self, config: APIConfig) -> None: + self.config = config + self.headers: dict[str, str] = {"user-agent": f"openml-python/{__version__}"} + + @property + def server(self) -> str: + return self.config.server + + @property + def base_url(self) -> str: + return self.config.base_url + + @property + def key(self) -> str: + return self.config.key + + @property + def timeout(self) -> int: + return self.config.timeout + + def request( + self, + method: str, + path: str, + *, + use_cache: bool = False, + use_api_key: bool = False, + **request_kwargs: Any, + ) -> Response: + url = 
urljoin(self.server, urljoin(self.base_url, path)) + + params = request_kwargs.pop("params", {}) + params = params.copy() + if use_api_key: + params["api_key"] = self.key + + headers = request_kwargs.pop("headers", {}) + headers = headers.copy() + headers.update(self.headers) + + timeout = request_kwargs.pop("timeout", self.timeout) + cache_dir = self._get_cache_dir(url, params) + + if use_cache: + try: + return self._get_cache_response(cache_dir) + # TODO: handle ttl expired error + except Exception: + raise + + response = requests.request( + method=method, + url=url, + params=params, + headers=headers, + timeout=timeout, + **request_kwargs, + ) + + if use_cache: + self._set_cache_response(cache_dir, response) + + return response + + def get( + self, + path: str, + *, + use_cache: bool = False, + use_api_key: bool = False, + **request_kwargs: Any, + ) -> Response: + # TODO: remove override when cache is implemented + use_cache = False + return self.request( + method="GET", + path=path, + use_cache=use_cache, + use_api_key=use_api_key, + **request_kwargs, + ) + + def post( + self, + path: str, + **request_kwargs: Any, + ) -> Response: + return self.request( + method="POST", + path=path, + use_cache=False, + use_api_key=True, + **request_kwargs, + ) + + def delete( + self, + path: str, + **request_kwargs: Any, + ) -> Response: + return self.request( + method="DELETE", + path=path, + use_cache=False, + use_api_key=True, + **request_kwargs, + ) diff --git a/openml/_api/http/utils.py b/openml/_api/http/utils.py new file mode 100644 index 000000000..e69de29bb diff --git a/openml/_api/resources/__init__.py b/openml/_api/resources/__init__.py new file mode 100644 index 000000000..931a5df4b --- /dev/null +++ b/openml/_api/resources/__init__.py @@ -0,0 +1,5 @@ +from openml._api.resources.datasets import DatasetsV1, DatasetsV2 +from openml._api.resources.runs import RunsV1, RunsV2 +from openml._api.resources.tasks import TasksV1, TasksV2 + +__all__ = ["DatasetsV1", 
"DatasetsV2", "RunsV1", "RunsV2", "TasksV1", "TasksV2"] diff --git a/openml/_api/resources/base.py b/openml/_api/resources/base.py new file mode 100644 index 000000000..9f7aa95d9 --- /dev/null +++ b/openml/_api/resources/base.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import pandas as pd + from requests import Response + + from openml._api.http import HTTPClient + from openml.datasets.dataset import OpenMLDataset + from openml.runs.run import OpenMLRun + from openml.tasks.task import OpenMLTask, TaskType + + +class ResourceAPI: + def __init__(self, http: HTTPClient): + self._http = http + + +class DatasetsAPI(ResourceAPI, ABC): + @abstractmethod + def get(self, dataset_id: int) -> OpenMLDataset | tuple[OpenMLDataset, Response]: ... + + +class TasksAPI(ResourceAPI, ABC): + @abstractmethod + def get( + self, + task_id: int, + *, + return_response: bool = False, + ) -> OpenMLTask | tuple[OpenMLTask, Response]: ... + + +class RunsAPI(ResourceAPI, ABC): + @abstractmethod + def get(self, run_id: int) -> OpenMLRun: ... + + @abstractmethod + def list( # noqa: PLR0913 + self, + limit: int, + offset: int, + *, + ids: list | None = None, + task: list | None = None, + setup: list | None = None, + flow: list | None = None, + uploader: list | None = None, + study: int | None = None, + tag: str | None = None, + display_errors: bool = False, + task_type: TaskType | int | None = None, + ) -> pd.DataFrame: ... + + @abstractmethod + def delete(self, run_id: int) -> bool: ... + + @abstractmethod + def create(self, run: OpenMLRun) -> OpenMLRun: ... 
diff --git a/openml/_api/resources/datasets.py b/openml/_api/resources/datasets.py new file mode 100644 index 000000000..9ff1ec278 --- /dev/null +++ b/openml/_api/resources/datasets.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from openml._api.resources.base import DatasetsAPI + +if TYPE_CHECKING: + from requests import Response + + from openml.datasets.dataset import OpenMLDataset + + +class DatasetsV1(DatasetsAPI): + def get(self, dataset_id: int) -> OpenMLDataset | tuple[OpenMLDataset, Response]: + raise NotImplementedError + + +class DatasetsV2(DatasetsAPI): + def get(self, dataset_id: int) -> OpenMLDataset | tuple[OpenMLDataset, Response]: + raise NotImplementedError diff --git a/openml/_api/resources/runs.py b/openml/_api/resources/runs.py new file mode 100644 index 000000000..28f65df34 --- /dev/null +++ b/openml/_api/resources/runs.py @@ -0,0 +1,288 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pandas as pd +import xmltodict + +import openml +from openml._api.resources.base import RunsAPI +from openml.tasks.task import TaskType + +if TYPE_CHECKING: + from openml.runs.run import OpenMLRun + + +class RunsV1(RunsAPI): + def get(self, run_id: int) -> OpenMLRun: + """Fetch a single run from the OpenML server. + + Parameters + ---------- + run_id : int + The ID of the run to fetch. + + Returns + ------- + OpenMLRun + The run object with all details populated. + + Raises + ------ + openml.exceptions.OpenMLServerException + If the run does not exist or server error occurs. 
+ """ + path = f"run/{run_id}" + response = self._http.get(path) + xml_content = response.text + return openml.runs.functions._create_run_from_xml(xml_content) + + def list( # noqa: PLR0913, C901, PLR0912 + self, + limit: int, + offset: int, + *, + ids: list | None = None, + task: list | None = None, + setup: list | None = None, + flow: list | None = None, + uploader: list | None = None, + study: int | None = None, + tag: str | None = None, + display_errors: bool = False, + task_type: TaskType | int | None = None, + ) -> pd.DataFrame: + """List runs from the OpenML server with optional filtering. + + Parameters + ---------- + limit : int + Maximum number of runs to return. + offset : int + Starting position for pagination. + ids : list of int, optional + List of run IDs to filter by. + task : list of int, optional + List of task IDs to filter by. + setup : list of int, optional + List of setup IDs to filter by. + flow : list of int, optional + List of flow IDs to filter by. + uploader : list of int, optional + List of uploader user IDs to filter by. + study : int, optional + Study ID to filter by. + tag : str, optional + Tag to filter by. + display_errors : bool, default=False + If True, include runs with error messages. + task_type : TaskType or int, optional + Task type ID to filter by. + + Returns + ------- + pd.DataFrame + DataFrame with columns: run_id, task_id, setup_id, flow_id, + uploader, task_type, upload_time, error_message. + + Raises + ------ + ValueError + If the server response is invalid or malformed. 
+ """ + path = "run/list" + if limit is not None: + path += f"/limit/{limit}" + if offset is not None: + path += f"/offset/{offset}" + if ids is not None: + path += f"/run/{','.join([str(int(i)) for i in ids])}" + if task is not None: + path += f"/task/{','.join([str(int(i)) for i in task])}" + if setup is not None: + path += f"/setup/{','.join([str(int(i)) for i in setup])}" + if flow is not None: + path += f"/flow/{','.join([str(int(i)) for i in flow])}" + if uploader is not None: + path += f"/uploader/{','.join([str(int(i)) for i in uploader])}" + if study is not None: + path += f"/study/{study}" + if display_errors: + path += "/show_errors/true" + if tag is not None: + path += f"/tag/{tag}" + if task_type is not None: + tvalue = task_type.value if isinstance(task_type, TaskType) else task_type + path += f"/task_type/{tvalue}" + + xml_string = self._http.get(path).text + runs_dict = xmltodict.parse(xml_string, force_list=("oml:run",)) + # Minimalistic check if the XML is useful + if "oml:runs" not in runs_dict: + raise ValueError(f'Error in return XML, does not contain "oml:runs": {runs_dict}') + + if "@xmlns:oml" not in runs_dict["oml:runs"]: + raise ValueError( + f'Error in return XML, does not contain "oml:runs"/@xmlns:oml: {runs_dict}' + ) + + if runs_dict["oml:runs"]["@xmlns:oml"] != "http://openml.org/openml": + raise ValueError( + "Error in return XML, value of " + '"oml:runs"/@xmlns:oml is not ' + f'"http://openml.org/openml": {runs_dict}', + ) + + assert isinstance(runs_dict["oml:runs"]["oml:run"], list), type(runs_dict["oml:runs"]) + + runs = { + int(r["oml:run_id"]): { + "run_id": int(r["oml:run_id"]), + "task_id": int(r["oml:task_id"]), + "setup_id": int(r["oml:setup_id"]), + "flow_id": int(r["oml:flow_id"]), + "uploader": int(r["oml:uploader"]), + "task_type": TaskType(int(r["oml:task_type_id"])), + "upload_time": str(r["oml:upload_time"]), + "error_message": str((r["oml:error_message"]) or ""), + } + for r in runs_dict["oml:runs"]["oml:run"] + } + 
return pd.DataFrame.from_dict(runs, orient="index") + + def delete(self, run_id: int) -> bool: + """Delete a run from the OpenML server. + + Parameters + ---------- + run_id : int + The ID of the run to delete. + + Returns + ------- + bool + True if deletion was successful, False otherwise. + + Raises + ------ + openml.exceptions.OpenMLServerException + If the run does not exist or user lacks permissions. + + Notes + ----- + Only the uploader or server administrators can delete a run. + """ + path = f"run/{run_id}" + response = self._http.delete(path) + # Parse XML response to check if deletion was successful + xml_response = xmltodict.parse(response.text) + return "oml:run_delete" in xml_response + + def create(self, run: OpenMLRun) -> OpenMLRun: + """Create (publish) a run on the OpenML server. + + Parameters + ---------- + run : OpenMLRun + The run object to publish. + + Returns + ------- + OpenMLRun + The published run with run_id assigned. + """ + # TODO: Implement V1 multipart upload + # 1. Ensure flow is published + # 2. Get file elements (description.xml, predictions.arff, trace.arff) + # 3. POST multipart to /run/ + # 4. Parse XML response and set run_id + raise NotImplementedError("RunsV1.create() is not implemented yet.") + + +class RunsV2(RunsAPI): + """V2 API resource for runs. Currently read-only until V2 server supports POST.""" + + def get(self, run_id: int) -> OpenMLRun: + """Fetch a single run from the V2 server. + + Parameters + ---------- + run_id : int + The ID of the run to fetch. + + Returns + ------- + OpenMLRun + The run object. + + Raises + ------ + NotImplementedError + V2 server API not yet available for this operation. 
+ """ + raise NotImplementedError("RunsV2.get is not implemented yet.") + + def list( # noqa: PLR0913 + self, + limit: int, + offset: int, + *, + ids: list | None = None, + task: list | None = None, + setup: list | None = None, + flow: list | None = None, + uploader: list | None = None, + study: int | None = None, + tag: str | None = None, + display_errors: bool = False, + task_type: TaskType | int | None = None, + ) -> pd.DataFrame: + """List runs from the V2 server. + + Raises + ------ + NotImplementedError + V2 server API not yet available for this operation. + """ + raise NotImplementedError("RunsV2.list is not implemented yet.") + + def delete(self, run_id: int) -> bool: + """Delete a run from the V2 server. + + Parameters + ---------- + run_id : int + The ID of the run to delete. + + Returns + ------- + bool + True if deletion was successful, False otherwise. + + Raises + ------ + NotImplementedError + V2 server API not yet available for this operation. + """ + raise NotImplementedError("RunsV2.delete is not implemented yet.") + + def create(self, run: OpenMLRun) -> OpenMLRun: + """Create (publish) a run on the V2 server. + + Parameters + ---------- + run : OpenMLRun + The run object to publish. + + Returns + ------- + OpenMLRun + The published run with run_id assigned. + + Raises + ------ + NotImplementedError + V2 server does not yet support POST /runs/ endpoint. 
+ Expected availability: Q2 2025 + """ + raise NotImplementedError("RunsV2.create is not implemented yet.") diff --git a/openml/_api/resources/tasks.py b/openml/_api/resources/tasks.py new file mode 100644 index 000000000..f494fb9a3 --- /dev/null +++ b/openml/_api/resources/tasks.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import xmltodict + +from openml._api.resources.base import TasksAPI +from openml.tasks.task import ( + OpenMLClassificationTask, + OpenMLClusteringTask, + OpenMLLearningCurveTask, + OpenMLRegressionTask, + OpenMLTask, + TaskType, +) + +if TYPE_CHECKING: + from requests import Response + + +class TasksV1(TasksAPI): + def get( + self, + task_id: int, + *, + return_response: bool = False, + ) -> OpenMLTask | tuple[OpenMLTask, Response]: + path = f"task/{task_id}" + response = self._http.get(path) + xml_content = response.text + task = self._create_task_from_xml(xml_content) + + if return_response: + return task, response + + return task + + def _create_task_from_xml(self, xml: str) -> OpenMLTask: + """Create a task given a xml string. + + Parameters + ---------- + xml : string + Task xml representation. 
+ + Returns + ------- + OpenMLTask + """ + dic = xmltodict.parse(xml)["oml:task"] + estimation_parameters = {} + inputs = {} + # Due to the unordered structure we obtain, we first have to extract + # the possible keys of oml:input; dic["oml:input"] is a list of + # OrderedDicts + + # Check if there is a list of inputs + if isinstance(dic["oml:input"], list): + for input_ in dic["oml:input"]: + name = input_["@name"] + inputs[name] = input_ + # Single input case + elif isinstance(dic["oml:input"], dict): + name = dic["oml:input"]["@name"] + inputs[name] = dic["oml:input"] + + evaluation_measures = None + if "evaluation_measures" in inputs: + evaluation_measures = inputs["evaluation_measures"]["oml:evaluation_measures"][ + "oml:evaluation_measure" + ] + + task_type = TaskType(int(dic["oml:task_type_id"])) + common_kwargs = { + "task_id": dic["oml:task_id"], + "task_type": dic["oml:task_type"], + "task_type_id": task_type, + "data_set_id": inputs["source_data"]["oml:data_set"]["oml:data_set_id"], + "evaluation_measure": evaluation_measures, + } + # TODO: add OpenMLClusteringTask? 
+ if task_type in ( + TaskType.SUPERVISED_CLASSIFICATION, + TaskType.SUPERVISED_REGRESSION, + TaskType.LEARNING_CURVE, + ): + # Convert some more parameters + for parameter in inputs["estimation_procedure"]["oml:estimation_procedure"][ + "oml:parameter" + ]: + name = parameter["@name"] + text = parameter.get("#text", "") + estimation_parameters[name] = text + + common_kwargs["estimation_procedure_type"] = inputs["estimation_procedure"][ + "oml:estimation_procedure" + ]["oml:type"] + common_kwargs["estimation_procedure_id"] = int( + inputs["estimation_procedure"]["oml:estimation_procedure"]["oml:id"] + ) + + common_kwargs["estimation_parameters"] = estimation_parameters + common_kwargs["target_name"] = inputs["source_data"]["oml:data_set"][ + "oml:target_feature" + ] + common_kwargs["data_splits_url"] = inputs["estimation_procedure"][ + "oml:estimation_procedure" + ]["oml:data_splits_url"] + + cls = { + TaskType.SUPERVISED_CLASSIFICATION: OpenMLClassificationTask, + TaskType.SUPERVISED_REGRESSION: OpenMLRegressionTask, + TaskType.CLUSTERING: OpenMLClusteringTask, + TaskType.LEARNING_CURVE: OpenMLLearningCurveTask, + }.get(task_type) + if cls is None: + raise NotImplementedError(f"Task type {common_kwargs['task_type']} not supported.") + return cls(**common_kwargs) # type: ignore + + +class TasksV2(TasksAPI): + def get( + self, + task_id: int, + *, + return_response: bool = False, + ) -> OpenMLTask | tuple[OpenMLTask, Response]: + raise NotImplementedError diff --git a/openml/_api/runtime/__init__.py b/openml/_api/runtime/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openml/_api/runtime/core.py b/openml/_api/runtime/core.py new file mode 100644 index 000000000..82b96a4b2 --- /dev/null +++ b/openml/_api/runtime/core.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from openml._api.config import settings +from openml._api.http.client import HTTPClient +from openml._api.resources import ( + 
DatasetsV1, + DatasetsV2, + RunsV1, + RunsV2, + TasksV1, + TasksV2, +) + +if TYPE_CHECKING: + from openml._api.resources.base import DatasetsAPI, RunsAPI, TasksAPI + + +class APIBackend: + def __init__(self, *, datasets: DatasetsAPI, tasks: TasksAPI, runs: RunsAPI): + self.datasets = datasets + self.tasks = tasks + self.runs = runs + + +def build_backend(version: str, *, strict: bool) -> APIBackend: + v1_http = HTTPClient(config=settings.api.v1) + v2_http = HTTPClient(config=settings.api.v2) + + v1 = APIBackend( + datasets=DatasetsV1(v1_http), + tasks=TasksV1(v1_http), + runs=RunsV1(v1_http), + ) + + if version == "v1": + return v1 + + v2 = APIBackend( + datasets=DatasetsV2(v2_http), + tasks=TasksV2(v2_http), + runs=RunsV2(v2_http), + ) + + if strict: + return v2 + + return v1 + + +class APIContext: + def __init__(self) -> None: + self._backend = build_backend("v1", strict=False) + + def set_version(self, version: str, *, strict: bool = False) -> None: + self._backend = build_backend(version=version, strict=strict) + + @property + def backend(self) -> APIBackend: + return self._backend diff --git a/openml/_api/runtime/fallback.py b/openml/_api/runtime/fallback.py new file mode 100644 index 000000000..1bc99d270 --- /dev/null +++ b/openml/_api/runtime/fallback.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from openml._api.resources.base import ResourceAPI + + +class FallbackProxy: + def __init__(self, primary: ResourceAPI, fallback: ResourceAPI): + self._primary = primary + self._fallback = fallback diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 503788dbd..ab91af699 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -19,6 +19,7 @@ import openml._api_calls import openml.utils from openml import config +from openml._api import api_context from openml.exceptions import ( OpenMLCacheException, OpenMLRunsExistError, @@ -822,7 +823,7 @@ def get_run(run_id: int, 
ignore_cache: bool = False) -> OpenMLRun: # noqa: FBT0 Run corresponding to ID, fetched from the server. """ run_dir = Path(openml.utils._create_cache_directory_for_id(RUNS_CACHE_DIR_NAME, run_id)) - run_file = run_dir / "description.xml" run_dir.mkdir(parents=True, exist_ok=True) @@ -833,11 +834,7 @@ def get_run(run_id: int, ignore_cache: bool = False) -> OpenMLRun: # noqa: FBT0 raise OpenMLCacheException(message="dummy") except OpenMLCacheException: - run_xml = openml._api_calls._perform_api_call(f"run/{run_id}", "get") - with run_file.open("w", encoding="utf8") as fh: - fh.write(run_xml) - - return _create_run_from_xml(run_xml) + return api_context.backend.runs.get(run_id) def _create_run_from_xml(xml: str, from_server: bool = True) -> OpenMLRun: # noqa: PLR0915, PLR0912, C901, FBT002 @@ -1098,8 +1095,8 @@ def list_runs( # noqa: PLR0913 raise TypeError("uploader must be of type list.") listing_call = partial( - _list_runs, - id=id, + api_context.backend.runs.list, + ids=id, task=task, setup=setup, flow=flow,