6 changes: 5 additions & 1 deletion google/cloud/logging/handlers/__init__.py
@@ -16,13 +16,17 @@

from google.cloud.logging_v2.handlers.app_engine import AppEngineHandler
from google.cloud.logging_v2.handlers.container_engine import ContainerEngineHandler
from google.cloud.logging_v2.handlers.structured_log import StructuredLogHandler
from google.cloud.logging_v2.handlers.handlers import CloudLoggingFilter
from google.cloud.logging_v2.handlers.handlers import CloudLoggingHandler
from google.cloud.logging_v2.handlers.handlers import setup_logging
from google.cloud.logging_v2.handlers.structured_log import (
AppendLabelLoggingAdapter,
StructuredLogHandler,
)

__all__ = [
"AppEngineHandler",
"AppendLabelLoggingAdapter",
"CloudLoggingFilter",
"CloudLoggingHandler",
"ContainerEngineHandler",
6 changes: 5 additions & 1 deletion google/cloud/logging_v2/handlers/__init__.py
@@ -16,13 +16,17 @@

from google.cloud.logging_v2.handlers.app_engine import AppEngineHandler
from google.cloud.logging_v2.handlers.container_engine import ContainerEngineHandler
from google.cloud.logging_v2.handlers.structured_log import StructuredLogHandler
from google.cloud.logging_v2.handlers.handlers import CloudLoggingHandler
from google.cloud.logging_v2.handlers.handlers import CloudLoggingFilter
from google.cloud.logging_v2.handlers.handlers import setup_logging
from google.cloud.logging_v2.handlers.structured_log import (
AppendLabelLoggingAdapter,
StructuredLogHandler,
)

__all__ = [
"AppEngineHandler",
"AppendLabelLoggingAdapter",
"CloudLoggingFilter",
"CloudLoggingHandler",
"ContainerEngineHandler",
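With both `__init__.py` files now re-exporting the adapter, it is importable from either public surface; a quick sanity check (illustrative only):

```python
# Both paths resolve to the same class after this change:
from google.cloud.logging.handlers import AppendLabelLoggingAdapter
from google.cloud.logging_v2.handlers import AppendLabelLoggingAdapter  # noqa: F811
```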
81 changes: 79 additions & 2 deletions google/cloud/logging_v2/handlers/structured_log.py
@@ -19,10 +19,12 @@
import logging
import logging.handlers

from google.cloud.logging_v2.handlers.handlers import CloudLoggingFilter
from google.cloud.logging_v2.handlers.handlers import _format_and_parse_message
import google.cloud.logging_v2
from google.cloud.logging_v2._instrumentation import _create_diagnostic_entry
from google.cloud.logging_v2.handlers.handlers import (
CloudLoggingFilter,
_format_and_parse_message,
)

GCP_FORMAT = (
"{%(_payload_str)s"
@@ -148,3 +150,78 @@ def emit_instrumentation_info(self):
struct_logger.setLevel(logging.INFO)
struct_logger.info(diagnostic_object.payload)
struct_logger.handlers.clear()


class AppendLabelLoggingAdapter(logging.LoggerAdapter):
"""A logging adapter that appends a set of constant labels to every log record.

This adapter ensures that specific key-value pairs are included in the 'labels'
dictionary of every log message, unless they are explicitly overridden in the
logging call.

Example:

.. code-block:: python

import logging
from google.cloud.logging_v2.handlers.structured_log import AppendLabelLoggingAdapter
from google.cloud.logging_v2.handlers.structured_log import StructuredLogHandler
logging.root.setLevel(logging.INFO)
logging.root.handlers = [StructuredLogHandler()]
first_adapter = AppendLabelLoggingAdapter(logging.root, {'a': 5, 'b': 6})
first_adapter.info('first info')
{
"message": "first info",
"severity": "INFO",
"logging.googleapis.com/labels": {"python_logger": "root", "a": 5, "b": 6}
[...]
}
# Adapters can be stacked
second_adapter = AppendLabelLoggingAdapter(first_adapter, {'hello': 'world'})
second_adapter.info('second info')
{
"message": "second info",
"severity": "INFO",
"logging.googleapis.com/labels": {"python_logger": "root", "hello": "world", "a": 5, "b": 6}
[...]
}
"""

def __init__(self, logger, append_labels):
"""
Args:
logger: The Logger or LoggerAdapter to wrap.
append_labels (dict): Labels to inject into every log record.
"""
# Ensure append_labels is always a dict to avoid attribute errors later
self.append_labels = append_labels or {}
super().__init__(logger, None)

def process(self, msg, kwargs):
# 1. Safely handle 'extra' and ensure it is a dictionary
# We copy to avoid mutating the original dict passed by the caller
extra = kwargs.get("extra")
if extra is None:
extra = {}
else:
extra = dict(extra)

# 2. Extract and copy existing labels
# In Google Cloud Structured Logging, 'labels' is the standard key
labels = extra.get("labels")
if not isinstance(labels, dict):
labels = {}
else:
labels = dict(labels)

# 3. Merging logic
# Implementation choice: 'labels' from the call site (or an inner adapter)
# take precedence over 'self.append_labels' so that specific overrides are
# respected; hence the unpacking order below matters.
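# Illustrative: with dict unpacking, later keys win, e.g.
# {**{"a": 1}, **{"a": 2}} == {"a": 2}, so call-site labels override defaults.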
labels = {**self.append_labels, **labels}

# 4. Re-insert into kwargs
extra["labels"] = labels
kwargs["extra"] = extra

return msg, kwargs
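
For reviewers, a minimal end-to-end sketch of the precedence rules implemented above; the label keys and values here are made up for illustration:

```python
import logging

from google.cloud.logging_v2.handlers.structured_log import (
    AppendLabelLoggingAdapter,
    StructuredLogHandler,
)

logging.root.setLevel(logging.INFO)
logging.root.handlers = [StructuredLogHandler()]

adapter = AppendLabelLoggingAdapter(logging.root, {"env": "prod", "team": "core"})

adapter.info("defaults")
# emitted labels include {"env": "prod", "team": "core"}

adapter.info("override", extra={"labels": {"env": "staging"}})
# the call-site label wins: {"env": "staging", "team": "core"}
```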
90 changes: 47 additions & 43 deletions tests/system/test_system.py
@@ -34,7 +34,6 @@
import google.cloud.logging
from google.cloud._helpers import UTC
from google.cloud.logging_v2.handlers import CloudLoggingHandler
from google.cloud.logging_v2.handlers.transports import BackgroundThreadTransport
from google.cloud.logging_v2.handlers.transports import SyncTransport
from google.cloud.logging_v2 import client
from google.cloud.logging_v2.resource import Resource
@@ -138,6 +137,36 @@ def wrapped(*args, **kwargs):
return wrapped


def _shared_subprocess_worker(handler_name, log_message, cleanup_mode="close"):
"""
A top-level worker shared by the handler close/flush tests, defined at
module level so it can be pickled under the "spawn" start method.
It creates its own Client so the child does not inherit the parent's
SSL/connection state.
"""
import logging
from google.cloud.logging import Client
from google.cloud.logging.handlers import CloudLoggingHandler
from google.cloud.logging.handlers.transports import BackgroundThreadTransport

# 1. Create a fresh client inside the child process
local_client = Client()

# 2. Setup the handler and logger
handler = CloudLoggingHandler(
local_client, name=handler_name, transport=BackgroundThreadTransport
)
cloud_logger = logging.getLogger("subprocess_logger")
cloud_logger.addHandler(handler)

# 3. Perform the log
cloud_logger.warning(log_message)

# 4. Handle the specific cleanup requested by the test
if cleanup_mode == "close":
handler.close()
elif cleanup_mode == "flush":
local_client.flush_handlers()
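
# Note (illustrative): the tests below drive this worker under the "spawn"
# start method, e.g.
#   ctx = multiprocessing.get_context("spawn")
#   proc = ctx.Process(
#       target=_shared_subprocess_worker, args=(name, message, "close")
#   )
# "spawn" starts a fresh interpreter, so the child cannot inherit the
# parent's gRPC/SSL connection state the way a forked child would.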


class TestLogging(unittest.TestCase):
JSON_PAYLOAD = {
"message": "System test: test_log_struct",
@@ -723,67 +752,42 @@ def test_log_handler_otel_integration(self):
def test_log_handler_close(self):
import multiprocessing

ctx = multiprocessing.get_context("fork")
ctx = multiprocessing.get_context("spawn")

LOG_MESSAGE = "This is a test of handler.close before exiting."
LOGGER_NAME = "close-test"
handler_name = self._logger_name(LOGGER_NAME)
handler_name = self._logger_name("close-test")

# Create the logger object only so it can be cleaned up; the log itself stays hidden until entries are written
# Setup for verification (parent uses its own client)
logger = Config.CLIENT.logger(handler_name)
self.to_delete.append(logger)

# Run a simulation of logging an entry then immediately shutting down.
# The .close() function before the process exits should prevent the
# thread shutdown error and let us log the message.
def subprocess_main():
# logger.delete and logger.list_entries work by filtering on log name, so we
# can create new objects with the same name and have the queries on the parent
# process still work.
handler = CloudLoggingHandler(
Config.CLIENT, name=handler_name, transport=BackgroundThreadTransport
)
cloud_logger = logging.getLogger(LOGGER_NAME)
cloud_logger.addHandler(handler)
cloud_logger.warning(LOG_MESSAGE)
handler.close()

proc = ctx.Process(target=subprocess_main)
proc = ctx.Process(
target=_shared_subprocess_worker, args=(handler_name, LOG_MESSAGE, "close")
)
proc.start()
proc.join()

entries = _list_entries(logger)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].payload, LOG_MESSAGE)

def test_log_client_flush_handlers(self):
def test_log_handler_flush(self):
import multiprocessing

ctx = multiprocessing.get_context("fork")
LOG_MESSAGE = "This is a test of client.flush_handlers before exiting."
LOGGER_NAME = "close-test"
handler_name = self._logger_name(LOGGER_NAME)
ctx = multiprocessing.get_context("spawn")

LOG_MESSAGE = "This is a test of client.flush_handlers."
handler_name = self._logger_name("flush-test")

# Create the logger object only so it can be cleaned up; the log itself stays hidden until entries are written
logger = Config.CLIENT.logger(handler_name)
self.to_delete.append(logger)

# Run a simulation of logging an entry and then immediately shutting down.
# Calling client.flush_handlers() before the process exits should flush
# pending entries and let us log the message.
def subprocess_main():
# logger.delete and logger.list_entries work by filtering on log name, so we
# can create new objects with the same name and have the queries on the parent
# process still work.
handler = CloudLoggingHandler(
Config.CLIENT, name=handler_name, transport=BackgroundThreadTransport
)
cloud_logger = logging.getLogger(LOGGER_NAME)
cloud_logger.addHandler(handler)
cloud_logger.warning(LOG_MESSAGE)
Config.CLIENT.flush_handlers()

proc = ctx.Process(target=subprocess_main)
proc = ctx.Process(
target=_shared_subprocess_worker, args=(handler_name, LOG_MESSAGE, "flush")
)
proc.start()
proc.join()

entries = _list_entries(logger)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].payload, LOG_MESSAGE)
93 changes: 93 additions & 0 deletions tests/unit/handlers/test_structured_log.py
@@ -769,3 +769,96 @@ def test_valid_instrumentation_info(self):
inst_source_dict,
"instrumentation payload not logged properly",
)

def test_append_labels_adapter(self):
import logging

import mock

from google.cloud.logging_v2.handlers.structured_log import (
AppendLabelLoggingAdapter,
)

logger = logging.getLogger("google.cloud.logging_v2.handlers.structured_log")
handler = self._make_one()
with mock.patch.object(handler, "emit_instrumentation_info"):
with mock.patch.object(logger, "_log") as mock_log:
logger.addHandler(handler)
logger.setLevel(logging.INFO)
adapted_logger = AppendLabelLoggingAdapter(
logger, append_labels={"service_id": 1, "another_value": "foo"}
)
adapted_logger.info("test message")
mock_log.assert_called_once()
self.assertEqual(
mock_log.call_args_list[0].kwargs,
{"extra": {"labels": {"service_id": 1, "another_value": "foo"}}},
)

def test_append_labels_adapter_override_defaults(self):
import logging

import mock

from google.cloud.logging_v2.handlers.structured_log import (
AppendLabelLoggingAdapter,
)

logger = logging.getLogger("google.cloud.logging_v2.handlers.structured_log")
handler = self._make_one()
with mock.patch.object(handler, "emit_instrumentation_info"):
with mock.patch.object(logger, "_log") as mock_log:
logger.addHandler(handler)
logger.setLevel(logging.INFO)
adapted_logger = AppendLabelLoggingAdapter(
logger, append_labels={"service_id": 1, "another_value": "foo"}
)
adapted_logger.info(
"test message", extra={"labels": {"another_value": "baz"}}
)
mock_log.assert_called_once()
# the default value was overridden
self.assertEqual(
mock_log.call_args_list[0].kwargs,
{"extra": {"labels": {"service_id": 1, "another_value": "baz"}}},
)

def test_append_labels_adapter_stacked(self):
import logging

import mock

from google.cloud.logging_v2.handlers.structured_log import (
AppendLabelLoggingAdapter,
)

logger = logging.getLogger("google.cloud.logging_v2.handlers.structured_log")
handler = self._make_one()
with mock.patch.object(handler, "emit_instrumentation_info"):
with mock.patch.object(logger, "_log") as mock_log:
logger.addHandler(handler)
logger.setLevel(logging.INFO)
adapted_logger = AppendLabelLoggingAdapter(
logger, append_labels={"service_id": 1, "another_value": "foo"}
)
twice_adapted_logger = AppendLabelLoggingAdapter(
adapted_logger,
# one field is new, the other was already set by the first adapter
append_labels={"new_field": "new_value", "another_value": "baz"},
)
twice_adapted_logger.info(
"test message", extra={"labels": {"another_value": "baz"}}
)
mock_log.assert_called_once()
self.assertEqual(
mock_log.call_args_list[0].kwargs,
{
"extra": {
"labels": {
"another_value": "baz", # value is changed by the second adapter
"new_field": "new_value", # introduced by the second adapter
"service_id": 1, # left as is from the first adapter configuration
}
}
},
)