Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bigframes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402
from bigframes.core.global_session import ( # noqa: E402
close_session,
execution_history,
get_global_session,
)
import bigframes.enums as enums # noqa: E402
Expand Down Expand Up @@ -57,6 +58,7 @@ def load_ipython_extension(ipython):
"BigQueryOptions",
"get_global_session",
"close_session",
"execution_history",
"enums",
"exceptions",
"connect",
Expand Down
10 changes: 10 additions & 0 deletions bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import bigframes.exceptions as bfe

if TYPE_CHECKING:
import pandas

import bigframes.session

_global_session: Optional[bigframes.session.Session] = None
Expand Down Expand Up @@ -124,6 +126,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)


def execution_history() -> "pandas.DataFrame":
    """Return a DataFrame describing the BigQuery jobs run in the current session.

    Delegates to :meth:`bigframes.session.Session.execution_history` on the
    default (global) session, creating the session first if necessary.

    Returns:
        pandas.DataFrame:
            One row per BigQuery job initiated by BigFrames in this session.
    """
    # Imported at call time to keep module import lightweight; the noqa keeps
    # linters from flagging the annotation-only pandas import.
    import pandas  # noqa: F401

    import bigframes.session

    return with_default_session(bigframes.session.Session.execution_history)


class _GlobalSessionContext:
"""
Context manager for testing that sets global session.
Expand Down
37 changes: 37 additions & 0 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,39 @@
logger = logging.getLogger(__name__)


class _ExecutionHistory(pandas.DataFrame):
@property
def _constructor(self):
return _ExecutionHistory

def _repr_html_(self) -> str | None:
try:
import bigframes.formatting_helpers as formatter

if self.empty:
return "<div>No executions found.</div>"

cols = ["job_id", "status", "total_bytes_processed", "job_url"]
df_display = self[cols].copy()
df_display["total_bytes_processed"] = df_display[
"total_bytes_processed"
].apply(formatter.get_formatted_bytes)

def format_url(url):
return f'<a target="_blank" href="{url}">Open Job</a>' if url else ""

df_display["job_url"] = df_display["job_url"].apply(format_url)

# Rename job_id to query_id to match user expectations
df_display = df_display.rename(columns={"job_id": "query_id"})

compact_html = df_display.to_html(escape=False, index=False)

return compact_html
except Exception:
return super()._repr_html_() # type: ignore


@log_adapter.class_logger
class Session(
third_party_pandas_gbq.GBQIOMixin,
Expand Down Expand Up @@ -371,6 +404,10 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis

def execution_history(self) -> pandas.DataFrame:
    """Return a DataFrame of the BigQuery jobs BigFrames ran in this session.

    Each row is built from the recorded per-job metadata in
    ``self._metrics.jobs``.
    """
    records = [vars(job) for job in self._metrics.jobs]
    return _ExecutionHistory(records)

@property
def _allows_ambiguity(self) -> bool:
return self._allow_ambiguity
Expand Down
5 changes: 5 additions & 0 deletions bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
from google.cloud import bigquery_storage_v1
import google.cloud.bigquery
import google.cloud.bigquery as bigquery
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.query import QueryJob
import google.cloud.bigquery.table
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
import pandas
Expand Down Expand Up @@ -605,6 +607,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
else:
job.result()

if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)):
self._metrics.count_job_stats(query_job=job)

@overload
def read_gbq_table( # type: ignore[overload-overlap]
self,
Expand Down
195 changes: 170 additions & 25 deletions bigframes/session/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,143 @@
from __future__ import annotations

import dataclasses
import datetime
import os
from typing import Optional, Tuple
from typing import Any, Mapping, Optional, Tuple, Union

import google.cloud.bigquery as bigquery
import google.cloud.bigquery.job as bq_job
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.query import QueryJob
import google.cloud.bigquery.table as bq_table

LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"


@dataclasses.dataclass
class JobMetadata:
    """A snapshot of metadata for a single BigQuery job run by BigFrames.

    Built from either a finished SDK job object (:meth:`from_job`) or a
    ``RowIterator`` returned by a jobless query (:meth:`from_row_iterator`).
    All fields are optional because the two sources expose different subsets
    of information.
    """

    # Maximum stored query text; longer queries are truncated to avoid
    # unbounded memory growth over a long session.
    _MAX_QUERY_CHARS = 1024

    job_id: Optional[str] = None
    query_id: Optional[str] = None
    location: Optional[str] = None
    project: Optional[str] = None
    creation_time: Optional[datetime.datetime] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    duration_seconds: Optional[float] = None
    status: Optional[str] = None
    total_bytes_processed: Optional[int] = None
    total_slot_ms: Optional[int] = None
    job_type: Optional[str] = None
    error_result: Optional[Mapping[str, Any]] = None
    cached: Optional[bool] = None
    job_url: Optional[str] = None
    query: Optional[str] = None
    destination_table: Optional[str] = None
    source_uris: Optional[list[str]] = None
    input_files: Optional[int] = None
    input_bytes: Optional[int] = None
    output_rows: Optional[int] = None
    source_format: Optional[str] = None

    @staticmethod
    def _truncate_query(query_text: Optional[str]) -> Optional[str]:
        """Cap query text at ``_MAX_QUERY_CHARS``, appending "..." if cut."""
        limit = JobMetadata._MAX_QUERY_CHARS
        if query_text and len(query_text) > limit:
            return query_text[: limit - 3] + "..."
        return query_text

    @staticmethod
    def _build_job_url(project, location, job_id) -> Optional[str]:
        """Build the BigQuery console URL for a job, or None without a job_id."""
        if not job_id:
            return None
        return (
            "https://console.cloud.google.com/bigquery"
            f"?project={project}&j=bq:{location}:{job_id}&page=queryresults"
        )

    @classmethod
    def from_job(
        cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Build metadata from a finished SDK ``QueryJob`` or ``LoadJob``.

        Args:
            query_job: The completed job object from the BigQuery client.
            exec_seconds: Wall-clock execution time measured by the caller,
                if available.
        """
        query_text = cls._truncate_query(getattr(query_job, "query", None))
        job_id = getattr(query_job, "job_id", None)
        job_url = cls._build_job_url(query_job.project, query_job.location, job_id)

        metadata = cls(
            job_id=job_id,
            location=query_job.location,
            project=query_job.project,
            creation_time=query_job.created,
            start_time=query_job.started,
            end_time=query_job.ended,
            duration_seconds=exec_seconds,
            status=query_job.state,
            job_type=query_job.job_type,
            error_result=query_job.error_result,
            query=query_text,
            job_url=job_url,
        )
        if isinstance(query_job, QueryJob):
            # Query-specific statistics.
            metadata.cached = getattr(query_job, "cache_hit", None)
            metadata.destination_table = (
                str(query_job.destination) if query_job.destination else None
            )
            metadata.total_bytes_processed = getattr(
                query_job, "total_bytes_processed", None
            )
            metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
        elif isinstance(query_job, LoadJob):
            # Load-specific statistics.
            metadata.output_rows = getattr(query_job, "output_rows", None)
            metadata.input_files = getattr(query_job, "input_files", None)
            metadata.input_bytes = getattr(query_job, "input_bytes", None)
            metadata.destination_table = (
                str(query_job.destination)
                if getattr(query_job, "destination", None)
                else None
            )
            if getattr(query_job, "source_uris", None):
                metadata.source_uris = list(query_job.source_uris)
            if query_job.configuration and hasattr(
                query_job.configuration, "source_format"
            ):
                metadata.source_format = query_job.configuration.source_format

        return metadata

    @classmethod
    def from_row_iterator(
        cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Build metadata from a ``RowIterator`` (e.g. a jobless query result).

        Every attribute is read via ``getattr`` with a ``None`` default
        because ``RowIterator`` exposes different fields depending on how
        the query executed. Status is assumed "DONE" since rows are already
        being iterated.
        """
        query_text = cls._truncate_query(getattr(row_iterator, "query", None))

        job_id = getattr(row_iterator, "job_id", None)
        job_url = cls._build_job_url(
            getattr(row_iterator, "project", ""),
            getattr(row_iterator, "location", ""),
            job_id,
        )

        return cls(
            job_id=job_id,
            query_id=getattr(row_iterator, "query_id", None),
            location=getattr(row_iterator, "location", None),
            project=getattr(row_iterator, "project", None),
            creation_time=getattr(row_iterator, "created", None),
            start_time=getattr(row_iterator, "started", None),
            end_time=getattr(row_iterator, "ended", None),
            duration_seconds=exec_seconds,
            status="DONE",
            total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
            total_slot_ms=getattr(row_iterator, "slot_millis", None),
            job_type="query",
            cached=getattr(row_iterator, "cache_hit", None),
            query=query_text,
            job_url=job_url,
        )


@dataclasses.dataclass
class ExecutionMetrics:
execution_count: int = 0
slot_millis: int = 0
bytes_processed: int = 0
execution_secs: float = 0
query_char_count: int = 0
jobs: list[JobMetadata] = dataclasses.field(default_factory=list)

def count_job_stats(
self,
query_job: Optional[bq_job.QueryJob] = None,
query_job: Optional[Union[QueryJob, LoadJob]] = None,
row_iterator: Optional[bq_table.RowIterator] = None,
):
if query_job is None:
Expand All @@ -57,42 +173,71 @@ def count_job_stats(
self.slot_millis += slot_millis
self.execution_secs += exec_seconds

elif query_job.configuration.dry_run:
query_char_count = len(query_job.query)
self.jobs.append(
JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds)
)

elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run:
query_char_count = len(getattr(query_job, "query", ""))

# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
bytes_processed = 0
slot_millis = 0
exec_seconds = 0.0

elif (stats := get_performance_stats(query_job)) is not None:
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
elif isinstance(query_job, bigquery.QueryJob):
if (stats := get_performance_stats(query_job)) is not None:
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
self.execution_count += 1
self.query_char_count += query_char_count or 0
self.bytes_processed += bytes_processed or 0
self.slot_millis += slot_millis or 0
self.execution_secs += exec_seconds or 0

metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds)
metadata.total_bytes_processed = bytes_processed
metadata.total_slot_ms = slot_millis
self.jobs.append(metadata)

else:
self.execution_count += 1
self.query_char_count += query_char_count or 0
self.bytes_processed += bytes_processed or 0
self.slot_millis += slot_millis or 0
self.execution_secs += exec_seconds or 0
duration = (
(query_job.ended - query_job.created).total_seconds()
if query_job.ended and query_job.created
else None
)
self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration))

# For pytest runs only, log information about the query job
# to a file in order to create a performance report.
if (
isinstance(query_job, bigquery.QueryJob)
and not query_job.configuration.dry_run
):
stats = get_performance_stats(query_job)
if stats:
write_stats_to_disk(
query_char_count=stats[0],
bytes_processed=stats[1],
slot_millis=stats[2],
exec_seconds=stats[3],
)
elif row_iterator is not None:
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
query_char_count = len(getattr(row_iterator, "query", "") or "")
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
created = getattr(row_iterator, "created", None)
ended = getattr(row_iterator, "ended", None)
exec_seconds = (
(ended - created).total_seconds() if created and ended else 0.0
)
write_stats_to_disk(
query_char_count=query_char_count,
bytes_processed=bytes_processed,
slot_millis=slot_millis,
exec_seconds=exec_seconds,
)

else:
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
bytes_processed = 0
query_char_count = 0
slot_millis = 0
exec_seconds = 0

write_stats_to_disk(
query_char_count=query_char_count,
bytes_processed=bytes_processed,
slot_millis=slot_millis,
exec_seconds=exec_seconds,
)


def get_performance_stats(
query_job: bigquery.QueryJob,
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def all_session_methods():
session_attributes.remove("close")
# streaming isn't in pandas
session_attributes.remove("read_gbq_table_streaming")
# execution_history is in base namespace, not pandas
session_attributes.remove("execution_history")

for attribute in sorted(session_attributes):
session_method = getattr(bigframes.session.Session, attribute)
Expand Down
Loading