forked from google/secops-wrapper
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstats.py
More file actions
174 lines (150 loc) · 5.8 KB
/
stats.py
File metadata and controls
174 lines (150 loc) · 5.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Statistics functionality for Chronicle searches."""
from datetime import datetime
from typing import Any, TYPE_CHECKING
from secops.chronicle.models import APIVersion
from secops.chronicle.utils.request_utils import (
chronicle_request,
)
from secops.exceptions import APIError
if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient
def get_stats(
    client: "ChronicleClient",
    query: str,
    start_time: datetime,
    end_time: datetime,
    max_values: int = 60,
    timeout: int = 120,
    max_events: int = 10000,
    case_insensitive: bool = True,
    max_attempts: int = 30,
) -> dict[str, Any]:
    """Run a stats-format UDM search via the Chronicle V1alpha API.

    Args:
        client: ChronicleClient instance used for the API call.
        query: Chronicle search query in stats format.
        start_time: Inclusive start of the search window.
        end_time: Inclusive end of the search window.
        max_values: Maximum number of values returned per field
            (sent to the API as ``limit``).
        timeout: Request timeout in seconds (default: 120).
        max_events: Legacy parameter, ignored by the new API.
        case_insensitive: Legacy parameter, ignored by the new API.
        max_attempts: Legacy parameter kept for backwards compatibility.

    Returns:
        Processed statistics dictionary with ``columns``, ``rows`` and
        ``total_rows`` keys (see ``process_stats_results``).

    Raises:
        APIError: If the request fails or the response carries no stats.
    """
    # Legacy arguments are accepted but deliberately ignored.
    del max_events, case_insensitive, max_attempts

    # NOTE(review): strftime with a literal "Z" assumes the datetimes are
    # UTC (naive or already-converted) — confirm with callers.
    time_fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
    request_params = {
        "query": query,
        "timeRange.start_time": start_time.strftime(time_fmt),
        "timeRange.end_time": end_time.strftime(time_fmt),
        "limit": max_values,
    }

    response = chronicle_request(
        client,
        method="GET",
        endpoint_path=":udmSearch",
        api_version=APIVersion.V1ALPHA,
        params=request_params,
        timeout=timeout,
        error_message="Failed to get stats",
    )

    # The API must return a "stats" payload for a stats-format query.
    if "stats" in response:
        return process_stats_results(response["stats"])
    raise APIError("No stats found in response")
# Sentinel marking a typed-value dict whose type key we do not recognize.
_UNRECOGNIZED = object()


def _convert_typed_value(value: dict[str, Any]) -> Any:
    """Convert one API typed-value dict into a native Python value.

    The API encodes each cell as a single-key dict such as
    ``{"int64Val": "3"}``. Returns ``_UNRECOGNIZED`` when none of the
    known type keys is present, so callers can decide how to handle it.
    """
    if "int64Val" in value:
        return int(value["int64Val"])
    if "doubleVal" in value:
        return float(value["doubleVal"])
    if "stringVal" in value:
        return value["stringVal"]
    if "timestampVal" in value:
        # API timestamps use a trailing "Z"; fromisoformat (pre-3.11)
        # needs an explicit UTC offset instead.
        return datetime.fromisoformat(
            value["timestampVal"].replace("Z", "+00:00")
        )
    return _UNRECOGNIZED


def process_stats_results(stats: dict[str, Any]) -> dict[str, Any]:
    """Process stats search results into a tabular structure.

    Args:
        stats: Stats search results from the API, containing a
            ``results`` list of per-column value dicts.

    Returns:
        Dict with ``columns`` (list of column names), ``rows`` (list of
        per-row dicts keyed by column name, padded with ``None`` where a
        column is short), and ``total_rows`` (the longest column length).
    """
    processed_results: dict[str, Any] = {
        "total_rows": 0,
        "columns": [],
        "rows": [],
    }

    # Return early if no results
    if not stats or "results" not in stats:
        return processed_results

    # Extract each column's name and its converted cell values.
    columns: list[str] = []
    column_data: dict[str, list[Any]] = {}
    for col_data in stats["results"]:
        col_name = col_data.get("column", "")
        columns.append(col_name)

        values: list[Any] = []
        for val_data in col_data.get("values", []):
            if "value" in val_data:
                # Regular single-value cell; unknown types become None.
                converted = _convert_typed_value(val_data["value"])
                values.append(
                    None if converted is _UNRECOGNIZED else converted
                )
            elif "list" in val_data and "values" in val_data["list"]:
                # List cell (e.g. from array_distinct); unknown-typed
                # items are silently skipped (preserves prior behavior).
                list_values = []
                for list_val in val_data["list"]["values"]:
                    converted = _convert_typed_value(list_val)
                    if converted is not _UNRECOGNIZED:
                        list_values.append(converted)
                values.append(list_values)
            else:
                # Cell with neither "value" nor a usable "list".
                values.append(None)
        column_data[col_name] = values

    # Build result rows, padding shorter columns with None.
    rows: list[dict[str, Any]] = []
    if columns and all(col in column_data for col in columns):
        max_rows = (
            max(len(column_data[col]) for col in columns) if column_data else 0
        )
        processed_results["total_rows"] = max_rows
        for i in range(max_rows):
            row = {}
            for col in columns:
                col_values = column_data[col]
                row[col] = col_values[i] if i < len(col_values) else None
            rows.append(row)

    processed_results["columns"] = columns
    processed_results["rows"] = rows
    return processed_results