diff --git a/google/cloud/logging/handlers/__init__.py b/google/cloud/logging/handlers/__init__.py index e27f8e673..8ccaacff8 100644 --- a/google/cloud/logging/handlers/__init__.py +++ b/google/cloud/logging/handlers/__init__.py @@ -16,13 +16,17 @@ from google.cloud.logging_v2.handlers.app_engine import AppEngineHandler from google.cloud.logging_v2.handlers.container_engine import ContainerEngineHandler -from google.cloud.logging_v2.handlers.structured_log import StructuredLogHandler from google.cloud.logging_v2.handlers.handlers import CloudLoggingFilter from google.cloud.logging_v2.handlers.handlers import CloudLoggingHandler from google.cloud.logging_v2.handlers.handlers import setup_logging +from google.cloud.logging_v2.handlers.structured_log import ( + AppendLabelLoggingAdapter, + StructuredLogHandler, +) __all__ = [ "AppEngineHandler", + "AppendLabelLoggingAdapter", "CloudLoggingFilter", "CloudLoggingHandler", "ContainerEngineHandler", diff --git a/google/cloud/logging_v2/handlers/__init__.py b/google/cloud/logging_v2/handlers/__init__.py index a1ed08b5e..fa18ddbfa 100644 --- a/google/cloud/logging_v2/handlers/__init__.py +++ b/google/cloud/logging_v2/handlers/__init__.py @@ -16,13 +16,17 @@ from google.cloud.logging_v2.handlers.app_engine import AppEngineHandler from google.cloud.logging_v2.handlers.container_engine import ContainerEngineHandler -from google.cloud.logging_v2.handlers.structured_log import StructuredLogHandler from google.cloud.logging_v2.handlers.handlers import CloudLoggingHandler from google.cloud.logging_v2.handlers.handlers import CloudLoggingFilter from google.cloud.logging_v2.handlers.handlers import setup_logging +from google.cloud.logging_v2.handlers.structured_log import ( + AppendLabelLoggingAdapter, + StructuredLogHandler, +) __all__ = [ "AppEngineHandler", + "AppendLabelLoggingAdapter", "CloudLoggingFilter", "CloudLoggingHandler", "ContainerEngineHandler", diff --git a/google/cloud/logging_v2/handlers/structured_log.py 
b/google/cloud/logging_v2/handlers/structured_log.py index dcba02c9c..8531f757b 100644 --- a/google/cloud/logging_v2/handlers/structured_log.py +++ b/google/cloud/logging_v2/handlers/structured_log.py @@ -19,10 +19,12 @@ import logging import logging.handlers -from google.cloud.logging_v2.handlers.handlers import CloudLoggingFilter -from google.cloud.logging_v2.handlers.handlers import _format_and_parse_message import google.cloud.logging_v2 from google.cloud.logging_v2._instrumentation import _create_diagnostic_entry +from google.cloud.logging_v2.handlers.handlers import ( + CloudLoggingFilter, + _format_and_parse_message, +) GCP_FORMAT = ( "{%(_payload_str)s" @@ -148,3 +150,78 @@ def emit_instrumentation_info(self): struct_logger.setLevel(logging.INFO) struct_logger.info(diagnostic_object.payload) struct_logger.handlers.clear() + + +class AppendLabelLoggingAdapter(logging.LoggerAdapter): + """A logging adapter that appends a set of constant labels to every log record. + + This adapter ensures that specific key-value pairs are included in the 'labels' + dictionary of every log message, unless they are explicitly overridden in the + logging call. + + Example: + + .. code-block:: python + + import logging + from google.cloud.logging_v2.handlers.structured_log import AppendLabelLoggingAdapter + from google.cloud.logging_v2.handlers.structured_log import StructuredLogHandler + logging.root.setLevel(logging.INFO) + logging.root.handlers = [StructuredLogHandler()] + first_adapter = AppendLabelLoggingAdapter(logging.root, {'a': 5, 'b': 6}) + first_adapter.info('first info') + { + "message": "first info", + "severity": "INFO", + "logging.googleapis.com/labels": {"python_logger": "root", "a": 5, "b": 6} + [...] 
+ } + # Adapters can be stacked + second_adapter=AppendLabelLoggingAdapter(first_adapter, {'hello': 'world'}) + second_adapter.info('second info') + { + "message": "second info", + "severity": "INFO", + "logging.googleapis.com/labels": {"python_logger": "root", "hello": "world", "a": 5, "b": 6} + [...] + } + """ + + def __init__(self, logger, append_labels): + """ + Args: + logger: The Logger or LoggerAdapter to wrap. + append_labels (dict): Labels to inject into every log record. + """ + # Ensure append_labels is always a dict to avoid attribute errors later + self.append_labels = append_labels or {} + super().__init__(logger, None) + + def process(self, msg, kwargs): + # 1. Safely handle 'extra' and ensure it is a dictionary + # We copy to avoid mutating the original dict passed by the caller + extra = kwargs.get("extra") + if extra is None: + extra = {} + else: + extra = dict(extra) + + # 2. Extract and copy existing labels + # In Google Cloud Structured Logging, 'labels' is the standard key + labels = extra.get("labels") + if not isinstance(labels, dict): + labels = {} + else: + labels = dict(labels) + + # 3. Merging Logic + # Implementation choice: 'labels' from the call/inner adapter should + # take precedence over 'self.append_labels' to respect specific overrides + # thus the unpacking order is important. + labels = {**self.append_labels, **labels} + + # 4. 
Re-insert into kwargs + extra["labels"] = labels + kwargs["extra"] = extra + + return msg, kwargs diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 366ed343b..697861b2b 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -34,7 +34,6 @@ import google.cloud.logging from google.cloud._helpers import UTC from google.cloud.logging_v2.handlers import CloudLoggingHandler -from google.cloud.logging_v2.handlers.transports import BackgroundThreadTransport from google.cloud.logging_v2.handlers.transports import SyncTransport from google.cloud.logging_v2 import client from google.cloud.logging_v2.resource import Resource @@ -138,6 +137,36 @@ def wrapped(*args, **kwargs): return wrapped +def _shared_subprocess_worker(handler_name, log_message, cleanup_mode="close"): + """ + A top-level function to avoid duplication and PicklingErrors. + It creates its own Client to avoid SSL/fork corruption. + """ + import logging + from google.cloud.logging import Client + from google.cloud.logging.handlers import CloudLoggingHandler + from google.cloud.logging.handlers.transports import BackgroundThreadTransport + + # 1. Create a fresh client inside the child process + local_client = Client() + + # 2. Setup the handler and logger + handler = CloudLoggingHandler( + local_client, name=handler_name, transport=BackgroundThreadTransport + ) + cloud_logger = logging.getLogger("subprocess_logger") + cloud_logger.addHandler(handler) + + # 3. Perform the log + cloud_logger.warning(log_message) + + # 4. 
Handle the specific cleanup requested by the test + if cleanup_mode == "close": + handler.close() + elif cleanup_mode == "flush": + local_client.flush_handlers() + + class TestLogging(unittest.TestCase): JSON_PAYLOAD = { "message": "System test: test_log_struct", @@ -723,67 +752,42 @@ def test_log_handler_otel_integration(self): def test_log_handler_close(self): import multiprocessing - ctx = multiprocessing.get_context("fork") + ctx = multiprocessing.get_context("spawn") + LOG_MESSAGE = "This is a test of handler.close before exiting." - LOGGER_NAME = "close-test" - handler_name = self._logger_name(LOGGER_NAME) + handler_name = self._logger_name("close-test") - # only create the logger to delete, hidden otherwise + # Setup for verification (parent uses its own client) logger = Config.CLIENT.logger(handler_name) self.to_delete.append(logger) - # Run a simulation of logging an entry then immediately shutting down. - # The .close() function before the process exits should prevent the - # thread shutdown error and let us log the message. - def subprocess_main(): - # logger.delete and logger.list_entries work by filtering on log name, so we - # can create new objects with the same name and have the queries on the parent - # process still work. 
- handler = CloudLoggingHandler( - Config.CLIENT, name=handler_name, transport=BackgroundThreadTransport - ) - cloud_logger = logging.getLogger(LOGGER_NAME) - cloud_logger.addHandler(handler) - cloud_logger.warning(LOG_MESSAGE) - handler.close() - - proc = ctx.Process(target=subprocess_main) + proc = ctx.Process( + target=_shared_subprocess_worker, args=(handler_name, LOG_MESSAGE, "close") + ) proc.start() proc.join() + entries = _list_entries(logger) self.assertEqual(len(entries), 1) self.assertEqual(entries[0].payload, LOG_MESSAGE) - def test_log_client_flush_handlers(self): + def test_log_handler_flush(self): import multiprocessing - ctx = multiprocessing.get_context("fork") - LOG_MESSAGE = "This is a test of client.flush_handlers before exiting." - LOGGER_NAME = "close-test" - handler_name = self._logger_name(LOGGER_NAME) + ctx = multiprocessing.get_context("spawn") + + LOG_MESSAGE = "This is a test of client.flush_handlers." + handler_name = self._logger_name("flush-test") - # only create the logger to delete, hidden otherwise logger = Config.CLIENT.logger(handler_name) self.to_delete.append(logger) - # Run a simulation of logging an entry then immediately shutting down. - # The .close() function before the process exits should prevent the - # thread shutdown error and let us log the message. - def subprocess_main(): - # logger.delete and logger.list_entries work by filtering on log name, so we - # can create new objects with the same name and have the queries on the parent - # process still work. 
- handler = CloudLoggingHandler( - Config.CLIENT, name=handler_name, transport=BackgroundThreadTransport - ) - cloud_logger = logging.getLogger(LOGGER_NAME) - cloud_logger.addHandler(handler) - cloud_logger.warning(LOG_MESSAGE) - Config.CLIENT.flush_handlers() - - proc = ctx.Process(target=subprocess_main) + proc = ctx.Process( + target=_shared_subprocess_worker, args=(handler_name, LOG_MESSAGE, "flush") + ) proc.start() proc.join() + entries = _list_entries(logger) self.assertEqual(len(entries), 1) self.assertEqual(entries[0].payload, LOG_MESSAGE) diff --git a/tests/unit/handlers/test_structured_log.py b/tests/unit/handlers/test_structured_log.py index 908758749..6837b87c2 100644 --- a/tests/unit/handlers/test_structured_log.py +++ b/tests/unit/handlers/test_structured_log.py @@ -769,3 +769,96 @@ def test_valid_instrumentation_info(self): inst_source_dict, "instrumentation payload not logged properly", ) + + def test_append_labels_adapter(self): + import logging + + import mock + + from google.cloud.logging_v2.handlers.structured_log import ( + AppendLabelLoggingAdapter, + ) + + logger = logging.getLogger("google.cloud.logging_v2.handlers.structured_log") + handler = self._make_one() + with mock.patch.object(handler, "emit_instrumentation_info"): + with mock.patch.object(logger, "_log") as mock_log: + logger.addHandler(handler) + logger.setLevel(logging.INFO) + adapted_logger = AppendLabelLoggingAdapter( + logger, append_labels={"service_id": 1, "another_value": "foo"} + ) + adapted_logger.info("test message") + mock_log.assert_called_once() + self.assertEqual( + mock_log.call_args_list[0].kwargs, + {"extra": {"labels": {"service_id": 1, "another_value": "foo"}}}, + ) + + def test_append_labels_adapter_override_defaults(self): + import logging + + import mock + + from google.cloud.logging_v2.handlers.structured_log import ( + AppendLabelLoggingAdapter, + ) + + logger = logging.getLogger("google.cloud.logging_v2.handlers.structured_log") + handler = 
self._make_one()
+        with mock.patch.object(handler, "emit_instrumentation_info"):
+            with mock.patch.object(logger, "_log") as mock_log:
+                logger.addHandler(handler)
+                logger.setLevel(logging.INFO)
+                adapted_logger = AppendLabelLoggingAdapter(
+                    logger, append_labels={"service_id": 1, "another_value": "foo"}
+                )
+                adapted_logger.info(
+                    "test message", extra={"labels": {"another_value": "baz"}}
+                )
+                mock_log.assert_called_once()
+                # the default value was overridden
+                self.assertEqual(
+                    mock_log.call_args_list[0].kwargs,
+                    {"extra": {"labels": {"service_id": 1, "another_value": "baz"}}},
+                )
+
+    def test_append_labels_adapter_stacked(self):
+        import logging
+
+        import mock
+
+        from google.cloud.logging_v2.handlers.structured_log import (
+            AppendLabelLoggingAdapter,
+        )
+
+        logger = logging.getLogger("google.cloud.logging_v2.handlers.structured_log")
+        handler = self._make_one()
+        with mock.patch.object(handler, "emit_instrumentation_info"):
+            with mock.patch.object(logger, "_log") as mock_log:
+                logger.addHandler(handler)
+                logger.setLevel(logging.INFO)
+                adapted_logger = AppendLabelLoggingAdapter(
+                    logger, append_labels={"service_id": 1, "another_value": "foo"}
+                )
+                twice_adapted_logger = AppendLabelLoggingAdapter(
+                    adapted_logger,
+                    # one field is new, another was adapted already
+                    append_labels={"new_field": "new_value", "another_value": "baz"},
+                )
+                twice_adapted_logger.info(
+                    "test message", extra={"labels": {"another_value": "baz"}}
+                )
+                mock_log.assert_called_once()
+                self.assertEqual(
+                    mock_log.call_args_list[0].kwargs,
+                    {
+                        "extra": {
+                            "labels": {
+                                "another_value": "baz",  # value is changed by the second adapter
+                                "new_field": "new_value",  # introduced by the second adapter
+                                "service_id": 1,  # left as is from the first adapter configuration
+                            }
+                        }
+                    },
+                )