From 284dcc3303a9171e2b5e93d23adecab71614b6ae Mon Sep 17 00:00:00 2001 From: Lester Szeto Date: Thu, 16 Jan 2025 13:31:55 -0800 Subject: [PATCH] Feat: Added Metric Interceptor integration with Attempt metrics --- google/cloud/spanner_v1/client.py | 37 +++++ google/cloud/spanner_v1/metrics/constants.py | 6 + .../spanner_v1/metrics/metrics_exporter.py | 58 +++---- .../spanner_v1/metrics/metrics_interceptor.py | 146 ++++++++++++++++++ .../spanner_v1/metrics/metrics_tracer.py | 38 ++--- .../metrics/metrics_tracer_factory.py | 16 +- .../metrics/spanner_metrics_tracer_factory.py | 125 +++++++++++++++ .../spanner_v1/services/spanner/client.py | 2 + .../services/spanner/transports/base.py | 2 + .../services/spanner/transports/grpc.py | 9 ++ .../spanner/transports/grpc_asyncio.py | 2 + .../services/spanner/transports/rest.py | 3 +- setup.py | 2 + tests/mockserver_tests/test_tags.py | 14 +- tests/unit/gapic/spanner_v1/test_spanner.py | 13 ++ ...c_exporter.py => test_metrics_exporter.py} | 4 +- tests/unit/test_metrics_interceptor.py | 119 ++++++++++++++ .../test_spanner_metrics_tracer_factory.py | 50 ++++++ 18 files changed, 579 insertions(+), 67 deletions(-) create mode 100644 google/cloud/spanner_v1/metrics/metrics_interceptor.py create mode 100644 google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py rename tests/unit/{test_metric_exporter.py => test_metrics_exporter.py} (99%) create mode 100644 tests/unit/test_metrics_interceptor.py create mode 100644 tests/unit/test_spanner_metrics_tracer_factory.py diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index afe6264717..2cadf700dd 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -48,9 +48,27 @@ from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import _metadata_with_prefix from google.cloud.spanner_v1.instance import Instance +from google.cloud.spanner_v1.metrics.constants import 
ENABLE_SPANNER_METRICS_ENV_VAR +from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import ( + SpannerMetricsTracerFactory, +) +from google.cloud.spanner_v1.metrics.metrics_exporter import ( + CloudMonitoringMetricsExporter, +) + +try: + from opentelemetry import metrics + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + + HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True +except ImportError: # pragma: NO COVER + HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False + _CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__) EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST" +ENABLE_BUILTIN_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS" _EMULATOR_HOST_HTTP_SCHEME = ( "%s contains a http scheme. When used with a scheme it may cause gRPC's " "DNS resolver to endlessly attempt to resolve. %s is intended to be used " @@ -73,6 +91,10 @@ def _get_spanner_optimizer_statistics_package(): return os.getenv(OPTIMIZER_STATISITCS_PACKAGE_ENV_VAR, "") +def _get_spanner_enable_builtin_metrics(): + return os.getenv(ENABLE_SPANNER_METRICS_ENV_VAR) == "true" + + class Client(ClientWithProject): """Client for interacting with Cloud Spanner API. 
@@ -196,6 +218,21 @@ def __init__( ): warnings.warn(_EMULATOR_HOST_HTTP_SCHEME) + # Check flag to enable Spanner builtin metrics + if ( + _get_spanner_enable_builtin_metrics() + and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED + ): + meter_provider = metrics.NoOpMeterProvider() + if not _get_spanner_emulator_host(): + meter_provider = MeterProvider( + metric_readers=[ + PeriodicExportingMetricReader(CloudMonitoringMetricsExporter()) + ] + ) + metrics.set_meter_provider(meter_provider) + SpannerMetricsTracerFactory() + self._route_to_leader_enabled = route_to_leader_enabled self._directed_read_options = directed_read_options self._observability_options = observability_options diff --git a/google/cloud/spanner_v1/metrics/constants.py b/google/cloud/spanner_v1/metrics/constants.py index 5eca1fa83d..e0abb742d2 100644 --- a/google/cloud/spanner_v1/metrics/constants.py +++ b/google/cloud/spanner_v1/metrics/constants.py @@ -15,6 +15,12 @@ BUILT_IN_METRICS_METER_NAME = "gax-python" NATIVE_METRICS_PREFIX = "spanner.googleapis.com/internal/client" SPANNER_RESOURCE_TYPE = "spanner_instance_client" +SPANNER_SERVICE_NAME = "spanner-python" +GOOGLE_CLOUD_RESOURCE_KEY = "google-cloud-resource-prefix" +GOOGLE_CLOUD_REGION_KEY = "cloud.region" +GOOGLE_CLOUD_REGION_GLOBAL = "global" +SPANNER_METHOD_PREFIX = "/google.spanner.v1." 
+ENABLE_SPANNER_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS" # Monitored resource labels MONITORED_RES_LABEL_KEY_PROJECT = "project_id" diff --git a/google/cloud/spanner_v1/metrics/metrics_exporter.py b/google/cloud/spanner_v1/metrics/metrics_exporter.py index fb32985365..ea8aca7ff0 100644 --- a/google/cloud/spanner_v1/metrics/metrics_exporter.py +++ b/google/cloud/spanner_v1/metrics/metrics_exporter.py @@ -23,7 +23,7 @@ ) import logging -from typing import Optional, List, Union, NoReturn, Tuple +from typing import Optional, List, Union, NoReturn, Tuple, Dict import google.auth from google.api.distribution_pb2 import ( # pylint: disable=no-name-in-module @@ -39,10 +39,6 @@ MonitoredResource, ) -from google.cloud.monitoring_v3.services.metric_service.transports.grpc import ( - MetricServiceGrpcTransport, -) - # pylint: disable=no-name-in-module from google.protobuf.timestamp_pb2 import Timestamp from google.cloud.spanner_v1.gapic_version import __version__ @@ -60,12 +56,9 @@ Sum, ) from opentelemetry.sdk.resources import Resource - - HAS_OPENTELEMETRY_INSTALLED = True -except ImportError: # pragma: NO COVER - HAS_OPENTELEMETRY_INSTALLED = False - -try: + from google.cloud.monitoring_v3.services.metric_service.transports.grpc import ( + MetricServiceGrpcTransport, + ) from google.cloud.monitoring_v3 import ( CreateTimeSeriesRequest, MetricServiceClient, @@ -75,13 +68,10 @@ TypedValue, ) - HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True -except ImportError: - HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False - -HAS_DEPENDENCIES_INSTALLED = ( - HAS_OPENTELEMETRY_INSTALLED and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED -) + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: # pragma: NO COVER + HAS_OPENTELEMETRY_INSTALLED = False + MetricExporter = object logger = logging.getLogger(__name__) MAX_BATCH_WRITE = 200 @@ -120,7 +110,7 @@ class CloudMonitoringMetricsExporter(MetricExporter): def __init__( self, project_id: Optional[str] = None, - client: 
Optional[MetricServiceClient] = None, + client: Optional["MetricServiceClient"] = None, ): """Initialize a custom exporter to send metrics for the Spanner Service Metrics.""" # Default preferred_temporality is all CUMULATIVE so need to customize @@ -144,7 +134,7 @@ def __init__( self.project_id = project_id self.project_name = self.client.common_project_path(self.project_id) - def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None: + def _batch_write(self, series: List["TimeSeries"], timeout_millis: float) -> None: """Cloud Monitoring allows writing up to 200 time series at once. :param series: ProtoBuf TimeSeries @@ -166,8 +156,8 @@ def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None: @staticmethod def _resource_to_monitored_resource_pb( - resource: Resource, labels: any - ) -> MonitoredResource: + resource: "Resource", labels: Dict[str, str] + ) -> "MonitoredResource": """ Convert the resource to a Google Cloud Monitoring monitored resource. @@ -182,7 +172,7 @@ def _resource_to_monitored_resource_pb( return monitored_resource @staticmethod - def _to_metric_kind(metric: Metric) -> MetricDescriptor.MetricKind: + def _to_metric_kind(metric: "Metric") -> MetricDescriptor.MetricKind: """ Convert the metric to a Google Cloud Monitoring metric kind. @@ -210,7 +200,7 @@ def _to_metric_kind(metric: Metric) -> MetricDescriptor.MetricKind: @staticmethod def _extract_metric_labels( - data_point: Union[NumberDataPoint, HistogramDataPoint] + data_point: Union["NumberDataPoint", "HistogramDataPoint"] ) -> Tuple[dict, dict]: """ Extract the metric labels from the data point. 
@@ -233,8 +223,8 @@ def _extract_metric_labels( @staticmethod def _to_point( kind: "MetricDescriptor.MetricKind.V", - data_point: Union[NumberDataPoint, HistogramDataPoint], - ) -> Point: + data_point: Union["NumberDataPoint", "HistogramDataPoint"], + ) -> "Point": # Create a Google Cloud Monitoring data point value based on the OpenTelemetry metric data point type ## For histograms, we need to calculate the mean and bucket counts if isinstance(data_point, HistogramDataPoint): @@ -281,7 +271,7 @@ def _data_point_to_timeseries_pb( metric, monitored_resource, labels, - ) -> TimeSeries: + ) -> "TimeSeries": """ Convert the data point to a Google Cloud Monitoring time series. @@ -308,8 +298,8 @@ def _data_point_to_timeseries_pb( @staticmethod def _resource_metrics_to_timeseries_pb( - metrics_data: MetricsData, - ) -> List[TimeSeries]: + metrics_data: "MetricsData", + ) -> List["TimeSeries"]: """ Convert the metrics data to a list of Google Cloud Monitoring time series. @@ -346,10 +336,10 @@ def _resource_metrics_to_timeseries_pb( def export( self, - metrics_data: MetricsData, + metrics_data: "MetricsData", timeout_millis: float = 10_000, **kwargs, - ) -> MetricExportResult: + ) -> "MetricExportResult": """ Export the metrics data to Google Cloud Monitoring. 
@@ -357,7 +347,7 @@ def export( :param timeout_millis: timeout in milliseconds :return: MetricExportResult """ - if not HAS_DEPENDENCIES_INSTALLED: + if not HAS_OPENTELEMETRY_INSTALLED: logger.warning("Metric exporter called without dependencies installed.") return False @@ -370,8 +360,8 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: return True def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - """Not implemented.""" - pass + """Safely shuts down the exporter and closes all opened GRPC channels.""" + self.client.transport.close() def _timestamp_from_nanos(nanos: int) -> Timestamp: diff --git a/google/cloud/spanner_v1/metrics/metrics_interceptor.py b/google/cloud/spanner_v1/metrics/metrics_interceptor.py new file mode 100644 index 0000000000..963270ad84 --- /dev/null +++ b/google/cloud/spanner_v1/metrics/metrics_interceptor.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Interceptor for collecting Cloud Spanner metrics.""" + +from grpc_interceptor import ClientInterceptor +from .constants import ( + GOOGLE_CLOUD_RESOURCE_KEY, + SPANNER_METHOD_PREFIX, +) +from typing import Dict +from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory +import re + + +class MetricsInterceptor(ClientInterceptor): + """Interceptor that collects metrics for Cloud Spanner operations.""" + + @staticmethod + def _parse_resource_path(path: str) -> dict: + """Parse the resource path to extract project, instance and database. + + Args: + path (str): The resource path from the request + + Returns: + dict: Extracted resource components + """ + # Match paths like: + # projects/{project}/instances/{instance}/databases/{database}/sessions/{session} + # projects/{project}/instances/{instance}/databases/{database} + # projects/{project}/instances/{instance} + pattern = r"^projects/(?P<project>[^/]+)(/instances/(?P<instance>[^/]+))?(/databases/(?P<database>[^/]+))?(/sessions/(?P<session>[^/]+))?.*$" + match = re.match(pattern, path) + if match: + return {k: v for k, v in match.groupdict().items() if v is not None} + return {} + + @staticmethod + def _extract_resource_from_path(metadata: Dict[str, str]) -> Dict[str, str]: + """ + Extracts resource information from the metadata based on the path. + + This method iterates through the metadata dictionary to find the first tuple containing the key 'google-cloud-resource-prefix'. It then extracts the path from this tuple and parses it to extract project, instance, and database information using the _parse_resource_path method. + + Args: + metadata (Dict[str, str]): A dictionary containing metadata information. + + Returns: + Dict[str, str]: A dictionary containing extracted project, instance, and database information. 
+ """ + # Extract the resource path from the first metadata tuple keyed by 'google-cloud-resource-prefix' + path = next( + (value for key, value in metadata if key == GOOGLE_CLOUD_RESOURCE_KEY), "" + ) + + resources = MetricsInterceptor._parse_resource_path(path) + return resources + + @staticmethod + def _remove_prefix(s: str, prefix: str) -> str: + """ + This function removes the prefix from the given string. + + Args: + s (str): The string from which the prefix is to be removed. + prefix (str): The prefix to be removed from the string. + + Returns: + str: The string with the prefix removed. + + Note: + This function is used because the `removeprefix` method does not exist in Python 3.8. + """ + if s.startswith(prefix): + return s[len(prefix) :] + return s + + def _set_metrics_tracer_attributes(self, resources: Dict[str, str]) -> None: + """ + Sets the metric tracer attributes based on the provided resources. + + This method updates the current metric tracer's attributes with the project, instance, and database information extracted from the resources dictionary. If the current metric tracer is not set, the method does nothing. + + Args: + resources (Dict[str, str]): A dictionary containing project, instance, and database information. + """ + if SpannerMetricsTracerFactory.current_metrics_tracer is None: + return + + if resources: + if "project" in resources: + SpannerMetricsTracerFactory.current_metrics_tracer.set_project( + resources["project"] + ) + if "instance" in resources: + SpannerMetricsTracerFactory.current_metrics_tracer.set_instance( + resources["instance"] + ) + if "database" in resources: + SpannerMetricsTracerFactory.current_metrics_tracer.set_database( + resources["database"] + ) + + def intercept(self, invoked_method, request_or_iterator, call_details): + """Intercept gRPC calls to collect metrics. 
+ + Args: + invoked_method: The RPC method + request_or_iterator: The RPC request + call_details: Details about the RPC call + + Returns: + The RPC response + """ + if SpannerMetricsTracerFactory.current_metrics_tracer is None: + return invoked_method(request_or_iterator, call_details) + + # Setup Metric Tracer attributes from call details + ## Extract Project / Instance / Database from header information + resources = self._extract_resource_from_path(call_details.metadata) + self._set_metrics_tracer_attributes(resources) + + ## Format the method as a dotted name with the Spanner method prefix removed + method_name = self._remove_prefix( + call_details.method, SPANNER_METHOD_PREFIX + ).replace("/", ".") + SpannerMetricsTracerFactory.current_metrics_tracer.set_method(method_name) + + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start() + response = invoked_method(request_or_iterator, call_details) + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion() + + return response diff --git a/google/cloud/spanner_v1/metrics/metrics_tracer.py b/google/cloud/spanner_v1/metrics/metrics_tracer.py index 60525d6e4e..852133cbd4 100644 --- a/google/cloud/spanner_v1/metrics/metrics_tracer.py +++ b/google/cloud/spanner_v1/metrics/metrics_tracer.py @@ -177,10 +177,10 @@ class should not have any knowledge about the observability framework used for m """ _client_attributes: Dict[str, str] - _instrument_attempt_counter: Counter - _instrument_attempt_latency: Histogram - _instrument_operation_counter: Counter - _instrument_operation_latency: Histogram + _instrument_attempt_counter: "Counter" + _instrument_attempt_latency: "Histogram" + _instrument_operation_counter: "Counter" + _instrument_operation_latency: "Histogram" current_op: MetricOpTracer enabled: bool method: str @@ -188,10 +188,10 @@ class should not have any knowledge about the observability framework used for m def __init__( self, enabled: bool, - instrument_attempt_latency: Histogram, - instrument_attempt_counter: 
Counter, - instrument_operation_latency: Histogram, - instrument_operation_counter: Counter, + instrument_attempt_latency: "Histogram", + instrument_attempt_counter: "Counter", + instrument_operation_latency: "Histogram", + instrument_operation_counter: "Counter", client_attributes: Dict[str, str], ): """ @@ -251,7 +251,7 @@ def client_attributes(self) -> Dict[str, str]: return self._client_attributes @property - def instrument_attempt_counter(self) -> Counter: + def instrument_attempt_counter(self) -> "Counter": """ Return the instrument for counting attempts. @@ -264,7 +264,7 @@ def instrument_attempt_counter(self) -> Counter: return self._instrument_attempt_counter @property - def instrument_attempt_latency(self) -> Histogram: + def instrument_attempt_latency(self) -> "Histogram": """ Return the instrument for measuring attempt latency. @@ -277,7 +277,7 @@ def instrument_attempt_latency(self) -> Histogram: return self._instrument_attempt_latency @property - def instrument_operation_counter(self) -> Counter: + def instrument_operation_counter(self) -> "Counter": """ Return the instrument for counting operations. @@ -290,7 +290,7 @@ def instrument_operation_counter(self) -> Counter: return self._instrument_operation_counter @property - def instrument_operation_latency(self) -> Histogram: + def instrument_operation_latency(self) -> "Histogram": """ Return the instrument for measuring operation latency. 
@@ -394,9 +394,9 @@ def _create_operation_otel_attributes(self) -> dict: """ if not self.enabled: return {} - - self._client_attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.status - return self._client_attributes + attributes = self._client_attributes.copy() + attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.status + return attributes def _create_attempt_otel_attributes(self) -> dict: """ @@ -408,11 +408,13 @@ def _create_attempt_otel_attributes(self) -> dict: if not self.enabled: return {} - attributes = {} + attributes = self._client_attributes.copy() + # Short circuit out if we don't have an attempt - if self.current_op.current_attempt is not None: - attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.current_attempt.status + if self.current_op.current_attempt is None: + return attributes + attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.current_attempt.status return attributes def set_project(self, project: str) -> "MetricsTracer": diff --git a/google/cloud/spanner_v1/metrics/metrics_tracer_factory.py b/google/cloud/spanner_v1/metrics/metrics_tracer_factory.py index f7a4088019..ca8bf35407 100644 --- a/google/cloud/spanner_v1/metrics/metrics_tracer_factory.py +++ b/google/cloud/spanner_v1/metrics/metrics_tracer_factory.py @@ -50,26 +50,26 @@ class MetricsTracerFactory: """Factory class for creating MetricTracer instances. 
This class facilitates the creation of MetricTracer objects, which are responsible for collecting and tracing metrics.""" enabled: bool - _instrument_attempt_latency: Histogram - _instrument_attempt_counter: Counter - _instrument_operation_latency: Histogram - _instrument_operation_counter: Counter + _instrument_attempt_latency: "Histogram" + _instrument_attempt_counter: "Counter" + _instrument_operation_latency: "Histogram" + _instrument_operation_counter: "Counter" _client_attributes: Dict[str, str] @property - def instrument_attempt_latency(self) -> Histogram: + def instrument_attempt_latency(self) -> "Histogram": return self._instrument_attempt_latency @property - def instrument_attempt_counter(self) -> Counter: + def instrument_attempt_counter(self) -> "Counter": return self._instrument_attempt_counter @property - def instrument_operation_latency(self) -> Histogram: + def instrument_operation_latency(self) -> "Histogram": return self._instrument_operation_latency @property - def instrument_operation_counter(self) -> Counter: + def instrument_operation_counter(self) -> "Counter": return self._instrument_operation_counter def __init__(self, enabled: bool, service_name: str): diff --git a/google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py b/google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py new file mode 100644 index 0000000000..4bfe8b13dc --- /dev/null +++ b/google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""This module provides a singleton factory for creating SpannerMetricsTracer instances.""" + +from .metrics_tracer_factory import MetricsTracerFactory +import os +from .constants import ( + SPANNER_SERVICE_NAME, + GOOGLE_CLOUD_REGION_KEY, + GOOGLE_CLOUD_REGION_GLOBAL, +) + +try: + from opentelemetry.resourcedetector import gcp_resource_detector + # Overwrite the requests timeout for the detector. + # This is necessary as the client will wait the full timeout if the + # code is not run in a GCP environment, with the location endpoints available. + gcp_resource_detector._TIMEOUT_SEC = 0.2 + + import mmh3 + + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: # pragma: NO COVER + HAS_OPENTELEMETRY_INSTALLED = False + +from .metrics_tracer import MetricsTracer +from google.cloud.spanner_v1 import __version__ +from uuid import uuid4 + + +class SpannerMetricsTracerFactory(MetricsTracerFactory): + """A factory for creating SpannerMetricsTracer instances.""" + + _metrics_tracer_factory: "SpannerMetricsTracerFactory" = None + current_metrics_tracer: MetricsTracer = None + + def __new__(cls, enabled: bool = True) -> "SpannerMetricsTracerFactory": + """Create a new instance of SpannerMetricsTracerFactory if it doesn't already exist.""" + if cls._metrics_tracer_factory is None: + cls._metrics_tracer_factory = MetricsTracerFactory( + enabled, SPANNER_SERVICE_NAME + ) + + client_uid = cls._generate_client_uid() + cls._metrics_tracer_factory.set_client_uid(client_uid) + cls._metrics_tracer_factory.set_instance_config(cls._get_instance_config()) + 
cls._metrics_tracer_factory.set_client_name(cls._get_client_name()) + cls._metrics_tracer_factory.set_client_hash( + cls._generate_client_hash(client_uid) + ) + cls._metrics_tracer_factory.set_location(cls._get_location()) + return cls._metrics_tracer_factory + + @staticmethod + def _generate_client_uid() -> str: + """Generate a client UID in the form of uuidv4@pid@hostname.""" + try: + hostname = os.uname()[1] + pid = str(os.getpid())[0:10] # Limit PID to 10 characters + uuid = uuid4() + return f"{uuid}@{pid}@{hostname}" + except Exception: + return "" + + @staticmethod + def _get_instance_config() -> str: + """Get the instance configuration.""" + # TODO: unknown until there's a good way to get it. + return "unknown" + + @staticmethod + def _get_client_name() -> str: + """Get the client name.""" + return f"{SPANNER_SERVICE_NAME}/{__version__}" + + @staticmethod + def _generate_client_hash(client_uid: str) -> str: + """ + Generate a 6-digit zero-padded lowercase hexadecimal hash using the 10 most significant bits of a 64-bit hash value. + + The primary purpose of this function is to generate a hash value for the `client_hash` + resource label using `client_uid` metric field. The range of values is chosen to be small + enough to keep the cardinality of the Resource targets under control. Note: If at later time + the range needs to be increased, it can be done by increasing the value of `kPrefixLength` to + up to 24 bits without changing the format of the returned value. 
+ """ + if not client_uid: + return "000000" + hashed_client = mmh3.hash64(client_uid) + + # Join the hashes back together since mmh3 splits into high and low 32bits + full_hash = (hashed_client[0] << 32) | (hashed_client[1] & 0xFFFFFFFF) + unsigned_hash = full_hash & 0xFFFFFFFFFFFFFFFF + + k_prefix_length = 10 + sig_figs = unsigned_hash >> (64 - k_prefix_length) + + # Return as 6 digit zero padded hex string + return f"{sig_figs:06x}" + + @staticmethod + def _get_location() -> str: + """Get the location of the resource.""" + if not HAS_OPENTELEMETRY_INSTALLED: + return GOOGLE_CLOUD_REGION_GLOBAL + detector = gcp_resource_detector.GoogleCloudResourceDetector() + resources = detector.detect() + if GOOGLE_CLOUD_REGION_KEY not in resources.attributes: + return GOOGLE_CLOUD_REGION_GLOBAL + else: + return resources[GOOGLE_CLOUD_REGION_KEY] diff --git a/google/cloud/spanner_v1/services/spanner/client.py b/google/cloud/spanner_v1/services/spanner/client.py index 96b90bb21c..991a5ed292 100644 --- a/google/cloud/spanner_v1/services/spanner/client.py +++ b/google/cloud/spanner_v1/services/spanner/client.py @@ -62,6 +62,7 @@ from .transports.grpc import SpannerGrpcTransport from .transports.grpc_asyncio import SpannerGrpcAsyncIOTransport from .transports.rest import SpannerRestTransport +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor class SpannerClientMeta(type): @@ -705,6 +706,7 @@ def __init__( client_info=client_info, always_use_jwt_access=True, api_audience=self._client_options.api_audience, + metrics_interceptor=MetricsInterceptor(), ) def create_session( diff --git a/google/cloud/spanner_v1/services/spanner/transports/base.py b/google/cloud/spanner_v1/services/spanner/transports/base.py index 14c8e8d02f..8fa85af24d 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/base.py +++ b/google/cloud/spanner_v1/services/spanner/transports/base.py @@ -30,6 +30,7 @@ from google.cloud.spanner_v1.types import result_set from 
google.cloud.spanner_v1.types import spanner from google.cloud.spanner_v1.types import transaction +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor from google.protobuf import empty_pb2 # type: ignore DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( @@ -58,6 +59,7 @@ def __init__( client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, always_use_jwt_access: Optional[bool] = False, api_audience: Optional[str] = None, + metrics_interceptor: Optional[MetricsInterceptor] = None, **kwargs, ) -> None: """Instantiate the transport. diff --git a/google/cloud/spanner_v1/services/spanner/transports/grpc.py b/google/cloud/spanner_v1/services/spanner/transports/grpc.py index a2afa32174..9ede3bea8e 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/grpc.py +++ b/google/cloud/spanner_v1/services/spanner/transports/grpc.py @@ -28,6 +28,8 @@ from google.cloud.spanner_v1.types import result_set from google.cloud.spanner_v1.types import spanner from google.cloud.spanner_v1.types import transaction + +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor from google.protobuf import empty_pb2 # type: ignore from .base import SpannerTransport, DEFAULT_CLIENT_INFO @@ -66,6 +68,7 @@ def __init__( client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, always_use_jwt_access: Optional[bool] = False, api_audience: Optional[str] = None, + metrics_interceptor: Optional[MetricsInterceptor] = None, ) -> None: """Instantiate the transport. @@ -187,6 +190,12 @@ def __init__( ], ) + # Wrap the gRPC channel with the metric interceptor + if metrics_interceptor is not None: + self._grpc_channel = grpc.intercept_channel( + self._grpc_channel, metrics_interceptor + ) + # Wrap messages. 
This must be done after self._grpc_channel exists self._prep_wrapped_messages(client_info) diff --git a/google/cloud/spanner_v1/services/spanner/transports/grpc_asyncio.py b/google/cloud/spanner_v1/services/spanner/transports/grpc_asyncio.py index 9092ccf61d..b0ea754f38 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/grpc_asyncio.py +++ b/google/cloud/spanner_v1/services/spanner/transports/grpc_asyncio.py @@ -31,6 +31,7 @@ from google.cloud.spanner_v1.types import result_set from google.cloud.spanner_v1.types import spanner from google.cloud.spanner_v1.types import transaction +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor from google.protobuf import empty_pb2 # type: ignore from .base import SpannerTransport, DEFAULT_CLIENT_INFO from .grpc import SpannerGrpcTransport @@ -113,6 +114,7 @@ def __init__( client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, always_use_jwt_access: Optional[bool] = False, api_audience: Optional[str] = None, + metrics_interceptor: Optional[MetricsInterceptor] = None, ) -> None: """Instantiate the transport. 
diff --git a/google/cloud/spanner_v1/services/spanner/transports/rest.py b/google/cloud/spanner_v1/services/spanner/transports/rest.py index 6ca5e9eeed..798e71f0f5 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/rest.py +++ b/google/cloud/spanner_v1/services/spanner/transports/rest.py @@ -35,9 +35,9 @@ from google.cloud.spanner_v1.types import result_set from google.cloud.spanner_v1.types import spanner from google.cloud.spanner_v1.types import transaction +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor from google.protobuf import empty_pb2 # type: ignore - from .rest_base import _BaseSpannerRestTransport from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO @@ -546,6 +546,7 @@ def __init__( url_scheme: str = "https", interceptor: Optional[SpannerRestInterceptor] = None, api_audience: Optional[str] = None, + metrics_interceptor: Optional[MetricsInterceptor] = None, ) -> None: """Instantiate the transport. diff --git a/setup.py b/setup.py index 619607b794..d9a0dc5f0e 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,9 @@ "opentelemetry-api >= 1.22.0", "opentelemetry-sdk >= 1.22.0", "opentelemetry-semantic-conventions >= 0.43b0", + "opentelemetry-resourcedetector-gcp >= 1.8.0a0", "google-cloud-monitoring >= 2.16.0", + "mmh3 >= 5.0.1 ", ], "libcst": "libcst >= 0.2.5", } diff --git a/tests/mockserver_tests/test_tags.py b/tests/mockserver_tests/test_tags.py index c84d69b7bd..f44a9fb9a9 100644 --- a/tests/mockserver_tests/test_tags.py +++ b/tests/mockserver_tests/test_tags.py @@ -181,10 +181,16 @@ def test_request_tag_is_cleared(self): # This query will not have a request tag. 
cursor.execute("select name from singers") requests = self.spanner_service.requests - self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) - self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) - self.assertEqual("my_tag", requests[1].request_options.request_tag) - self.assertEqual("", requests[2].request_options.request_tag) + + # Filter for SQL requests calls + sql_requests = [ + request for request in requests if isinstance(request, ExecuteSqlRequest) + ] + + self.assertTrue(isinstance(sql_requests[0], ExecuteSqlRequest)) + self.assertTrue(isinstance(sql_requests[1], ExecuteSqlRequest)) + self.assertEqual("my_tag", sql_requests[0].request_options.request_tag) + self.assertEqual("", sql_requests[1].request_options.request_tag) def _execute_and_verify_select_singers( self, connection: Connection, request_tag: str = "", transaction_tag: str = "" diff --git a/tests/unit/gapic/spanner_v1/test_spanner.py b/tests/unit/gapic/spanner_v1/test_spanner.py index a1da7983a0..a331a8850d 100644 --- a/tests/unit/gapic/spanner_v1/test_spanner.py +++ b/tests/unit/gapic/spanner_v1/test_spanner.py @@ -514,6 +514,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case api_endpoint is not provided and GOOGLE_API_USE_MTLS_ENDPOINT is @@ -534,6 +535,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case api_endpoint is not provided and GOOGLE_API_USE_MTLS_ENDPOINT is @@ -552,6 +554,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case api_endpoint 
is not provided and GOOGLE_API_USE_MTLS_ENDPOINT has @@ -592,6 +595,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case api_endpoint is provided options = client_options.ClientOptions( @@ -612,6 +616,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience="https://language.googleapis.com", + metrics_interceptor=mock.ANY, ) @@ -684,6 +689,7 @@ def test_spanner_client_mtls_env_auto( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case ADC client cert is provided. Whether client cert is used depends on @@ -721,6 +727,7 @@ def test_spanner_client_mtls_env_auto( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case client_cert_source and ADC client cert are not provided. 
@@ -746,6 +753,7 @@ def test_spanner_client_mtls_env_auto( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) @@ -961,6 +969,7 @@ def test_spanner_client_client_options_scopes( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) @@ -998,6 +1007,7 @@ def test_spanner_client_client_options_credentials_file( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) @@ -1017,6 +1027,7 @@ def test_spanner_client_client_options_from_dict(): client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) @@ -1053,6 +1064,7 @@ def test_spanner_client_create_channel_credentials_file( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # test that the credentials from file are saved and used as the credentials. @@ -12600,4 +12612,5 @@ def test_api_key_credentials(client_class, transport_class): client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) diff --git a/tests/unit/test_metric_exporter.py b/tests/unit/test_metrics_exporter.py similarity index 99% rename from tests/unit/test_metric_exporter.py rename to tests/unit/test_metrics_exporter.py index 08ae9ecf21..92fe5638b1 100644 --- a/tests/unit/test_metric_exporter.py +++ b/tests/unit/test_metrics_exporter.py @@ -1,4 +1,4 @@ -# Copyright 2016 Google LLC All rights reserved. +# Copyright 2016 Google LLC All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -333,7 +333,7 @@ def create_tsr_side_effect(name, time_series): self.assertEqual(len(mockClient.create_service_time_series.mock_calls), 2) @patch( - "google.cloud.spanner_v1.metrics.metrics_exporter.HAS_DEPENDENCIES_INSTALLED", + "google.cloud.spanner_v1.metrics.metrics_exporter.HAS_OPENTELEMETRY_INSTALLED", False, ) def test_export_early_exit_if_extras_not_installed(self): diff --git a/tests/unit/test_metrics_interceptor.py b/tests/unit/test_metrics_interceptor.py new file mode 100644 index 0000000000..8a913af25d --- /dev/null +++ b/tests/unit/test_metrics_interceptor.py @@ -0,0 +1,119 @@ +import pytest +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor +from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import ( + SpannerMetricsTracerFactory, +) +from unittest.mock import MagicMock + + +@pytest.fixture +def interceptor(): + return MetricsInterceptor() + + +def test_parse_resource_path_valid(interceptor): + path = "projects/my_project/instances/my_instance/databases/my_database" + expected = { + "project": "my_project", + "instance": "my_instance", + "database": "my_database", + } + assert interceptor._parse_resource_path(path) == expected + + +def test_parse_resource_path_invalid(interceptor): + path = "invalid/path" + expected = {} + assert interceptor._parse_resource_path(path) == expected + + +def test_extract_resource_from_path(interceptor): + metadata = [ + ( + "google-cloud-resource-prefix", + "projects/my_project/instances/my_instance/databases/my_database", + ) + ] + expected = { + "project": "my_project", + "instance": "my_instance", + "database": "my_database", + } + assert interceptor._extract_resource_from_path(metadata) == expected + + +def test_set_metrics_tracer_attributes(interceptor): + SpannerMetricsTracerFactory.current_metrics_tracer = MockMetricTracer() + resources = { + "project": "my_project", + "instance": "my_instance", + "database": "my_database", + } + + 
interceptor._set_metrics_tracer_attributes(resources) + assert SpannerMetricsTracerFactory.current_metrics_tracer.project == "my_project" + assert SpannerMetricsTracerFactory.current_metrics_tracer.instance == "my_instance" + assert SpannerMetricsTracerFactory.current_metrics_tracer.database == "my_database" + + +def test_intercept_without_tracer(interceptor): + mock_invoked_method = MagicMock(return_value="response") + mock_details = MagicMock(metadata={}) + response = interceptor.intercept(mock_invoked_method, "request", mock_details) + + assert response == "response" + mock_invoked_method.assert_called_once_with("request", mock_details) + + +def test_intercept_with_tracer(interceptor): + SpannerMetricsTracerFactory.current_metrics_tracer = MockMetricTracer() + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start = ( + MagicMock() + ) + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion = ( + MagicMock() + ) + + mock_invoked_method = MagicMock(return_value="response") + call_details = MagicMock( + method="spanner.someMethod", + metadata=[ + ( + "google-cloud-resource-prefix", + "projects/my_project/instances/my_instance/databases/my_database", + ) + ], + ) + + response = interceptor.intercept(mock_invoked_method, "request", call_details) + + assert response == "response" + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start.assert_called_once() + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion.assert_called_once() + mock_invoked_method.assert_called_once_with("request", call_details) + + +class MockMetricTracer: + def __init__(self): + self.project = None + self.instance = None + self.database = None + self.method = None + + def set_project(self, project): + self.project = project + + def set_instance(self, instance): + self.instance = instance + + def set_database(self, database): + self.database = database + + def set_method(self, method): + self.method = method + + 
def record_attempt_start(self): + pass + + def record_attempt_completion(self): + pass diff --git a/tests/unit/test_spanner_metrics_tracer_factory.py b/tests/unit/test_spanner_metrics_tracer_factory.py new file mode 100644 index 0000000000..8ee4d53d3d --- /dev/null +++ b/tests/unit/test_spanner_metrics_tracer_factory.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import ( + SpannerMetricsTracerFactory, +) + + +class TestSpannerMetricsTracerFactory: + def test_new_instance_creation(self): + factory1 = SpannerMetricsTracerFactory(enabled=True) + factory2 = SpannerMetricsTracerFactory(enabled=True) + assert factory1 is factory2 # Should return the same instance + + def test_generate_client_uid_format(self): + client_uid = SpannerMetricsTracerFactory._generate_client_uid() + assert isinstance(client_uid, str) + assert len(client_uid.split("@")) == 3 # Should contain uuid, pid, and hostname + + def test_generate_client_hash(self): + client_uid = "123e4567-e89b-12d3-a456-426614174000@1234@hostname" + client_hash = SpannerMetricsTracerFactory._generate_client_hash(client_uid) + assert isinstance(client_hash, str) + assert len(client_hash) == 6 # Should be a 6-digit hex string + + def test_get_instance_config(self): + instance_config = SpannerMetricsTracerFactory._get_instance_config() + assert instance_config == "unknown" # As per 
the current implementation + + def test_get_client_name(self): + client_name = SpannerMetricsTracerFactory._get_client_name() + assert isinstance(client_name, str) + assert "spanner-python" in client_name + + def test_get_location(self): + location = SpannerMetricsTracerFactory._get_location() + assert isinstance(location, str) + assert location # Simply asserting for non empty as this can change depending on the instance this test runs in.