-
Notifications
You must be signed in to change notification settings - Fork 67
feat: Add bigframes.execution_history API to track BigQuery jobs #2435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,27 +15,143 @@ | |
| from __future__ import annotations | ||
|
|
||
| import dataclasses | ||
| import datetime | ||
| import os | ||
| from typing import Optional, Tuple | ||
| from typing import Any, Mapping, Optional, Tuple, Union | ||
|
|
||
| import google.cloud.bigquery as bigquery | ||
| import google.cloud.bigquery.job as bq_job | ||
| from google.cloud.bigquery.job.load import LoadJob | ||
| from google.cloud.bigquery.job.query import QueryJob | ||
| import google.cloud.bigquery.table as bq_table | ||
|
|
||
| LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME" | ||
|
|
||
|
|
||
| @dataclasses.dataclass | ||
| class JobMetadata: | ||
| job_id: Optional[str] = None | ||
| query_id: Optional[str] = None | ||
| location: Optional[str] = None | ||
| project: Optional[str] = None | ||
| creation_time: Optional[datetime.datetime] = None | ||
| start_time: Optional[datetime.datetime] = None | ||
| end_time: Optional[datetime.datetime] = None | ||
| duration_seconds: Optional[float] = None | ||
| status: Optional[str] = None | ||
| total_bytes_processed: Optional[int] = None | ||
| total_slot_ms: Optional[int] = None | ||
| job_type: Optional[str] = None | ||
| error_result: Optional[Mapping[str, Any]] = None | ||
| cached: Optional[bool] = None | ||
| job_url: Optional[str] = None | ||
| query: Optional[str] = None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I do worry that at a certain point, storing all query text generated by the session might clog up memory?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point! To prevent memory bloat during long sessions, I have added truncation so we cap the stored query text strings at a maximum of 1024 characters. |
||
| destination_table: Optional[str] = None | ||
| source_uris: Optional[list[str]] = None | ||
| input_files: Optional[int] = None | ||
| input_bytes: Optional[int] = None | ||
| output_rows: Optional[int] = None | ||
| source_format: Optional[str] = None | ||
|
|
||
| @classmethod | ||
| def from_job( | ||
| cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None | ||
| ) -> "JobMetadata": | ||
| query_text = getattr(query_job, "query", None) | ||
| if query_text and len(query_text) > 1024: | ||
| query_text = query_text[:1021] + "..." | ||
|
|
||
| job_id = getattr(query_job, "job_id", None) | ||
| job_url = None | ||
| if job_id: | ||
| job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults" | ||
|
|
||
| metadata = cls( | ||
| job_id=query_job.job_id, | ||
| location=query_job.location, | ||
| project=query_job.project, | ||
| creation_time=query_job.created, | ||
| start_time=query_job.started, | ||
| end_time=query_job.ended, | ||
| duration_seconds=exec_seconds, | ||
| status=query_job.state, | ||
| job_type=query_job.job_type, | ||
| error_result=query_job.error_result, | ||
| query=query_text, | ||
| job_url=job_url, | ||
| ) | ||
| if isinstance(query_job, QueryJob): | ||
| metadata.cached = getattr(query_job, "cache_hit", None) | ||
| metadata.destination_table = ( | ||
| str(query_job.destination) if query_job.destination else None | ||
| ) | ||
| metadata.total_bytes_processed = getattr( | ||
| query_job, "total_bytes_processed", None | ||
| ) | ||
| metadata.total_slot_ms = getattr(query_job, "slot_millis", None) | ||
| elif isinstance(query_job, LoadJob): | ||
| metadata.output_rows = getattr(query_job, "output_rows", None) | ||
| metadata.input_files = getattr(query_job, "input_files", None) | ||
| metadata.input_bytes = getattr(query_job, "input_bytes", None) | ||
| metadata.destination_table = ( | ||
| str(query_job.destination) | ||
| if getattr(query_job, "destination", None) | ||
| else None | ||
| ) | ||
| if getattr(query_job, "source_uris", None): | ||
| metadata.source_uris = list(query_job.source_uris) | ||
| if query_job.configuration and hasattr( | ||
| query_job.configuration, "source_format" | ||
| ): | ||
| metadata.source_format = query_job.configuration.source_format | ||
|
|
||
| return metadata | ||
|
|
||
| @classmethod | ||
| def from_row_iterator( | ||
| cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None | ||
| ) -> "JobMetadata": | ||
| query_text = getattr(row_iterator, "query", None) | ||
| if query_text and len(query_text) > 1024: | ||
| query_text = query_text[:1021] + "..." | ||
|
|
||
| job_id = getattr(row_iterator, "job_id", None) | ||
| job_url = None | ||
| if job_id: | ||
| project = getattr(row_iterator, "project", "") | ||
| location = getattr(row_iterator, "location", "") | ||
| job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults" | ||
|
|
||
| return cls( | ||
| job_id=job_id, | ||
| query_id=getattr(row_iterator, "query_id", None), | ||
| location=getattr(row_iterator, "location", None), | ||
| project=getattr(row_iterator, "project", None), | ||
| creation_time=getattr(row_iterator, "created", None), | ||
| start_time=getattr(row_iterator, "started", None), | ||
| end_time=getattr(row_iterator, "ended", None), | ||
| duration_seconds=exec_seconds, | ||
| status="DONE", | ||
| total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None), | ||
| total_slot_ms=getattr(row_iterator, "slot_millis", None), | ||
| job_type="query", | ||
| cached=getattr(row_iterator, "cache_hit", None), | ||
| query=query_text, | ||
| job_url=job_url, | ||
| ) | ||
|
|
||
|
|
||
| @dataclasses.dataclass | ||
| class ExecutionMetrics: | ||
| execution_count: int = 0 | ||
| slot_millis: int = 0 | ||
| bytes_processed: int = 0 | ||
| execution_secs: float = 0 | ||
| query_char_count: int = 0 | ||
| jobs: list[JobMetadata] = dataclasses.field(default_factory=list) | ||
|
|
||
| def count_job_stats( | ||
| self, | ||
| query_job: Optional[bq_job.QueryJob] = None, | ||
| query_job: Optional[Union[QueryJob, LoadJob]] = None, | ||
| row_iterator: Optional[bq_table.RowIterator] = None, | ||
| ): | ||
| if query_job is None: | ||
|
|
@@ -57,42 +173,71 @@ def count_job_stats( | |
| self.slot_millis += slot_millis | ||
| self.execution_secs += exec_seconds | ||
|
|
||
| elif query_job.configuration.dry_run: | ||
| query_char_count = len(query_job.query) | ||
| self.jobs.append( | ||
| JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds) | ||
| ) | ||
|
|
||
| elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run: | ||
| query_char_count = len(getattr(query_job, "query", "")) | ||
|
|
||
| # TODO(tswast): Pass None after making benchmark publishing robust to missing data. | ||
| bytes_processed = 0 | ||
| slot_millis = 0 | ||
| exec_seconds = 0.0 | ||
|
|
||
| elif (stats := get_performance_stats(query_job)) is not None: | ||
| query_char_count, bytes_processed, slot_millis, exec_seconds = stats | ||
| elif isinstance(query_job, bigquery.QueryJob): | ||
| if (stats := get_performance_stats(query_job)) is not None: | ||
| query_char_count, bytes_processed, slot_millis, exec_seconds = stats | ||
| self.execution_count += 1 | ||
| self.query_char_count += query_char_count or 0 | ||
| self.bytes_processed += bytes_processed or 0 | ||
| self.slot_millis += slot_millis or 0 | ||
| self.execution_secs += exec_seconds or 0 | ||
|
|
||
| metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds) | ||
| metadata.total_bytes_processed = bytes_processed | ||
| metadata.total_slot_ms = slot_millis | ||
| self.jobs.append(metadata) | ||
|
|
||
| else: | ||
| self.execution_count += 1 | ||
| self.query_char_count += query_char_count or 0 | ||
| self.bytes_processed += bytes_processed or 0 | ||
| self.slot_millis += slot_millis or 0 | ||
| self.execution_secs += exec_seconds or 0 | ||
| duration = ( | ||
| (query_job.ended - query_job.created).total_seconds() | ||
| if query_job.ended and query_job.created | ||
| else None | ||
| ) | ||
| self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration)) | ||
|
|
||
| # For pytest runs only, log information about the query job | ||
| # to a file in order to create a performance report. | ||
| if ( | ||
| isinstance(query_job, bigquery.QueryJob) | ||
| and not query_job.configuration.dry_run | ||
| ): | ||
| stats = get_performance_stats(query_job) | ||
| if stats: | ||
| write_stats_to_disk( | ||
| query_char_count=stats[0], | ||
| bytes_processed=stats[1], | ||
| slot_millis=stats[2], | ||
| exec_seconds=stats[3], | ||
| ) | ||
| elif row_iterator is not None: | ||
| bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0 | ||
| query_char_count = len(getattr(row_iterator, "query", "") or "") | ||
| slot_millis = getattr(row_iterator, "slot_millis", 0) or 0 | ||
| created = getattr(row_iterator, "created", None) | ||
| ended = getattr(row_iterator, "ended", None) | ||
| exec_seconds = ( | ||
| (ended - created).total_seconds() if created and ended else 0.0 | ||
| ) | ||
| write_stats_to_disk( | ||
| query_char_count=query_char_count, | ||
| bytes_processed=bytes_processed, | ||
| slot_millis=slot_millis, | ||
| exec_seconds=exec_seconds, | ||
| ) | ||
|
|
||
| else: | ||
| # TODO(tswast): Pass None after making benchmark publishing robust to missing data. | ||
| bytes_processed = 0 | ||
| query_char_count = 0 | ||
| slot_millis = 0 | ||
| exec_seconds = 0 | ||
|
|
||
| write_stats_to_disk( | ||
| query_char_count=query_char_count, | ||
| bytes_processed=bytes_processed, | ||
| slot_millis=slot_millis, | ||
| exec_seconds=exec_seconds, | ||
| ) | ||
|
|
||
|
|
||
| def get_performance_stats( | ||
| query_job: bigquery.QueryJob, | ||
|
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
can we add a static factory method to build this from an sdk query job object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! Added a from_job classmethod (and from_row_iterator) to handle building the metadata object directly from the jobs.