Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions diracx-core/src/diracx/core/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ConfigSourceUrl,
LocalGitConfigSource,
RemoteGitConfigSource,
ResourceStatusSource,
is_running_in_async_context,
)

Expand All @@ -17,5 +18,6 @@
"ConfigSourceUrl",
"LocalGitConfigSource",
"RemoteGitConfigSource",
"ResourceStatusSource",
"is_running_in_async_context",
)
91 changes: 89 additions & 2 deletions diracx-core/src/diracx/core/config/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,30 @@
from datetime import datetime, timezone
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Annotated, Generic, TypeVar
from typing import Annotated, Generic, Literal, TypeVar, Union
from urllib.parse import urlparse, urlunparse

import sh
import yaml
from cachetools import Cache, LRUCache
from pydantic import AnyUrl, BeforeValidator, TypeAdapter, UrlConstraints

from ..exceptions import BadConfigurationVersionError
from diracx.db.sql.rss.db import ResourceStatusDB
from diracx.logic.rss.query import (
get_compute_statuses,
get_fts_statuses,
get_site_statuses,
get_storage_statuses,
)

from ..exceptions import BadConfigurationVersionError, ResourceNotFoundError
from ..extensions import DiracEntryPoint, select_from_extension
from ..models.rss import (
ComputeElementStatus,
FTSStatus,
SiteStatus,
StorageElementStatus,
)
from ..utils import TwoLevelCache
from .schema import Config

Expand Down Expand Up @@ -304,3 +318,76 @@ def latest_revision(self) -> tuple[str, datetime]:
logger.exception(err)

return super().latest_revision()


class ResourceStatusSource(
CacheableSource[
dict[str, StorageElementStatus]
| dict[str, ComputeElementStatus]
| dict[str, SiteStatus]
| dict[str, FTSStatus]
]
):
"""A source that provides the status of a resource."""

def __init__(
self,
*,
resource_type: Literal["ComputeElement", "StorageElement", "Site", "FTS"],
vo: str = "all",
resource_status_db: ResourceStatusDB,
) -> None:
self.resource_type = resource_type
self.vo = vo
self.resource_status_db = resource_status_db
super().__init__()

def latest_revision(self) -> tuple[str, datetime]:
"""Return the latest revision of the resource status.

This could be a hash of the current status snapshot or the max DateEffective/LastCheckTime.
"""
# Fetch the resource status from the database
status_date = asyncio.run(self.resource_status_db.get_status_date(vo=self.vo))
# Generate a unique hash for the current status snapshot
status_hash = hash(frozenset(status_date))
latest_revision = f"rev_{status_hash}"

modified = status_date.DateEffective

return latest_revision, modified

def read_raw(
self, hexsha: str, modified: datetime
) -> Union[
dict[str, StorageElementStatus],
dict[str, ComputeElementStatus],
dict[str, SiteStatus],
dict[str, FTSStatus],
]:
"""Read the raw resource status from the database."""
# Fetch the resource status from the database
status_data = asyncio.run(self.get_status_data())
for status in status_data.values():
status._hexsha = hexsha
status._modified = modified
return status_data

async def get_status_data(self):
status_data = None
if self.resource_type == "Site":
status_data = await get_site_statuses(self.resource_status_db, vo=self.vo)
elif self.resource_type == "ComputeElement":
status_data = await get_compute_statuses(
self.resource_status_db, vo=self.vo
)
elif self.resource_type == "StorageElement":
status_data = await get_storage_statuses(
self.resource_status_db, vo=self.vo
)
elif self.resource_type == "FTS":
status_data = await get_fts_statuses(self.resource_status_db, vo=self.vo)

if not status_data:
raise ResourceNotFoundError(f"Resource type {self.resource_type} not found")
return status_data
20 changes: 15 additions & 5 deletions diracx-core/src/diracx/core/models/rss.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
from __future__ import annotations

from datetime import datetime
from enum import StrEnum
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, PrivateAttr


class CachedModel(BaseModel):
"""Base class for models that are cached."""

# hash for a unique representation of the status version
_hexsha: str = PrivateAttr()
# modification date
_modified: datetime = PrivateAttr()


class AllowedStatus(BaseModel):
Expand Down Expand Up @@ -34,22 +44,22 @@ class ResourceType(StrEnum):
FTS = "FTS"


class StorageElementStatus(BaseModel):
class StorageElementStatus(CachedModel):
read: ResourceStatus
write: ResourceStatus
check: ResourceStatus
remove: ResourceStatus


class ComputeElementStatus(BaseModel):
class ComputeElementStatus(CachedModel):
all: ResourceStatus


class FTSStatus(BaseModel):
class FTSStatus(CachedModel):
all: ResourceStatus


class SiteStatus(BaseModel):
class SiteStatus(CachedModel):
all: ResourceStatus


Expand Down
189 changes: 189 additions & 0 deletions diracx-core/tests/test_status_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
from __future__ import annotations

from collections import namedtuple
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock

import pytest

from diracx.core.config.sources import ResourceStatusSource
from diracx.core.exceptions import ResourceNotFoundError
from diracx.core.models.rss import (
ComputeElementStatus,
FTSStatus,
SiteStatus,
StorageElementStatus,
)
from diracx.db.sql.rss.db import ResourceStatusDB


@pytest.fixture
def mock_resource_status_db():
"""Fixture to mock the ResourceStatusDB."""
db = MagicMock(spec=ResourceStatusDB)
DateRow = namedtuple("DateRow", ["DateEffective", "DateChecked"])
db.get_status_date = AsyncMock(
return_value=DateRow(
DateEffective=datetime.fromisoformat("2023-01-01T00:00:00+00:00"),
DateChecked=datetime.now(timezone.utc),
)
)
return db


def test_latest_revision(mock_resource_status_db):
"""Test the latest_revision method of ResourceStatusSource."""
source = ResourceStatusSource(
resource_type="Site",
vo="test_vo",
resource_status_db=mock_resource_status_db,
)

# Call the method
revision, modified = source.latest_revision()

# Verify the revision is generated correctly
assert revision.startswith("rev_")
assert isinstance(modified, datetime)

# Verify the database call
mock_resource_status_db.get_status_date.assert_called_once_with(vo="test_vo")


def test_read_raw_site(mock_resource_status_db):
"""Test the read_raw method for Site resource type."""
# Mock the database data
mock_db_data = [("testSite", "Active", "")]

# Patch the get_site_statuses method of the database to return the mock data
mock_resource_status_db.get_site_statuses = AsyncMock(return_value=mock_db_data)

# Initialize the ResourceStatusSource with the mocked database
source = ResourceStatusSource(
resource_type="Site",
vo="test_vo",
resource_status_db=mock_resource_status_db,
)

# Call the read_raw method, which internally calls get_site_statuses from query.py
result = source.read_raw("test_revision", datetime.now(tz=timezone.utc))

# Verify the result matches the expected output
expected_result = {"testSite": SiteStatus(all={"allowed": True, "warnings": None})}
for key, value in expected_result.items():
assert key in result
assert value.model_dump() == result[key].model_dump()
# Verify that the database method was called correctly
mock_resource_status_db.get_site_statuses.assert_awaited_once_with("test_vo")


def test_read_raw_compute(mock_resource_status_db):
"""Test the read_raw method for ComputeElement resource type."""
ResourceStatus = namedtuple("ResourceStatus", ["Name", "Status", "Reason"])

mock_db_data = {
"TestCE": {"all": ResourceStatus(Name="TestCE", Status="Active", Reason="")}
}
mock_resource_status_db.get_resource_statuses = AsyncMock(return_value=mock_db_data)

source = ResourceStatusSource(
resource_type="ComputeElement",
vo="test_vo",
resource_status_db=mock_resource_status_db,
)

# Call the method
result = source.read_raw("test_revision", datetime.now(tz=timezone.utc))

# Verify the result
expected_result = {
"TestCE": ComputeElementStatus(all={"allowed": True, "warnings": None})
}
for key, value in expected_result.items():
assert key in result
assert value.model_dump() == result[key].model_dump()
mock_resource_status_db.get_resource_statuses.assert_awaited_once_with(
["all"], "test_vo"
)


def test_read_raw_storage(mock_resource_status_db):
"""Test the read_raw method for StorageElement resource type."""
ResourceStatus = namedtuple("ResourceStatus", ["Name", "Status", "Reason"])

mock_db_data = {
"TestSE": {
"ReadAccess": ResourceStatus(Name="TestSE", Status="Active", Reason=None),
"WriteAccess": ResourceStatus(Name="TestSE", Status="Active", Reason=None),
"CheckAccess": ResourceStatus(Name="TestSE", Status="Active", Reason=None),
"RemoveAccess": ResourceStatus(Name="TestSE", Status="Active", Reason=None),
}
}
mock_resource_status_db.get_resource_statuses.return_value = mock_db_data
source = ResourceStatusSource(
resource_type="StorageElement",
vo="test_vo",
resource_status_db=mock_resource_status_db,
)

# Call the method
result = source.read_raw("test_revision", datetime.now(tz=timezone.utc))

# Verify the result
expected_result = {
"TestSE": StorageElementStatus(
read={"allowed": True, "warnings": None},
write={"allowed": True, "warnings": None},
check={"allowed": True, "warnings": None},
remove={"allowed": True, "warnings": None},
)
}
for key, value in expected_result.items():
assert key in result
assert value.model_dump() == result[key].model_dump()
mock_resource_status_db.get_resource_statuses.assert_awaited_once_with(
["ReadAccess", "WriteAccess", "CheckAccess", "RemoveAccess"], "test_vo"
)


def test_read_raw_fts(mock_resource_status_db):
"""Test the read_raw method for FTS resource type."""
ResourceStatus = namedtuple("ResourceStatus", ["Name", "Status", "Reason"])

mock_db_data = {
"FTS": {
"all": ResourceStatus(Name="FTS", Status="Active", Reason=None),
}
}
mock_resource_status_db.get_resource_statuses.return_value = mock_db_data

source = ResourceStatusSource(
resource_type="FTS",
vo="test_vo",
resource_status_db=mock_resource_status_db,
)

# Call the method
result = source.read_raw("test_revision", datetime.now(tz=timezone.utc))

# Verify the result
expected_result = {"FTS": FTSStatus(all={"allowed": True, "warnings": None})}
for key, value in expected_result.items():
assert key in result
assert value.model_dump() == result[key].model_dump()
mock_resource_status_db.get_resource_statuses.assert_awaited_once_with(
["all"], "test_vo"
)


def test_read_raw_invalid_resource_type(mock_resource_status_db):
"""Test the read_raw method for an invalid resource type."""
source = ResourceStatusSource(
resource_type="InvalidType",
vo="test_vo",
resource_status_db=mock_resource_status_db,
)

# Call the method and verify it raises ResourceNotFoundError
with pytest.raises(ResourceNotFoundError):
source.read_raw("test_revision", datetime.now(tz=timezone.utc))
Loading
Loading