Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
## Bug Fixes

- Improved formula validation: Consistent error messages for invalid formulas and conventional span semantics.

- This fixes a rare power distributor bug where some battery inverters becoming unreachable because of network outages would lead to excess power values getting set. This is fixed by measuring the power of the unreachable inverters through their fallback meters and excluding that power from what is distributed to the other inverters.
- Better data stream subscription handling in Coalesce function: Only unsubscribe from stream after other streams have received 3 samples.
39 changes: 30 additions & 9 deletions src/frequenz/sdk/timeseries/formulas/_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ class Coalesce(Function[QuantityT]):
"""A function that returns the first non-None argument."""

num_subscribed: int = 0
"""Number of parameters currently subscribed to."""

num_samples: int = 0
"""Number of samples received since last subscription change.

This only counts samples from parameters other than the last one,
and may indicate that the last parameter can be unsubscribed from.
"""

@property
@override
Expand All @@ -122,15 +130,15 @@ async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
match arg:
case Sample(timestamp, value):
if value is not None:
# Found a non-None value, unsubscribe from subsequent params
# Found a non-None value
if ctr < self.num_subscribed:
await self._unsubscribe_all_params_after(ctr)
await self._count_sample()
return arg
ts = timestamp
case Quantity():
# Found a non-None value, unsubscribe from subsequent params
if ctr < self.num_subscribed:
await self._unsubscribe_all_params_after(ctr)
await self._unsubscribe_last_param()
if ts is not None:
return Sample(timestamp=ts, value=arg)
return arg
Expand All @@ -156,6 +164,17 @@ async def subscribe(self) -> None:
if self.num_subscribed == 0:
await self._subscribe_next_param()

async def _count_sample(self) -> None:
"""Count samples from parameters other than the last.

If a reasonable number of samples is reached, unsubscribe
from the last parameter, whose samples were not counted.
"""
self.num_samples += 1

if self.num_samples >= 3:
await self._unsubscribe_last_param()

async def _subscribe_next_param(self) -> None:
"""Subscribe to the next parameter."""
if self.num_subscribed < len(self.params):
Expand All @@ -166,16 +185,18 @@ async def _subscribe_next_param(self) -> None:
)
await self.params[self.num_subscribed].subscribe()
self.num_subscribed += 1
self.num_samples = 0

async def _unsubscribe_all_params_after(self, index: int) -> None:
"""Unsubscribe from parameters after the given index."""
for param in self.params[index:]:
async def _unsubscribe_last_param(self) -> None:
"""Unsubscribe from the last parameter."""
if self.num_subscribed > 1:
_logger.debug(
"Coalesce unsubscribing from param: %s",
param,
self.num_subscribed,
)
await param.unsubscribe()
self.num_subscribed = index
await self.params[self.num_subscribed - 1].unsubscribe()
self.num_subscribed -= 1
self.num_samples = 0


@dataclass
Expand Down
186 changes: 186 additions & 0 deletions tests/timeseries/_formulas/test_formula_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2026


"""Tests for the Formula implementation."""

from unittest.mock import Mock

import pytest
from frequenz.quantities import Quantity

from frequenz.sdk.timeseries.formulas._exceptions import FormulaSyntaxError
from frequenz.sdk.timeseries.formulas._parser import parse


class TestFormulaValidation:
"""Tests for Formula validation."""

@pytest.mark.parametrize(
("formula_str", "parsed_formula_str"),
[
("#1", "[f](#1)"),
("-(1+#1)", "[f](0.0 - (1.0 + #1))"),
("1*(2+3)", "[f](1.0 * (2.0 + 3.0))"),
],
)
async def test_parser_validation(
self,
formula_str: str,
parsed_formula_str: str,
) -> None:
"""Test formula parser validation."""
try:
formula = parse(
name="f",
formula=formula_str,
create_method=Quantity,
telemetry_fetcher=Mock(),
)
assert str(formula) == parsed_formula_str
except FormulaSyntaxError:
assert False, "Parser should not raise an error for this formula"

@pytest.mark.parametrize(
("formula_str", "expected_error_line"),
[
(
"1++",
" ^ Expected expression",
),
(
"1**",
" ^ Expected expression",
),
(
"--1",
" ^ Expected expression",
),
(
"(",
" ^ Expected expression",
),
(
"(1",
"^ Unmatched parenthesis",
),
(
"max",
" ^ Expected '(' after function name",
),
(
"max()",
" ^ Expected argument",
),
(
"max(1(",
" ^ Expected ',' or ')'",
),
(
"max(1",
" ^ Unmatched parenthesis",
),
(
"foo",
"^^^ Unknown function name",
),
(
"foo(1)",
"^^^ Unknown function name",
),
(
"max(1,,2)",
" ^ Expected argument",
),
(
"1 2",
" ^ Unexpected token",
),
(
"1, 2",
" ^ Unexpected token",
),
(
"max(1, 2,)",
" ^ Expected argument",
),
(
"max(1, 2))",
" ^ Unexpected token",
),
(
"max(1, 2),",
" ^ Unexpected token",
),
],
)
async def test_parser_validation_errors(
self, formula_str: str, expected_error_line: str
) -> None:
"""Test formula parser validation."""
with pytest.raises(FormulaSyntaxError) as error:
_ = parse(
name="f",
formula=formula_str,
create_method=Quantity,
telemetry_fetcher=Mock(),
)

assert str(error.value) == (
"Formula syntax error:\n"
f" Formula: {formula_str}\n"
f" {expected_error_line}"
)

@pytest.mark.parametrize(
("formula_str", "expected_error"),
[
# Long formula with error near start -> Ellipsize end
(
"max(coalesce(#1001, %1002, 0), coalesce(#1003, #1004, 0), coalesce(#1005, #1006, 0), coalesce(#1007, #1008, 0))", # noqa: E501
"Formula syntax error:\n"
" Formula: max(coalesce(#1001, %1002, 0), coalesce(#1003, #1004, 0), coalesc ...\n"
" ^ Unexpected character",
),
# Long formula with error near the end -> Ellipsize start
(
"max(coalesce(#1001, #1002, 0), coalesce(#1003, #1004, 0), coalesce(#1005, #1006, 0), coalesce(#10.07, #1008, 0))", # noqa: E501
"Formula syntax error:\n"
" Formula: ... 0), coalesce(#1005, #1006, 0), coalesce(#10.07, #1008, 0))\n"
" ^ Unexpected character",
),
# Very long formula with error in the middle -> Ellipsize both sides
(
"max(coalesce(#1001, #1002, 0), coalesce(#1003, #1004, 0), coalesce(#1005, #1006, 0), coalesce(#1007, #1008, 0)) :) " # noqa: E501
"min(coalesce(#2001, #2002, 0), coalesce(#2003, #2004, 0), coalesce(#2005, #2006, 0), coalesce(#2007, #2008, 0))", # noqa: E501
"Formula syntax error:\n"
" Formula: ... 005, #1006, 0), coalesce(#1007, #1008, 0)) :) min(coalesce(#2 ...\n"
" ^ Unexpected character",
),
],
)
async def test_parser_validation_errors_in_long_formulas(
self, formula_str: str, expected_error: str
) -> None:
"""Test formula parser validation for long formulas."""
with pytest.raises(FormulaSyntaxError) as error:
_ = parse(
name="f",
formula=formula_str,
create_method=Quantity,
telemetry_fetcher=Mock(),
)

assert str(error.value) == expected_error
assert all(len(line) <= 80 for line in str(error.value).splitlines())

async def test_empty_formula(self) -> None:
"""Test formula parser validation."""
with pytest.raises(FormulaSyntaxError) as error:
_ = parse(
name="f",
formula="",
create_method=Quantity,
telemetry_fetcher=Mock(),
)

assert str(error.value) == "Empty formula"
Loading
Loading