Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,27 @@
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1.instance import Instance
from google.cloud.spanner_v1.metrics.constants import ENABLE_SPANNER_METRICS_ENV_VAR
from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import (
SpannerMetricsTracerFactory,
)
from google.cloud.spanner_v1.metrics.metrics_exporter import (
CloudMonitoringMetricsExporter,
)

try:
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True
except ImportError: # pragma: NO COVER
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False


_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
ENABLE_BUILTIN_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"
_EMULATOR_HOST_HTTP_SCHEME = (
"%s contains a http scheme. When used with a scheme it may cause gRPC's "
"DNS resolver to endlessly attempt to resolve. %s is intended to be used "
Expand All @@ -73,6 +91,10 @@ def _get_spanner_optimizer_statistics_package():
return os.getenv(OPTIMIZER_STATISITCS_PACKAGE_ENV_VAR, "")


def _get_spanner_enable_builtin_metrics():
    """Return True when built-in Spanner metrics are enabled via the environment.

    The feature is on only when the environment variable named by
    ``ENABLE_SPANNER_METRICS_ENV_VAR`` holds the exact string ``"true"``;
    any other value (or an unset variable) disables it.
    """
    return os.environ.get(ENABLE_SPANNER_METRICS_ENV_VAR) == "true"


class Client(ClientWithProject):
"""Client for interacting with Cloud Spanner API.

Expand Down Expand Up @@ -196,6 +218,21 @@ def __init__(
):
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)

# Check flag to enable Spanner builtin metrics
if (
_get_spanner_enable_builtin_metrics()
and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED
):
meter_provider = metrics.NoOpMeterProvider()
if not _get_spanner_emulator_host():
meter_provider = MeterProvider(
metric_readers=[
PeriodicExportingMetricReader(CloudMonitoringMetricsExporter())
]
)
metrics.set_meter_provider(meter_provider)
SpannerMetricsTracerFactory()

self._route_to_leader_enabled = route_to_leader_enabled
self._directed_read_options = directed_read_options
self._observability_options = observability_options
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/spanner_v1/metrics/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
BUILT_IN_METRICS_METER_NAME = "gax-python"
NATIVE_METRICS_PREFIX = "spanner.googleapis.com/internal/client"
SPANNER_RESOURCE_TYPE = "spanner_instance_client"
SPANNER_SERVICE_NAME = "spanner-python"
GOOGLE_CLOUD_RESOURCE_KEY = "google-cloud-resource-prefix"
GOOGLE_CLOUD_REGION_KEY = "cloud.region"
GOOGLE_CLOUD_REGION_GLOBAL = "global"
SPANNER_METHOD_PREFIX = "/google.spanner.v1."
ENABLE_SPANNER_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"

# Monitored resource labels
MONITORED_RES_LABEL_KEY_PROJECT = "project_id"
Expand Down
58 changes: 24 additions & 34 deletions google/cloud/spanner_v1/metrics/metrics_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)

import logging
from typing import Optional, List, Union, NoReturn, Tuple
from typing import Optional, List, Union, NoReturn, Tuple, Dict

import google.auth
from google.api.distribution_pb2 import ( # pylint: disable=no-name-in-module
Expand All @@ -39,10 +39,6 @@
MonitoredResource,
)

from google.cloud.monitoring_v3.services.metric_service.transports.grpc import (
MetricServiceGrpcTransport,
)

# pylint: disable=no-name-in-module
from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud.spanner_v1.gapic_version import __version__
Expand All @@ -60,12 +56,9 @@
Sum,
)
from opentelemetry.sdk.resources import Resource

HAS_OPENTELEMETRY_INSTALLED = True
except ImportError: # pragma: NO COVER
HAS_OPENTELEMETRY_INSTALLED = False

try:
from google.cloud.monitoring_v3.services.metric_service.transports.grpc import (
MetricServiceGrpcTransport,
)
from google.cloud.monitoring_v3 import (
CreateTimeSeriesRequest,
MetricServiceClient,
Expand All @@ -75,13 +68,10 @@
TypedValue,
)

HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True
except ImportError:
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False

HAS_DEPENDENCIES_INSTALLED = (
HAS_OPENTELEMETRY_INSTALLED and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED
)
HAS_OPENTELEMETRY_INSTALLED = True
except ImportError: # pragma: NO COVER
HAS_OPENTELEMETRY_INSTALLED = False
MetricExporter = object

logger = logging.getLogger(__name__)
MAX_BATCH_WRITE = 200
Expand Down Expand Up @@ -120,7 +110,7 @@ class CloudMonitoringMetricsExporter(MetricExporter):
def __init__(
self,
project_id: Optional[str] = None,
client: Optional[MetricServiceClient] = None,
client: Optional["MetricServiceClient"] = None,
):
"""Initialize a custom exporter to send metrics for the Spanner Service Metrics."""
# Default preferred_temporality is all CUMULATIVE so need to customize
Expand All @@ -144,7 +134,7 @@ def __init__(
self.project_id = project_id
self.project_name = self.client.common_project_path(self.project_id)

def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None:
def _batch_write(self, series: List["TimeSeries"], timeout_millis: float) -> None:
"""Cloud Monitoring allows writing up to 200 time series at once.

:param series: ProtoBuf TimeSeries
Expand All @@ -166,8 +156,8 @@ def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None:

@staticmethod
def _resource_to_monitored_resource_pb(
resource: Resource, labels: any
) -> MonitoredResource:
resource: "Resource", labels: Dict[str, str]
) -> "MonitoredResource":
"""
Convert the resource to a Google Cloud Monitoring monitored resource.

Expand All @@ -182,7 +172,7 @@ def _resource_to_monitored_resource_pb(
return monitored_resource

@staticmethod
def _to_metric_kind(metric: Metric) -> MetricDescriptor.MetricKind:
def _to_metric_kind(metric: "Metric") -> MetricDescriptor.MetricKind:
"""
Convert the metric to a Google Cloud Monitoring metric kind.

Expand Down Expand Up @@ -210,7 +200,7 @@ def _to_metric_kind(metric: Metric) -> MetricDescriptor.MetricKind:

@staticmethod
def _extract_metric_labels(
data_point: Union[NumberDataPoint, HistogramDataPoint]
data_point: Union["NumberDataPoint", "HistogramDataPoint"]
) -> Tuple[dict, dict]:
"""
Extract the metric labels from the data point.
Expand All @@ -233,8 +223,8 @@ def _extract_metric_labels(
@staticmethod
def _to_point(
kind: "MetricDescriptor.MetricKind.V",
data_point: Union[NumberDataPoint, HistogramDataPoint],
) -> Point:
data_point: Union["NumberDataPoint", "HistogramDataPoint"],
) -> "Point":
# Create a Google Cloud Monitoring data point value based on the OpenTelemetry metric data point type
## For histograms, we need to calculate the mean and bucket counts
if isinstance(data_point, HistogramDataPoint):
Expand Down Expand Up @@ -281,7 +271,7 @@ def _data_point_to_timeseries_pb(
metric,
monitored_resource,
labels,
) -> TimeSeries:
) -> "TimeSeries":
"""
Convert the data point to a Google Cloud Monitoring time series.

Expand All @@ -308,8 +298,8 @@ def _data_point_to_timeseries_pb(

@staticmethod
def _resource_metrics_to_timeseries_pb(
metrics_data: MetricsData,
) -> List[TimeSeries]:
metrics_data: "MetricsData",
) -> List["TimeSeries"]:
"""
Convert the metrics data to a list of Google Cloud Monitoring time series.

Expand Down Expand Up @@ -346,18 +336,18 @@ def _resource_metrics_to_timeseries_pb(

def export(
self,
metrics_data: MetricsData,
metrics_data: "MetricsData",
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
) -> "MetricExportResult":
"""
Export the metrics data to Google Cloud Monitoring.

:param metrics_data: OpenTelemetry metrics data
:param timeout_millis: timeout in milliseconds
:return: MetricExportResult
"""
if not HAS_DEPENDENCIES_INSTALLED:
if not HAS_OPENTELEMETRY_INSTALLED:
logger.warning("Metric exporter called without dependencies installed.")
return False

Expand All @@ -370,8 +360,8 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
return True

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""Not implemented."""
pass
"""Safely shuts down the exporter and closes all opened GRPC channels."""
self.client.transport.close()


def _timestamp_from_nanos(nanos: int) -> Timestamp:
Expand Down
146 changes: 146 additions & 0 deletions google/cloud/spanner_v1/metrics/metrics_interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interceptor for collecting Cloud Spanner metrics."""

import re
from typing import Dict, Sequence, Tuple

from grpc_interceptor import ClientInterceptor

from .constants import (
    GOOGLE_CLOUD_RESOURCE_KEY,
    SPANNER_METHOD_PREFIX,
)
from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory


class MetricsInterceptor(ClientInterceptor):
    """gRPC client interceptor that collects metrics for Cloud Spanner operations.

    For each intercepted RPC it extracts the project / instance / database from
    the request metadata, tags the currently active metrics tracer with them,
    and records the attempt's start and completion around the invocation.
    When no tracer is active the call passes through untouched.
    """

    @staticmethod
    def _parse_resource_path(path: str) -> dict:
        """Parse the resource path to extract project, instance and database.

        Args:
            path (str): The resource path from the request.

        Returns:
            dict: Extracted resource components (``project``, ``instance``,
            ``database``, ``session``) — only the components actually present
            in the path. Empty when the path does not start with ``projects/``.
        """
        # Match paths like:
        # projects/{project}/instances/{instance}/databases/{database}/sessions/{session}
        # projects/{project}/instances/{instance}/databases/{database}
        # projects/{project}/instances/{instance}
        pattern = r"^projects/(?P<project>[^/]+)(/instances/(?P<instance>[^/]+))?(/databases/(?P<database>[^/]+))?(/sessions/(?P<session>[^/]+))?.*$"
        match = re.match(pattern, path)
        if match:
            # Drop the optional groups that did not participate in the match.
            return {k: v for k, v in match.groupdict().items() if v is not None}
        return {}

    @staticmethod
    def _extract_resource_from_path(
        metadata: Sequence[Tuple[str, str]]
    ) -> Dict[str, str]:
        """Extract resource information from the request metadata.

        Looks for the first metadata entry keyed
        ``google-cloud-resource-prefix`` and parses its value with
        :meth:`_parse_resource_path`.

        Args:
            metadata: Request metadata as a sequence of ``(key, value)`` pairs
                (the shape gRPC uses for call metadata).

        Returns:
            Dict[str, str]: Extracted project, instance and database
            information; empty when no resource prefix entry is present.
        """
        # Extract resource info from the first metadata tuple containing the
        # google-cloud-resource-prefix key; default to "" when absent.
        path = next(
            (value for key, value in metadata if key == GOOGLE_CLOUD_RESOURCE_KEY), ""
        )

        return MetricsInterceptor._parse_resource_path(path)

    @staticmethod
    def _remove_prefix(s: str, prefix: str) -> str:
        """Return ``s`` with ``prefix`` removed when present.

        Args:
            s (str): The string from which the prefix is to be removed.
            prefix (str): The prefix to be removed from the string.

        Returns:
            str: The string with the prefix removed, or ``s`` unchanged when it
            does not start with ``prefix``.

        Note:
            This helper exists because ``str.removeprefix`` is not available
            on Python 3.8.
        """
        if s.startswith(prefix):
            return s[len(prefix) :]
        return s

    def _set_metrics_tracer_attributes(self, resources: Dict[str, str]) -> None:
        """Copy resource identifiers onto the current metrics tracer.

        Updates the active tracer with whichever of project / instance /
        database are present in ``resources``. No-op when no tracer is active.

        Args:
            resources (Dict[str, str]): Project, instance and database
                information extracted from the request metadata.
        """
        tracer = SpannerMetricsTracerFactory.current_metrics_tracer
        if tracer is None:
            return

        # Only overwrite the attributes the request actually carries.
        if "project" in resources:
            tracer.set_project(resources["project"])
        if "instance" in resources:
            tracer.set_instance(resources["instance"])
        if "database" in resources:
            tracer.set_database(resources["database"])

    def intercept(self, invoked_method, request_or_iterator, call_details):
        """Intercept gRPC calls to collect metrics.

        Args:
            invoked_method: The RPC method
            request_or_iterator: The RPC request
            call_details: Details about the RPC call

        Returns:
            The RPC response
        """
        # Without an active tracer there is nothing to record: pass through.
        if SpannerMetricsTracerFactory.current_metrics_tracer is None:
            return invoked_method(request_or_iterator, call_details)

        # Setup Metric Tracer attributes from call details
        ## Extract Project / Instance / Database from header information
        resources = self._extract_resource_from_path(call_details.metadata)
        self._set_metrics_tracer_attributes(resources)

        ## Format method to be spanner.<method name>
        method_name = self._remove_prefix(
            call_details.method, SPANNER_METHOD_PREFIX
        ).replace("/", ".")
        SpannerMetricsTracerFactory.current_metrics_tracer.set_method(method_name)

        SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start()
        response = invoked_method(request_or_iterator, call_details)
        SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion()

        return response
Loading