Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,12 @@ secops parser list --log-type "OKTA" --page-size 50 --filter "state=ACTIVE"
secops parser get --log-type "WINDOWS" --id "pa_12345"
```

#### Fetch parser candidates:

```bash
secops parser fetch-candidates --log-type "WINDOWS_DHCP" --parser-action "PARSER_ACTION_OPT_IN_TO_PREVIEW"
```

#### Create a new parser:

```bash
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,9 @@ print(f"Parser content: {parser.get('text')}")
chronicle.activate_parser(log_type=log_type, id=parser_id)
chronicle.deactivate_parser(log_type=log_type, id=parser_id)

# Fetch parser candidates (unactivated prebuilt parsers)
candidates = chronicle.fetch_parser_candidates(log_type=log_type, parser_action="PARSER_ACTION_OPT_IN_TO_PREVIEW")

# Copy an existing parser as a starting point
copied_parser = chronicle.copy_parser(log_type=log_type, id="pa_existing_parser")

Expand Down
6 changes: 5 additions & 1 deletion src/secops/chronicle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
merge_cases,
patch_case,
)
from secops.chronicle.models import CaseCloseReason, CasePriority
from secops.chronicle.models import CaseCloseReason, CasePriority, ParserAction
from secops.chronicle.client import (
ChronicleClient,
ValueType,
Expand Down Expand Up @@ -151,6 +151,7 @@
WidgetMetadata,
)
from secops.chronicle.nl_search import translate_nl_to_udm
from secops.chronicle.parser import fetch_parser_candidates
from secops.chronicle.reference_list import (
ReferenceListSyntaxType,
ReferenceListView,
Expand Down Expand Up @@ -243,6 +244,9 @@
"search_raw_logs",
# Natural Language Search
"translate_nl_to_udm",
# Parser
"fetch_parser_candidates",
"ParserAction",
# Entity
"import_entities",
"summarize_entity",
Expand Down
37 changes: 35 additions & 2 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,17 @@
)
from secops.chronicle.models import (
APIVersion,
AlertState,
CaseCloseReason,
CaseList,
CasePriority,
DashboardChart,
DashboardQuery,
EntitySummary,
InputInterval,
TileType,
AlertState,
ListBasis,
ParserAction,
TileType,
)
from secops.chronicle.nl_search import nl_search as _nl_search
from secops.chronicle.nl_search import translate_nl_to_udm
Expand All @@ -195,6 +196,9 @@
from secops.chronicle.parser import create_parser as _create_parser
from secops.chronicle.parser import deactivate_parser as _deactivate_parser
from secops.chronicle.parser import delete_parser as _delete_parser
from secops.chronicle.parser import (
fetch_parser_candidates as _fetch_parser_candidates,
)
from secops.chronicle.parser import get_parser as _get_parser
from secops.chronicle.parser import list_parsers as _list_parsers
from secops.chronicle.parser import run_parser as _run_parser
Expand Down Expand Up @@ -2774,6 +2778,35 @@ def get_parser(
"""
return _get_parser(self, log_type=log_type, id=id)

def fetch_parser_candidates(
self,
log_type: str,
parser_action: ParserAction | str,
) -> list[Any]:
"""Retrieves prebuilt parsers candidates.

Args:
log_type: Log type of the parser
parser_action: Action to perform on the parser candidates. Can be
a ParserAction enum value or a string. Valid values:
- ParserAction.PARSER_ACTION_UNSPECIFIED
- ParserAction.PARSER_ACTION_OPT_IN_TO_PREVIEW
- ParserAction.PARSER_ACTION_OPT_OUT_OF_PREVIEW
- ParserAction.CLONE_PREBUILT

Returns:
List of candidate parsers

Raises:
ValueError: If parser_action is an invalid string value
APIError: If the API request fails
"""
return _fetch_parser_candidates(
self,
log_type=log_type,
parser_action=parser_action,
)

def list_parsers(
self,
log_type: str = "-",
Expand Down
15 changes: 15 additions & 0 deletions src/secops/chronicle/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,3 +1150,18 @@ class APIVersion(StrEnum):
V1 = "v1"
V1BETA = "v1beta"
V1ALPHA = "v1alpha"


class ParserAction(StrEnum):
"""Actions that can be performed on parser candidates.

See:
https://cloud.google.com/chronicle/docs/reference/rest/v1beta/
projects.locations.instances.logTypes.parsers/
fetchParserCandidates#ParserAction
"""

PARSER_ACTION_UNSPECIFIED = "PARSER_ACTION_UNSPECIFIED"
PARSER_ACTION_OPT_IN_TO_PREVIEW = "PARSER_ACTION_OPT_IN_TO_PREVIEW"
PARSER_ACTION_OPT_OUT_OF_PREVIEW = "PARSER_ACTION_OPT_OUT_OF_PREVIEW"
CLONE_PREBUILT = "CLONE_PREBUILT"
49 changes: 48 additions & 1 deletion src/secops/chronicle/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import logging
from typing import Any

from secops.chronicle.models import APIVersion
from secops.chronicle.models import APIVersion, ParserAction
from secops.chronicle.utils.format_utils import remove_none_values
from secops.chronicle.utils.request_utils import (
chronicle_paginated_request,
chronicle_request,
)
from secops.exceptions import APIError, SecOpsError


# Constants for size limits
MAX_LOG_SIZE = 10 * 1024 * 1024 # 10MB per log
MAX_LOGS = 1000 # Maximum number of logs to process
Expand Down Expand Up @@ -235,6 +236,52 @@ def get_parser(
)


def fetch_parser_candidates(
client: "ChronicleClient",
log_type: str,
parser_action: "ParserAction | str",
) -> list[Any]:
"""Retrieves prebuilt parsers candidates.

Args:
client: ChronicleClient instance
log_type: Log type of the parser
parser_action: Action to perform on the parser candidates. Can be a
ParserAction enum value or a string. Valid values:
- ParserAction.PARSER_ACTION_UNSPECIFIED
- ParserAction.PARSER_ACTION_OPT_IN_TO_PREVIEW
- ParserAction.PARSER_ACTION_OPT_OUT_OF_PREVIEW
- ParserAction.CLONE_PREBUILT

Returns:
List of candidate parsers

Raises:
ValueError: If parser_action is an invalid string value
APIError: If the API request fails
"""
if isinstance(parser_action, str) and not isinstance(
parser_action, ParserAction
):
try:
parser_action = ParserAction(parser_action)
except ValueError as e:
valid = ", ".join(m.value for m in ParserAction)
raise ValueError(
f'Invalid parser_action: "{parser_action}". '
f"Valid values: {valid}"
) from e

data = chronicle_request(
client,
method="GET",
endpoint_path=(f"logTypes/{log_type}/parsers:fetchParserCandidates"),
params={"parserAction": parser_action},
error_message="Failed to fetch parser candidates",
)
return data.get("candidates", [])


def list_parsers(
client: "ChronicleClient",
log_type: str = "-",
Expand Down
32 changes: 32 additions & 0 deletions src/secops/cli/commands/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ def setup_parser_command(subparsers):
)
list_parsers_sub.set_defaults(func=handle_parser_list_command)

# --- Fetch Parser Candidates Command ---
fetch_parser_candidates_sub = parser_subparsers.add_parser(
"fetch-candidates", help="Fetch unactivated prebuilt parsers."
)
fetch_parser_candidates_sub.add_argument(
"--log-type", type=str, required=True, help="Log type of the parser."
)
fetch_parser_candidates_sub.add_argument(
"--parser-action",
type=str,
required=True,
help=(
"Action for the parser candidates "
"(e.g., PARSER_ACTION_OPT_IN_TO_PREVIEW)."
),
)
fetch_parser_candidates_sub.set_defaults(
func=handle_parser_fetch_candidates_command
)

# --- Run Parser Command ---
run_parser_sub = parser_subparsers.add_parser(
"run",
Expand Down Expand Up @@ -314,6 +334,18 @@ def handle_parser_delete_command(args, chronicle):
sys.exit(1)


def handle_parser_fetch_candidates_command(args, chronicle):
"""Handle parser fetch-candidates command."""
try:
result = chronicle.fetch_parser_candidates(
args.log_type, args.parser_action
)
output_formatter(result, args.output)
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error fetching parser candidates: {e}", file=sys.stderr)
sys.exit(1)


def handle_parser_get_command(args, chronicle):
"""Handle parser get command."""
try:
Expand Down
106 changes: 106 additions & 0 deletions tests/chronicle/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import pytest

from secops.chronicle.client import ChronicleClient
from secops.chronicle.models import ParserAction
from secops.chronicle.parser import (
MAX_LOG_SIZE,
MAX_LOGS,
Expand All @@ -30,6 +31,7 @@
create_parser,
deactivate_parser,
delete_parser,
fetch_parser_candidates,
get_parser,
list_parsers,
run_parser,
Expand Down Expand Up @@ -153,6 +155,110 @@ def test_activate_release_candidate_parser_error(
assert "Failed to activate parser" in str(exc_info.value)


# --- fetch_parser_candidates Tests ---
def test_fetch_parser_candidates_success(chronicle_client, mock_response):
"""Test fetch_parser_candidates function for success."""
log_type = "SOME_LOG_TYPE"
parser_action = "PARSER_ACTION_OPT_IN_TO_PREVIEW"
expected_parsers = [{"name": f"logTypes/{log_type}/parsers/pa_001"}]
mock_response.json.return_value = {"candidates": expected_parsers}

with patch.object(
chronicle_client.session, "request", return_value=mock_response
) as mock_request:
result = fetch_parser_candidates(chronicle_client, log_type, parser_action)

expected_url = (
f"{chronicle_client.base_url}/{chronicle_client.instance_id}"
f"/logTypes/{log_type}/parsers:fetchParserCandidates"
)
mock_request.assert_called_once_with(
method="GET",
url=expected_url,
params={"parserAction": parser_action},
json=None,
headers=None,
timeout=None,
)
assert result == expected_parsers


def test_fetch_parser_candidates_empty(chronicle_client, mock_response):
"""Test fetch_parser_candidates function when no parsers are returned."""
log_type = "EMPTY_LOG_TYPE"
parser_action = "PARSER_ACTION_OPT_IN_TO_PREVIEW"
mock_response.json.return_value = {}

with patch.object(
chronicle_client.session, "request", return_value=mock_response
) as mock_request:
result = fetch_parser_candidates(chronicle_client, log_type, parser_action)

expected_url = (
f"{chronicle_client.base_url}/{chronicle_client.instance_id}"
f"/logTypes/{log_type}/parsers:fetchParserCandidates"
)
mock_request.assert_called_once_with(
method="GET",
url=expected_url,
params={"parserAction": parser_action},
json=None,
headers=None,
timeout=None,
)
assert result == []


def test_fetch_parser_candidates_error(chronicle_client, mock_error_response):
"""Test fetch_parser_candidates function for API error."""
log_type = "ERROR_LOG_TYPE"
parser_action = "PARSER_ACTION_OPT_IN_TO_PREVIEW"

with patch.object(
chronicle_client.session, "request", return_value=mock_error_response
):
with pytest.raises(APIError) as exc_info:
fetch_parser_candidates(chronicle_client, log_type, parser_action)
assert "Failed to fetch parser candidates" in str(exc_info.value)


def test_fetch_parser_candidates_with_enum(chronicle_client, mock_response):
"""Test fetch_parser_candidates accepts ParserAction enum directly."""
log_type = "SOME_LOG_TYPE"
parser_action = ParserAction.PARSER_ACTION_OPT_IN_TO_PREVIEW
expected_parsers = [{"name": f"logTypes/{log_type}/parsers/pa_001"}]
mock_response.json.return_value = {"candidates": expected_parsers}

with patch.object(
chronicle_client.session, "request", return_value=mock_response
) as mock_request:
result = fetch_parser_candidates(chronicle_client, log_type, parser_action)

expected_url = (
f"{chronicle_client.base_url}/{chronicle_client.instance_id}"
f"/logTypes/{log_type}/parsers:fetchParserCandidates"
)
mock_request.assert_called_once_with(
method="GET",
url=expected_url,
params={"parserAction": parser_action},
json=None,
headers=None,
timeout=None,
)
assert result == expected_parsers


def test_fetch_parser_candidates_invalid_string(chronicle_client):
"""Test fetch_parser_candidates raises ValueError for invalid string."""
with pytest.raises(ValueError) as exc_info:
fetch_parser_candidates(
chronicle_client, "SOME_LOG_TYPE", "INVALID_ACTION"
)
assert 'Invalid parser_action: "INVALID_ACTION"' in str(exc_info.value)
assert "PARSER_ACTION_OPT_IN_TO_PREVIEW" in str(exc_info.value)


# --- copy_parser Tests ---
def test_copy_parser_success(chronicle_client, mock_response):
"""Test copy_parser function for success."""
Expand Down