Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ message RedisProxy {
"envoy.config.filter.network.redis_proxy.v2.RedisProxy";

// Redis connection pool settings.
// [#next-free-field: 11]
// [#next-free-field: 13]
message ConnPoolSettings {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings";
Expand Down Expand Up @@ -134,6 +134,37 @@ message RedisProxy {
// storm to busy redis server. This config is a protection to rate limit reconnection rate.
// If not set, there will be no rate limiting on the reconnection.
ConnectionRateLimit connection_rate_limit = 10;

// Enable per-shard statistics for tracking hot shard usage. When enabled, the following
// statistics will be emitted per upstream host (shard):
//
// * ``upstream_rq_total``: Total requests to this shard
// * ``upstream_rq_success``: Successful requests to this shard
// * ``upstream_rq_failure``: Failed requests to this shard
// * ``upstream_rq_active``: Active requests to this shard (gauge)
//
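// The statistics will be emitted under the scope:
// ``cluster.<cluster_name>.shard.<host_address>.*``, where ``<host_address>`` is the upstream
// host address with every ``:`` and ``.`` replaced by ``_`` (for example ``10_0_0_1_6379``).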
//
// .. note::
// Enabling this option may significantly increase metric cardinality in large clusters
// with many shards. Use with caution in production environments.
bool enable_per_shard_stats = 11;

// Enable per-shard latency histogram for tracking request latency per upstream host (shard).
// When enabled, the following histogram will be emitted per shard:
//
// * ``upstream_rq_time``: Request latency histogram in microseconds
//
// The histogram will be emitted under the scope:
// ``cluster.<cluster_name>.shard.<host_address>.upstream_rq_time``
//
// This option requires ``enable_per_shard_stats`` to be enabled.
//
// .. note::
// Enabling this option may significantly increase metric cardinality in large clusters
// with many shards. Use with caution in production environments.
bool enable_per_shard_latency_stats = 12;
}

message PrefixRoutes {
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/clusters/redis/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
uint32_t maxUpstreamUnknownConnections() const override { return 0; }
bool enableCommandStats() const override { return true; }
bool enablePerShardStats() const override { return false; } // Not needed for discovery
bool enablePerShardLatencyStats() const override { return false; } // Not needed for discovery
bool connectionRateLimitEnabled() const override { return false; }
uint32_t connectionRateLimitPerSec() const override { return 0; }
// For any readPolicy other than Primary, the RedisClientFactory will send a READONLY command
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ class Config {
*/
virtual bool enableCommandStats() const PURE;

/**
* @return true if per-shard (per upstream host) request statistics should be recorded, which
*         makes it possible to track hot shard usage; false otherwise.
*/
virtual bool enablePerShardStats() const PURE;

/**
* @return true if a per-shard (per upstream host) request latency histogram should be recorded;
*         false otherwise.
*/
virtual bool enablePerShardLatencyStats() const PURE;

/**
* @return the read policy the proxy should use.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ ConfigImpl::ConfigImpl(
// as the buffer is flushed on each request immediately.
max_upstream_unknown_connections_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)),
enable_command_stats_(config.enable_command_stats()) {
enable_command_stats_(config.enable_command_stats()),
enable_per_shard_stats_(config.enable_per_shard_stats()),
enable_per_shard_latency_stats_(config.enable_per_shard_latency_stats()) {
switch (config.read_policy()) {
PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
case envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::ConnPoolSettings::MASTER:
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/filters/network/common/redis/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class ConfigImpl : public Config {
return max_upstream_unknown_connections_;
}
bool enableCommandStats() const override { return enable_command_stats_; }
bool enablePerShardStats() const override { return enable_per_shard_stats_; }
bool enablePerShardLatencyStats() const override { return enable_per_shard_latency_stats_; }
ReadPolicy readPolicy() const override { return read_policy_; }
bool connectionRateLimitEnabled() const override { return connection_rate_limit_enabled_; }
uint32_t connectionRateLimitPerSec() const override { return connection_rate_limit_per_sec_; }
Expand All @@ -66,6 +68,8 @@ class ConfigImpl : public Config {
const std::chrono::milliseconds buffer_flush_timeout_;
const uint32_t max_upstream_unknown_connections_;
const bool enable_command_stats_;
const bool enable_per_shard_stats_;
const bool enable_per_shard_latency_stats_;
ReadPolicy read_policy_;
bool connection_rate_limit_enabled_;
uint32_t connection_rate_limit_per_sec_;
Expand Down
157 changes: 154 additions & 3 deletions source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,52 @@ void InstanceImpl::ThreadLocalPool::drainClients() {
}
}

/**
 * Look up the per-shard stats for an upstream host, creating and caching them on first use.
 * @param host_address the upstream host address used as the cache key.
 * @return the shared stats struct for that host.
 */
RedisShardStatsSharedPtr
InstanceImpl::ThreadLocalPool::getOrCreateShardStats(const std::string& host_address) {
  // Already cached: hand back the existing stats.
  const auto existing = shard_stats_map_.find(host_address);
  if (existing != shard_stats_map_.end()) {
    return existing->second.stats_;
  }

  // Build the stat-name component from the address, rewriting ':' and '.' to '_'.
  std::string sanitized_address;
  sanitized_address.reserve(host_address.size());
  for (const char c : host_address) {
    sanitized_address.push_back((c == ':' || c == '.') ? '_' : c);
  }

  Stats::ScopeSharedPtr scope =
      stats_scope_->createScope(fmt::format("shard.{}.", sanitized_address));
  RedisShardStatsSharedPtr stats = std::make_shared<RedisShardStats>(
      RedisShardStats{REDIS_SHARD_STATS(POOL_COUNTER(*scope), POOL_GAUGE(*scope))});

  // The cache entry holds the scope next to the stats so the scope stays alive with them.
  ShardStatsEntry entry;
  entry.scope_ = scope;
  entry.stats_ = stats;
  shard_stats_map_[host_address] = std::move(entry);
  return stats;
}

/**
 * Fetch the stats scope cached for an upstream host without creating one.
 * @param host_address the upstream host address used as the cache key.
 * @return the cached scope, or a null pointer when the host has no cache entry.
 */
Stats::ScopeSharedPtr
InstanceImpl::ThreadLocalPool::getShardScope(const std::string& host_address) {
  const auto entry = shard_stats_map_.find(host_address);
  return entry == shard_stats_map_.end() ? Stats::ScopeSharedPtr() : entry->second.scope_;
}

/**
 * Look up the per-shard latency histogram for an upstream host, creating it (and the shard's
 * stats entry, if missing) on first use.
 *
 * The histogram is registered as ``upstream_rq_time`` inside the shard's scope, which is the name
 * given in the ConnPoolSettings.enable_per_shard_latency_stats API documentation.
 *
 * @param host_address the upstream host address used as the cache key.
 * @return a non-owning pointer to the histogram, recorded in microseconds.
 */
Stats::Histogram*
InstanceImpl::ThreadLocalPool::getOrCreateShardLatencyHistogram(const std::string& host_address) {
  auto it = shard_stats_map_.find(host_address);
  if (it == shard_stats_map_.end()) {
    // No entry yet: create the shard scope and stats first, then look the entry up again because
    // the insertion happened inside the helper.
    getOrCreateShardStats(host_address);
    it = shard_stats_map_.find(host_address);
  }

  ShardStatsEntry& entry = it->second;
  if (entry.latency_histogram_ == nullptr) {
    // Created lazily so shards only get a histogram once latency tracking is actually requested.
    entry.latency_histogram_ = &entry.scope_->histogramFromString(
        "upstream_rq_time", Stats::Histogram::Unit::Microseconds);
  }
  return entry.latency_histogram_;
}

InstanceImpl::ThreadLocalActiveClientPtr&
InstanceImpl::ThreadLocalPool::threadLocalActiveClient(Upstream::HostConstSharedPtr host) {
TokenBucketPtr& rate_limiter = cx_rate_limiter_map_[host];
Expand Down Expand Up @@ -456,7 +502,22 @@ InstanceImpl::ThreadLocalPool::makeRequestToHost(Upstream::HostConstSharedPtr& h
}
}

pending_requests_.emplace_back(*this, std::move(request), callbacks, host);
// Get or create per-shard stats for tracking hot shard usage (if enabled)
RedisShardStatsSharedPtr shard_stats = nullptr;
Stats::ScopeSharedPtr shard_scope = nullptr;
Stats::Histogram* latency_histogram = nullptr;
if (config_->enablePerShardStats()) {
const std::string host_address = host->address()->asString();
shard_stats = getOrCreateShardStats(host_address);
shard_scope = getShardScope(host_address);
}
if (config_->enablePerShardLatencyStats()) {
const std::string host_address = host->address()->asString();
latency_histogram = getOrCreateShardLatencyHistogram(host_address);
}

pending_requests_.emplace_back(*this, std::move(request), callbacks, host, shard_stats,
shard_scope, latency_histogram);
PendingRequest& pending_request = pending_requests_.back();

if (!transaction.active_) {
Expand Down Expand Up @@ -519,16 +580,46 @@ void InstanceImpl::ThreadLocalActiveClient::onEvent(Network::ConnectionEvent eve
// Builds a pending request and starts its per-shard accounting.
//
// shard_stats, shard_scope and latency_histogram may each be null; the corresponding tracking is
// skipped when they are. The request start time is captured here and later used to compute the
// latency recorded into latency_histogram.
InstanceImpl::PendingRequest::PendingRequest(InstanceImpl::ThreadLocalPool& parent,
RespVariant&& incoming_request,
PoolCallbacks& pool_callbacks,
Upstream::HostConstSharedPtr& host)
Upstream::HostConstSharedPtr& host,
RedisShardStatsSharedPtr shard_stats,
Stats::ScopeSharedPtr shard_scope,
Stats::Histogram* latency_histogram)
: parent_(parent), incoming_request_(std::move(incoming_request)),
pool_callbacks_(pool_callbacks), host_(host) {}
pool_callbacks_(pool_callbacks), host_(host), shard_stats_(std::move(shard_stats)),
shard_scope_(std::move(shard_scope)), latency_histogram_(latency_histogram),
start_time_(parent.dispatcher_.timeSource().monotonicTime()) {
// Count the request against its shard as soon as it is created: one more total, one more active.
// NOTE(review): upstream_rq_active_ is only decremented in onResponse/onFailure/cancel, or in the
// destructor when request_handler_ is still set - confirm a request that never obtains a handler
// cannot leave the gauge incremented.
if (shard_stats_) {
shard_stats_->upstream_rq_total_.inc();
shard_stats_->upstream_rq_active_.inc();
}
// Per-shard command stats need both a shard scope and command stats to be enabled; the command
// name is extracted once here and reused when the request completes.
if (shard_scope_ && parent_.config_->enableCommandStats()) {
command_ = parent_.redis_command_stats_->getCommandFromRequest(getRequest(incoming_request_));
parent_.redis_command_stats_->updateStatsTotal(*shard_scope_, command_);
// Create per-shard per-command latency timer when both command stats and per-shard latency
// are enabled
if (parent_.config_->enablePerShardLatencyStats()) {
command_latency_timer_ = parent_.redis_command_stats_->createCommandTimer(
*shard_scope_, command_, parent_.dispatcher_.timeSource());
}
}
}

InstanceImpl::PendingRequest::~PendingRequest() {
cache_load_handle_.reset();

if (request_handler_) {
request_handler_->cancel();
request_handler_ = nullptr;
// Update per-shard stats - treat cancellation as failure
if (shard_stats_) {
shard_stats_->upstream_rq_active_.dec();
shard_stats_->upstream_rq_failure_.inc();
}
// Update per-shard command stats (failure due to cancellation)
if (shard_scope_ && parent_.config_->enableCommandStats()) {
parent_.redis_command_stats_->updateStats(*shard_scope_, command_, false);
}
// If we have to cancel the request on the client, then we'll treat this as failure for pool
// callback
pool_callbacks_.onFailure();
Expand All @@ -537,12 +628,52 @@ InstanceImpl::PendingRequest::~PendingRequest() {

/**
 * Upstream response callback: settles the per-shard accounting as a success, records latency,
 * then forwards the response to the pool callbacks.
 * @param response the response received from the upstream host.
 */
void InstanceImpl::PendingRequest::onResponse(Common::Redis::RespValuePtr&& response) {
  // Clearing the handle keeps the destructor from cancelling (and counting as failed) a request
  // that has already completed.
  request_handler_ = nullptr;

  // Shard counters: one fewer request in flight, one more success.
  if (shard_stats_ != nullptr) {
    shard_stats_->upstream_rq_active_.dec();
    shard_stats_->upstream_rq_success_.inc();
  }

  // Per-shard command stats, reported as a success.
  if (shard_scope_ != nullptr && parent_.config_->enableCommandStats()) {
    parent_.redis_command_stats_->updateStats(*shard_scope_, command_, true);
  }

  // Per-shard latency, measured from construction of this request, in microseconds.
  if (latency_histogram_ != nullptr) {
    const auto elapsed = parent_.dispatcher_.timeSource().monotonicTime() - start_time_;
    latency_histogram_->recordValue(
        std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count());
  }

  // Per-shard per-command latency timer, if one was started.
  if (command_latency_timer_ != nullptr) {
    command_latency_timer_->complete();
  }

  pool_callbacks_.onResponse(std::move(response));
  parent_.onRequestCompleted();
}

void InstanceImpl::PendingRequest::onFailure() {
request_handler_ = nullptr;
// Update per-shard stats
if (shard_stats_) {
shard_stats_->upstream_rq_active_.dec();
shard_stats_->upstream_rq_failure_.inc();
}
// Update per-shard command stats (failure)
if (shard_scope_ && parent_.config_->enableCommandStats()) {
parent_.redis_command_stats_->updateStats(*shard_scope_, command_, false);
}
// Record per-shard latency histogram (even for failures)
if (latency_histogram_ != nullptr) {
const auto end_time = parent_.dispatcher_.timeSource().monotonicTime();
const auto latency_us = std::chrono::duration_cast<std::chrono::microseconds>(
end_time - start_time_).count();
latency_histogram_->recordValue(latency_us);
}
// Complete per-shard per-command latency timer
if (command_latency_timer_) {
command_latency_timer_->complete();
}
pool_callbacks_.onFailure();
parent_.refresh_manager_->onFailure(parent_.cluster_name_);
parent_.onRequestCompleted();
Expand Down Expand Up @@ -641,6 +772,26 @@ void InstanceImpl::PendingRequest::doRedirection(Common::Redis::RespValuePtr&& v
/**
 * Cancels the in-flight upstream request and settles its per-shard accounting. A cancellation is
 * reported as a failure in both the shard counters and the per-shard command stats.
 */
void InstanceImpl::PendingRequest::cancel() {
  request_handler_->cancel();
  // Clearing the handle keeps the destructor from cancelling and counting this request again.
  request_handler_ = nullptr;

  // Shard counters: one fewer request in flight, one more failure.
  if (shard_stats_ != nullptr) {
    shard_stats_->upstream_rq_active_.dec();
    shard_stats_->upstream_rq_failure_.inc();
  }

  // Per-shard command stats, reported as a failure.
  if (shard_scope_ != nullptr && parent_.config_->enableCommandStats()) {
    parent_.redis_command_stats_->updateStats(*shard_scope_, command_, false);
  }

  // Latency is still recorded for cancelled requests, measured up to the moment of cancellation.
  if (latency_histogram_ != nullptr) {
    const auto elapsed = parent_.dispatcher_.timeSource().monotonicTime() - start_time_;
    latency_histogram_->recordValue(
        std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count());
  }

  // Per-shard per-command latency timer, if one was started.
  if (command_latency_timer_ != nullptr) {
    command_latency_timer_->complete();
  }

  parent_.onRequestCompleted();
}

Expand Down
43 changes: 42 additions & 1 deletion source/extensions/filters/network/redis_proxy/conn_pool_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
#include <string>
#include <vector>

#include "envoy/common/time.h"
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
#include "envoy/stats/stats_macros.h"
#include "envoy/stats/timespan.h"
#include "envoy/thread_local/thread_local.h"
#include "envoy/upstream/cluster_manager.h"

Expand Down Expand Up @@ -49,6 +51,32 @@ struct RedisClusterStats {
REDIS_CLUSTER_STATS(GENERATE_COUNTER_STRUCT)
};

/**
* Per-shard statistics for tracking hot shard usage.
* These metrics help identify which shards are receiving more traffic.
*
* - upstream_rq_total: incremented when a pending request for the shard is created.
* - upstream_rq_success: incremented when a response is received.
* - upstream_rq_failure: incremented on failure or cancellation.
* - upstream_rq_active: gauge incremented when a pending request is created and decremented when
*   it completes, fails or is cancelled.
*/
#define REDIS_SHARD_STATS(COUNTER, GAUGE) \
COUNTER(upstream_rq_total) \
COUNTER(upstream_rq_success) \
COUNTER(upstream_rq_failure) \
GAUGE(upstream_rq_active, Accumulate)

// Struct whose members are generated from REDIS_SHARD_STATS; each stat is accessed through a
// member carrying a trailing underscore (for example upstream_rq_total_).
struct RedisShardStats {
REDIS_SHARD_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

using RedisShardStatsSharedPtr = std::shared_ptr<RedisShardStats>;

/**
* Struct to hold per-shard stats and the scope they belong to.
* The scope must be kept alive for the stats to remain valid.
*
* Entries are brace-initialised with the scope followed by the stats, so the order of the first
* two members must not change.
*/
struct ShardStatsEntry {
// Scope the shard's stats (and latency histogram) are created in.
Stats::ScopeSharedPtr scope_;
// Counters and gauge for the shard, shared with the pending requests that update them.
RedisShardStatsSharedPtr stats_;
// Non-owning pointer obtained from scope_; stays null until the histogram is first requested.
Stats::Histogram* latency_histogram_{nullptr}; // Per-shard latency histogram (optional)
};

class DoNothingPoolCallbacks : public PoolCallbacks {
public:
void onResponse(Common::Redis::RespValuePtr&&) override {};
Expand Down Expand Up @@ -119,7 +147,9 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<Instan
public Extensions::Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryCallbacks,
public Logger::Loggable<Logger::Id::redis> {
PendingRequest(ThreadLocalPool& parent, RespVariant&& incoming_request,
PoolCallbacks& pool_callbacks, Upstream::HostConstSharedPtr& host);
PoolCallbacks& pool_callbacks, Upstream::HostConstSharedPtr& host,
RedisShardStatsSharedPtr shard_stats, Stats::ScopeSharedPtr shard_scope,
Stats::Histogram* latency_histogram);
~PendingRequest() override;

// Common::Redis::Client::ClientCallbacks
Expand Down Expand Up @@ -148,6 +178,12 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<Instan
bool ask_redirection_;
Extensions::Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryHandlePtr
cache_load_handle_;
RedisShardStatsSharedPtr shard_stats_;
Stats::ScopeSharedPtr shard_scope_; // Scope for per-shard command stats
Stats::StatName command_; // Command name for per-shard command stats
Stats::Histogram* latency_histogram_{nullptr}; // Per-shard latency histogram
MonotonicTime start_time_; // Request start time for latency tracking
Stats::TimespanPtr command_latency_timer_; // Per-shard per-command latency timer
};

struct ThreadLocalPool : public ThreadLocal::ThreadLocalObject,
Expand Down Expand Up @@ -180,6 +216,9 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<Instan
void onHostsAdded(const std::vector<Upstream::HostSharedPtr>& hosts_added);
void onHostsRemoved(const std::vector<Upstream::HostSharedPtr>& hosts_removed);
void drainClients();
RedisShardStatsSharedPtr getOrCreateShardStats(const std::string& host_address);
Stats::ScopeSharedPtr getShardScope(const std::string& host_address);
Stats::Histogram* getOrCreateShardLatencyHistogram(const std::string& host_address);

// Upstream::ClusterUpdateCallbacks
void onClusterAddOrUpdate(absl::string_view cluster_name,
Expand Down Expand Up @@ -222,6 +261,8 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<Instan
absl::optional<Common::Redis::AwsIamAuthenticator::AwsIamAuthenticatorSharedPtr>
aws_iam_authenticator_;
absl::optional<envoy::extensions::filters::network::redis_proxy::v3::AwsIam> aws_iam_config_;
// Per-shard stats map keyed by host address (e.g., "10.0.0.1:6379")
absl::node_hash_map<std::string, ShardStatsEntry> shard_stats_map_;
};

const std::string cluster_name_;
Expand Down
Loading