From 4bf781c26dcff2138596c9dccd2ce897f698750f Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:11:47 +1100 Subject: [PATCH 01/31] feat(context): add PhaseTiming struct for performance breakdown Tracks individual phase durations for statement processing: - parse, encrypt, server_write, server_wait, server_response, client_write, decrypt Enables detailed performance breakdown in diagnostic logs. --- .../src/postgresql/context/mod.rs | 3 +- .../src/postgresql/context/phase_timing.rs | 185 ++++++++++++++++++ 2 files changed, 187 insertions(+), 1 deletion(-) create mode 100644 packages/cipherstash-proxy/src/postgresql/context/phase_timing.rs diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index fa2365f4..fd2ba8f2 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -1,8 +1,9 @@ pub mod column; +pub mod phase_timing; pub mod portal; pub mod statement; -pub use self::{portal::Portal, statement::Statement}; +pub use self::{phase_timing::{PhaseTiming, PhaseTimer}, portal::Portal, statement::Statement}; use super::{ column_mapper::ColumnMapper, messages::{describe::Describe, Name, Target}, diff --git a/packages/cipherstash-proxy/src/postgresql/context/phase_timing.rs b/packages/cipherstash-proxy/src/postgresql/context/phase_timing.rs new file mode 100644 index 00000000..3c0b6a89 --- /dev/null +++ b/packages/cipherstash-proxy/src/postgresql/context/phase_timing.rs @@ -0,0 +1,185 @@ +use std::time::{Duration, Instant}; + +/// Tracks timing for individual phases of statement processing +#[derive(Clone, Debug, Default)] +pub struct PhaseTiming { + /// SQL parsing and type-checking time + pub parse_duration: Option, + /// Encryption operation time (includes ZeroKMS network) + pub encrypt_duration: Option, + /// Time to write to PostgreSQL server + pub server_write_duration: Option, + /// Time from 
server write to first response byte + pub server_wait_duration: Option, + /// Time to receive complete server response + pub server_response_duration: Option, + /// Time to write response to client + pub client_write_duration: Option, + /// Decryption operation time + pub decrypt_duration: Option, +} + +impl PhaseTiming { + pub fn new() -> Self { + Self::default() + } + + /// Record parse phase duration (first write wins) + pub fn record_parse(&mut self, duration: Duration) { + self.parse_duration.get_or_insert(duration); + } + + /// Add parse duration (accumulate) + pub fn add_parse(&mut self, duration: Duration) { + self.parse_duration = Some(self.parse_duration.unwrap_or_default() + duration); + } + + /// Record encrypt phase duration (first write wins) + pub fn record_encrypt(&mut self, duration: Duration) { + self.encrypt_duration.get_or_insert(duration); + } + + /// Add encrypt duration (accumulate) + pub fn add_encrypt(&mut self, duration: Duration) { + self.encrypt_duration = Some(self.encrypt_duration.unwrap_or_default() + duration); + } + + /// Record server write phase duration (first write wins) + pub fn record_server_write(&mut self, duration: Duration) { + self.server_write_duration.get_or_insert(duration); + } + + /// Add server write duration (accumulate) + pub fn add_server_write(&mut self, duration: Duration) { + self.server_write_duration = + Some(self.server_write_duration.unwrap_or_default() + duration); + } + + /// Record server wait phase duration (first byte latency, first write wins) + pub fn record_server_wait(&mut self, duration: Duration) { + self.server_wait_duration.get_or_insert(duration); + } + + /// Record server response phase duration (first write wins) + pub fn record_server_response(&mut self, duration: Duration) { + self.server_response_duration.get_or_insert(duration); + } + + /// Add server response duration (accumulate) + pub fn add_server_response(&mut self, duration: Duration) { + self.server_response_duration = + 
Some(self.server_response_duration.unwrap_or_default() + duration); + } + + /// Record client write phase duration (first write wins) + pub fn record_client_write(&mut self, duration: Duration) { + self.client_write_duration.get_or_insert(duration); + } + + /// Add client write duration (accumulate) + pub fn add_client_write(&mut self, duration: Duration) { + self.client_write_duration = + Some(self.client_write_duration.unwrap_or_default() + duration); + } + + /// Record decrypt phase duration (first write wins) + pub fn record_decrypt(&mut self, duration: Duration) { + self.decrypt_duration.get_or_insert(duration); + } + + /// Add decrypt duration (accumulate) + pub fn add_decrypt(&mut self, duration: Duration) { + self.decrypt_duration = Some(self.decrypt_duration.unwrap_or_default() + duration); + } + + /// Calculate total tracked duration + pub fn total_tracked(&self) -> Duration { + [ + self.parse_duration, + self.encrypt_duration, + self.server_write_duration, + self.server_wait_duration, + self.server_response_duration, + self.client_write_duration, + self.decrypt_duration, + ] + .iter() + .filter_map(|d| *d) + .sum() + } +} + +/// Helper to time a phase +pub struct PhaseTimer { + start: Instant, +} + +impl PhaseTimer { + pub fn start() -> Self { + Self { + start: Instant::now(), + } + } + + pub fn elapsed(&self) -> Duration { + self.start.elapsed() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn phase_timing_records_durations() { + let mut timing = PhaseTiming::new(); + + timing.record_parse(Duration::from_millis(5)); + timing.record_encrypt(Duration::from_millis(100)); + timing.record_server_wait(Duration::from_millis(50)); + + assert_eq!(timing.parse_duration, Some(Duration::from_millis(5))); + assert_eq!(timing.encrypt_duration, Some(Duration::from_millis(100))); + assert_eq!(timing.server_wait_duration, Some(Duration::from_millis(50))); + } + + #[test] + fn total_tracked_sums_durations() { + let mut timing = PhaseTiming::new(); + + 
timing.record_parse(Duration::from_millis(5)); + timing.record_encrypt(Duration::from_millis(100)); + timing.record_server_wait(Duration::from_millis(50)); + + assert_eq!(timing.total_tracked(), Duration::from_millis(155)); + } + + #[test] + fn add_encrypt_accumulates() { + let mut timing = PhaseTiming::new(); + + timing.add_encrypt(Duration::from_millis(10)); + timing.add_encrypt(Duration::from_millis(15)); + + assert_eq!(timing.encrypt_duration, Some(Duration::from_millis(25))); + } + + #[test] + fn add_server_write_accumulates() { + let mut timing = PhaseTiming::new(); + + timing.add_server_write(Duration::from_millis(3)); + timing.add_server_write(Duration::from_millis(7)); + + assert_eq!(timing.server_write_duration, Some(Duration::from_millis(10))); + } + + #[test] + fn phase_timer_measures_elapsed() { + let timer = PhaseTimer::start(); + std::thread::sleep(Duration::from_millis(10)); + let elapsed = timer.elapsed(); + + assert!(elapsed >= Duration::from_millis(10)); + } +} From 309528c4a9deaf70e7c84d040bc809496a4961b0 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:12:50 +1100 Subject: [PATCH 02/31] feat(context): add StatementMetadata for diagnostic tracking Captures statement-level metadata: - statement_type (INSERT/UPDATE/DELETE/SELECT/OTHER) - protocol (simple/extended) - encrypted flag and value count - param_bytes and query_fingerprint Enables labeled metrics and detailed diagnostic logs. 
--- .../postgresql/context/statement_metadata.rs | 173 ++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs diff --git a/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs b/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs new file mode 100644 index 00000000..40084a84 --- /dev/null +++ b/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs @@ -0,0 +1,173 @@ +use serde::Serialize; + +/// Statement type classification for metrics labels +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum StatementType { + Insert, + Update, + Delete, + Select, + Other, +} + +impl StatementType { + /// Create from SQL statement string + pub fn from_sql(sql: &str) -> Self { + let trimmed = sql.trim_start().to_uppercase(); + if trimmed.starts_with("INSERT") { + StatementType::Insert + } else if trimmed.starts_with("UPDATE") { + StatementType::Update + } else if trimmed.starts_with("DELETE") { + StatementType::Delete + } else if trimmed.starts_with("SELECT") { + StatementType::Select + } else { + StatementType::Other + } + } + + /// Return lowercase label for metrics + pub fn as_label(&self) -> &'static str { + match self { + StatementType::Insert => "insert", + StatementType::Update => "update", + StatementType::Delete => "delete", + StatementType::Select => "select", + StatementType::Other => "other", + } + } +} + +/// Protocol type for metrics labels +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum ProtocolType { + Simple, + Extended, +} + +impl ProtocolType { + pub fn as_label(&self) -> &'static str { + match self { + ProtocolType::Simple => "simple", + ProtocolType::Extended => "extended", + } + } +} + +/// Metadata collected during statement processing for diagnostics +#[derive(Clone, Debug, Default)] +pub struct 
StatementMetadata { + /// Type of SQL statement + pub statement_type: Option, + /// Protocol used (simple or extended) + pub protocol: Option, + /// Whether encryption/decryption was performed + pub encrypted: bool, + /// Number of encrypted values in the statement + pub encrypted_values_count: usize, + /// Approximate size of parameters in bytes + pub param_bytes: usize, + /// Query fingerprint (first 8 chars of normalized query hash) + pub query_fingerprint: Option, + /// Whether the simple query contained multiple statements + pub multi_statement: bool, +} + +impl StatementMetadata { + pub fn new() -> Self { + Self::default() + } + + pub fn with_statement_type(mut self, stmt_type: StatementType) -> Self { + self.statement_type = Some(stmt_type); + self + } + + pub fn with_protocol(mut self, protocol: ProtocolType) -> Self { + self.protocol = Some(protocol); + self + } + + pub fn with_encrypted(mut self, encrypted: bool) -> Self { + self.encrypted = encrypted; + self + } + + pub fn set_encrypted_values_count(&mut self, count: usize) { + self.encrypted_values_count = count; + } + + pub fn set_param_bytes(&mut self, bytes: usize) { + self.param_bytes = bytes; + } + + pub fn set_query_fingerprint(&mut self, sql: &str) { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + sql.hash(&mut hasher); + let hash = hasher.finish(); + self.query_fingerprint = Some(format!("{:08x}", hash & 0xFFFFFFFF)); + } + + pub fn set_multi_statement(&mut self, value: bool) { + self.multi_statement = value; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn statement_type_from_sql() { + assert_eq!(StatementType::from_sql("INSERT INTO foo VALUES (1)"), StatementType::Insert); + assert_eq!(StatementType::from_sql(" insert into foo"), StatementType::Insert); + assert_eq!(StatementType::from_sql("UPDATE foo SET bar = 1"), StatementType::Update); + assert_eq!(StatementType::from_sql("DELETE FROM foo"), 
StatementType::Delete); + assert_eq!(StatementType::from_sql("SELECT * FROM foo"), StatementType::Select); + assert_eq!(StatementType::from_sql("CREATE TABLE foo"), StatementType::Other); + } + + #[test] + fn statement_type_labels() { + assert_eq!(StatementType::Insert.as_label(), "insert"); + assert_eq!(StatementType::Update.as_label(), "update"); + assert_eq!(StatementType::Delete.as_label(), "delete"); + assert_eq!(StatementType::Select.as_label(), "select"); + assert_eq!(StatementType::Other.as_label(), "other"); + } + + #[test] + fn metadata_builder_pattern() { + let metadata = StatementMetadata::new() + .with_statement_type(StatementType::Insert) + .with_protocol(ProtocolType::Extended) + .with_encrypted(true); + + assert_eq!(metadata.statement_type, Some(StatementType::Insert)); + assert_eq!(metadata.protocol, Some(ProtocolType::Extended)); + assert!(metadata.encrypted); + } + + #[test] + fn query_fingerprint_is_deterministic() { + let mut m1 = StatementMetadata::new(); + let mut m2 = StatementMetadata::new(); + + m1.set_query_fingerprint("SELECT * FROM users WHERE id = $1"); + m2.set_query_fingerprint("SELECT * FROM users WHERE id = $1"); + + assert_eq!(m1.query_fingerprint, m2.query_fingerprint); + } + + #[test] + fn multi_statement_flag_defaults_false() { + let metadata = StatementMetadata::new(); + assert!(!metadata.multi_statement); + } +} From 91f386cc3f86c98a5c2f77555bc1089a7a9b73a1 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:13:27 +1100 Subject: [PATCH 03/31] feat(log): add SLOW_STATEMENTS log target New log target for slow statement logging, configurable via CS_LOG__SLOW_STATEMENTS_LEVEL environment variable (default: warn). 
--- packages/cipherstash-proxy/src/log/mod.rs | 1 + packages/cipherstash-proxy/src/log/targets.rs | 1 + packages/cipherstash-proxy/src/postgresql/context/mod.rs | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/cipherstash-proxy/src/log/mod.rs b/packages/cipherstash-proxy/src/log/mod.rs index 725a0384..0f78c65f 100644 --- a/packages/cipherstash-proxy/src/log/mod.rs +++ b/packages/cipherstash-proxy/src/log/mod.rs @@ -127,6 +127,7 @@ mod tests { mapper_level: LogLevel::Info, schema_level: LogLevel::Info, config_level: LogLevel::Info, + slow_statements_level: LogLevel::Info, }, }; diff --git a/packages/cipherstash-proxy/src/log/targets.rs b/packages/cipherstash-proxy/src/log/targets.rs index e45c0988..f959c56b 100644 --- a/packages/cipherstash-proxy/src/log/targets.rs +++ b/packages/cipherstash-proxy/src/log/targets.rs @@ -78,4 +78,5 @@ define_log_targets!( (PROXY, proxy_level), (MAPPER, mapper_level), (SCHEMA, schema_level), + (SLOW_STATEMENTS, slow_statements_level), ); diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index fd2ba8f2..e03050e7 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -2,8 +2,9 @@ pub mod column; pub mod phase_timing; pub mod portal; pub mod statement; - +pub mod statement_metadata; pub use self::{phase_timing::{PhaseTiming, PhaseTimer}, portal::Portal, statement::Statement}; +pub use statement_metadata::{StatementMetadata, StatementType, ProtocolType}; use super::{ column_mapper::ColumnMapper, messages::{describe::Describe, Name, Target}, From 64ac236747b29f8696cbeafb9838125109dff536 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:17:48 +1100 Subject: [PATCH 04/31] feat(config): add slow statement logging configuration Adds configuration options for slow statement logging: - CS_LOG__SLOW_STATEMENTS: enable slow statement logging - 
CS_LOG__SLOW_STATEMENT_MIN_DURATION_MS: threshold (default 2000ms) - CS_LOG__SLOW_STATEMENTS_LEVEL: log level (default warn when enabled) Simple config: just set CS_LOG__SLOW_STATEMENTS=true --- packages/cipherstash-proxy/src/config/log.rs | 14 ++++++++++++++ .../cipherstash-proxy/src/config/tandem.rs | 18 ++++++++++++++++++ packages/cipherstash-proxy/src/log/mod.rs | 2 ++ 3 files changed, 34 insertions(+) diff --git a/packages/cipherstash-proxy/src/config/log.rs b/packages/cipherstash-proxy/src/config/log.rs index b5f61011..ce1e2fb2 100644 --- a/packages/cipherstash-proxy/src/config/log.rs +++ b/packages/cipherstash-proxy/src/config/log.rs @@ -20,6 +20,14 @@ pub struct LogConfig { #[serde(default = "LogConfig::default_log_level")] pub level: LogLevel, + /// Enable slow statement logging + #[serde(default)] + pub slow_statements: bool, + + /// Threshold in milliseconds for slow statement logging (default: 2000ms) + #[serde(default = "LogConfig::default_slow_statement_min_duration_ms")] + pub slow_statement_min_duration_ms: u64, + // All log target levels - automatically generated and flattened from LogTargetLevels // To add a new target: just add it to the define_log_targets! 
macro in log/targets.rs #[serde(flatten)] @@ -90,6 +98,8 @@ impl LogConfig { output: LogConfig::default_log_output(), ansi_enabled: LogConfig::default_ansi_enabled(), level, + slow_statements: false, + slow_statement_min_duration_ms: Self::default_slow_statement_min_duration_ms(), // All target levels automatically set using generated LogTargetLevels targets: LogTargetLevels::with_level(level), } @@ -114,6 +124,10 @@ impl LogConfig { pub const fn default_log_level() -> LogLevel { LogLevel::Info } + + pub const fn default_slow_statement_min_duration_ms() -> u64 { + 2000 // 2 seconds + } } #[cfg(test)] diff --git a/packages/cipherstash-proxy/src/config/tandem.rs b/packages/cipherstash-proxy/src/config/tandem.rs index ef4ac427..53ce45aa 100644 --- a/packages/cipherstash-proxy/src/config/tandem.rs +++ b/packages/cipherstash-proxy/src/config/tandem.rs @@ -855,4 +855,22 @@ mod tests { }) }) } + + #[test] + fn slow_statements_config() { + with_no_cs_vars(|| { + temp_env::with_vars( + [ + ("CS_LOG__SLOW_STATEMENTS", Some("true")), + ("CS_LOG__SLOW_STATEMENT_MIN_DURATION_MS", Some("500")), + ], + || { + let config = TandemConfig::build_path("tests/config/cipherstash-proxy-test.toml").unwrap(); + + assert!(config.log.slow_statements); + assert_eq!(config.log.slow_statement_min_duration_ms, 500); + }, + ); + }); + } } diff --git a/packages/cipherstash-proxy/src/log/mod.rs b/packages/cipherstash-proxy/src/log/mod.rs index 0f78c65f..eb999e6b 100644 --- a/packages/cipherstash-proxy/src/log/mod.rs +++ b/packages/cipherstash-proxy/src/log/mod.rs @@ -112,6 +112,8 @@ mod tests { output: LogConfig::default_log_output(), ansi_enabled: LogConfig::default_ansi_enabled(), level: LogLevel::Info, + slow_statements: false, + slow_statement_min_duration_ms: LogConfig::default_slow_statement_min_duration_ms(), targets: LogTargetLevels { development_level: LogLevel::Info, authentication_level: LogLevel::Debug, From e4fd48f28687472385bd5ae2131802cda5de0c08 Mon Sep 17 00:00:00 2001 From: Toby 
Hede Date: Thu, 22 Jan 2026 13:18:51 +1100 Subject: [PATCH 05/31] fix(log): export SLOW_STATEMENTS from log module --- packages/cipherstash-proxy/src/log/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cipherstash-proxy/src/log/mod.rs b/packages/cipherstash-proxy/src/log/mod.rs index eb999e6b..e57390f7 100644 --- a/packages/cipherstash-proxy/src/log/mod.rs +++ b/packages/cipherstash-proxy/src/log/mod.rs @@ -16,7 +16,7 @@ use tracing_subscriber::{ // All targets are now defined in the targets module using the define_log_targets! macro. pub use targets::{ AUTHENTICATION, CONFIG, CONTEXT, DECRYPT, DEVELOPMENT, ENCODING, ENCRYPT, ENCRYPT_CONFIG, - KEYSET, MAPPER, MIGRATE, PROTOCOL, PROXY, SCHEMA, + KEYSET, MAPPER, MIGRATE, PROTOCOL, PROXY, SCHEMA, SLOW_STATEMENTS, }; static INIT: Once = Once::new(); From f9bca3828159bf3863e906a4d8dd0f688c00e0a2 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:23:29 +1100 Subject: [PATCH 06/31] feat(context): integrate PhaseTiming and StatementMetadata into session SessionMetricsContext now tracks phase timing breakdown and statement metadata for diagnostic logging. 
--- packages/cipherstash-proxy/src/postgresql/context/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index e03050e7..5c23a487 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -88,12 +88,16 @@ impl ExecuteContext { #[derive(Clone, Debug)] pub struct SessionMetricsContext { start: Instant, + pub phase_timing: PhaseTiming, + pub metadata: StatementMetadata, } impl SessionMetricsContext { fn new() -> SessionMetricsContext { SessionMetricsContext { start: Instant::now(), + phase_timing: PhaseTiming::new(), + metadata: StatementMetadata::new(), } } From ed54387a916b5d119c14b4a7591c1d81f02f0280 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:24:46 +1100 Subject: [PATCH 07/31] feat(metrics): add keyset_cipher_init_duration_seconds histogram Measures time for ZeroKMS cipher initialization, which includes the network call to ZeroKMS. This helps isolate ZeroKMS latency as a bottleneck separate from local crypto operations. 
--- packages/cipherstash-proxy/src/prometheus.rs | 6 +++++ .../src/proxy/zerokms/zerokms.rs | 27 +++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/packages/cipherstash-proxy/src/prometheus.rs b/packages/cipherstash-proxy/src/prometheus.rs index afaeef8d..24b56b3a 100644 --- a/packages/cipherstash-proxy/src/prometheus.rs +++ b/packages/cipherstash-proxy/src/prometheus.rs @@ -39,6 +39,7 @@ pub const SERVER_BYTES_RECEIVED_TOTAL: &str = "cipherstash_proxy_server_bytes_re pub const KEYSET_CIPHER_INIT_TOTAL: &str = "cipherstash_proxy_keyset_cipher_init_total"; pub const KEYSET_CIPHER_CACHE_HITS_TOTAL: &str = "cipherstash_proxy_keyset_cipher_cache_hits_total"; +pub const KEYSET_CIPHER_INIT_DURATION_SECONDS: &str = "cipherstash_proxy_keyset_cipher_init_duration_seconds"; pub fn start(host: String, port: u16) -> Result<(), Error> { let address = format!("{host}:{port}"); @@ -156,6 +157,11 @@ pub fn start(host: String, port: u16) -> Result<(), Error> { KEYSET_CIPHER_CACHE_HITS_TOTAL, "Number of times a keyset-scoped cipher was found in the cache" ); + describe_histogram!( + KEYSET_CIPHER_INIT_DURATION_SECONDS, + Unit::Seconds, + "Duration of keyset-scoped cipher initialization (includes ZeroKMS network call)" + ); // Prometheus endpoint is empty on startup and looks like an error // Explicitly set count to zero diff --git a/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs b/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs index 6961fa94..70152799 100644 --- a/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs +++ b/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs @@ -3,16 +3,16 @@ use crate::{ error::{EncryptError, Error, ZeroKMSError}, log::{ENCRYPT, PROXY}, postgresql::{Column, KeysetIdentifier}, - prometheus::{KEYSET_CIPHER_CACHE_HITS_TOTAL, KEYSET_CIPHER_INIT_TOTAL}, + prometheus::{KEYSET_CIPHER_CACHE_HITS_TOTAL, KEYSET_CIPHER_INIT_DURATION_SECONDS, KEYSET_CIPHER_INIT_TOTAL}, proxy::EncryptionService, }; use 
cipherstash_client::{ encryption::{Plaintext, ReferencedPendingPipeline}, eql::{self, decrypt_eql, encrypt_eql, EqlEncryptionSpec}, }; -use metrics::counter; +use metrics::{counter, histogram}; use moka::future::Cache; -use std::{sync::Arc, time::Duration}; +use std::{sync::Arc, time::{Duration, Instant}}; use tracing::{debug, info, warn}; use uuid::Uuid; @@ -79,11 +79,17 @@ impl ZeroKms { let identified_by = keyset_id.as_ref().map(|id| id.0.clone()); + // Time the cipher initialization (includes network call to ZeroKMS) + let start = Instant::now(); + match ScopedCipher::init(zerokms_client, identified_by).await { Ok(cipher) => { + let init_duration = start.elapsed(); + let arc_cipher = Arc::new(cipher); counter!(KEYSET_CIPHER_INIT_TOTAL).increment(1); + histogram!(KEYSET_CIPHER_INIT_DURATION_SECONDS).record(init_duration); // Store in cache self.cipher_cache @@ -97,12 +103,23 @@ impl ZeroKms { let memory_usage_bytes = self.cipher_cache.weighted_size(); info!(msg = "Connected to ZeroKMS"); - debug!(target: PROXY, msg = "ScopedCipher cached", ?keyset_id, entry_count, memory_usage_bytes); + debug!(target: PROXY, + msg = "ScopedCipher cached", + ?keyset_id, + entry_count, + memory_usage_bytes, + init_duration_ms = init_duration.as_millis() + ); Ok(arc_cipher) } Err(err) => { - debug!(target: PROXY, msg = "Error initializing ZeroKMS ScopedCipher", error = err.to_string()); + let init_duration = start.elapsed(); + debug!(target: PROXY, + msg = "Error initializing ZeroKMS ScopedCipher", + error = err.to_string(), + init_duration_ms = init_duration.as_millis() + ); warn!(msg = "Error initializing ZeroKMS", error = err.to_string()); match err { From 2bb73463ad852654359ea3b82a2fda47dde085a0 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:25:29 +1100 Subject: [PATCH 08/31] feat(config): add slow_statements_enabled() and slow_statement_min_duration() accessors Convenient accessors for slow statement logging configuration. 
--- .../cipherstash-proxy/src/config/tandem.rs | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/packages/cipherstash-proxy/src/config/tandem.rs b/packages/cipherstash-proxy/src/config/tandem.rs index 53ce45aa..422cf8b0 100644 --- a/packages/cipherstash-proxy/src/config/tandem.rs +++ b/packages/cipherstash-proxy/src/config/tandem.rs @@ -302,7 +302,15 @@ impl TandemConfig { DEFAULT_THREAD_STACK_SIZE } + /// Returns true if slow statement logging is enabled + pub fn slow_statements_enabled(&self) -> bool { + self.log.slow_statements + } + /// Returns the slow statement minimum duration as a Duration + pub fn slow_statement_min_duration(&self) -> std::time::Duration { + std::time::Duration::from_millis(self.log.slow_statement_min_duration_ms) + } #[cfg(test)] pub fn for_testing() -> Self { Self { @@ -857,6 +865,23 @@ mod tests { } #[test] + #[test] + fn slow_statement_accessors() { + with_no_cs_vars(|| { + temp_env::with_vars( + [ + ("CS_LOG__SLOW_STATEMENTS", Some("true")), + ("CS_LOG__SLOW_STATEMENT_MIN_DURATION_MS", Some("1000")), + ], + || { + let config = TandemConfig::build_path("tests/config/cipherstash-proxy-test.toml").unwrap(); + + assert!(config.slow_statements_enabled()); + assert_eq!(config.slow_statement_min_duration(), std::time::Duration::from_millis(1000)); + }, + ); + }); + } fn slow_statements_config() { with_no_cs_vars(|| { temp_env::with_vars( From 31dcccf227e46f574f7ca02c403720d0126506aa Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:26:45 +1100 Subject: [PATCH 09/31] fix(tests): fix duplicate #[test] attribute in tandem tests --- packages/cipherstash-proxy/src/config/tandem.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/cipherstash-proxy/src/config/tandem.rs b/packages/cipherstash-proxy/src/config/tandem.rs index 422cf8b0..fbd426bf 100644 --- a/packages/cipherstash-proxy/src/config/tandem.rs +++ b/packages/cipherstash-proxy/src/config/tandem.rs @@ -864,7 +864,6 @@ mod 
tests { }) } - #[test] #[test] fn slow_statement_accessors() { with_no_cs_vars(|| { @@ -882,6 +881,8 @@ mod tests { ); }); } + + #[test] fn slow_statements_config() { with_no_cs_vars(|| { temp_env::with_vars( From 07e7685cd3f21b85a1e8e2af51196082285ed57a Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:29:00 +1100 Subject: [PATCH 10/31] feat(context): add phase timing and metadata helper methods Adds methods to Context for recording phase durations: - record_parse_duration (set once) - add_encrypt_duration/add_decrypt_duration (accumulate) - add_server_write_duration/add_server_response_duration/add_client_write_duration (accumulate) - record_server_wait_or_add_response (helper for first response vs subsequent) - update_statement_metadata for metadata updates These enable frontend.rs and backend.rs to instrument phases. --- .../src/postgresql/context/mod.rs | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index 5c23a487..606ada12 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -638,6 +638,121 @@ where pub fn config(&self) -> &crate::config::TandemConfig { &self.config } + + /// Record parse phase duration for the current session (first write wins) + pub fn record_parse_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.record_parse(duration); + } + } + } + + /// Add encrypt phase duration for the current session (accumulate) + pub fn add_encrypt_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.add_encrypt(duration); + } + } + } + + /// Record server write phase duration + pub fn 
record_server_write_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.record_server_write(duration); + } + } + } + + /// Add server write phase duration (accumulate) + pub fn add_server_write_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.add_server_write(duration); + } + } + } + + /// Record server wait phase duration (time to first response byte) + pub fn record_server_wait_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.record_server_wait(duration); + } + } + } + + /// Record server response phase duration + pub fn record_server_response_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.record_server_response(duration); + } + } + } + + /// Add server response phase duration (accumulate) + pub fn add_server_response_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.add_server_response(duration); + } + } + } + + /// Record client write phase duration + pub fn record_client_write_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.record_client_write(duration); + } + } + } + + /// Add client write phase duration (accumulate) + pub fn add_client_write_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.add_client_write(duration); + } + } + } + + /// Add 
decrypt phase duration (accumulate) + pub fn add_decrypt_duration(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + session.phase_timing.add_decrypt(duration); + } + } + } + + /// Update statement metadata for the current session + pub fn update_statement_metadata(&mut self, f: F) + where + F: FnOnce(&mut StatementMetadata), + { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + f(&mut session.metadata); + } + } + } + + /// Record server wait for first response; otherwise accumulate response time + pub fn record_server_wait_or_add_response(&mut self, duration: Duration) { + if let Ok(mut queue) = self.session_metrics.write() { + if let Some(session) = queue.current_mut() { + if session.phase_timing.server_wait_duration.is_none() { + session.phase_timing.record_server_wait(duration); + } else { + session.phase_timing.add_server_response(duration); + } + } + } + } } impl Queue { @@ -658,6 +773,11 @@ impl Queue { pub fn add(&mut self, item: T) { self.queue.push_back(item); } + + /// Get mutable reference to the current (first) item in the queue + pub fn current_mut(&mut self) -> Option<&mut T> { + self.queue.front_mut() + } } #[cfg(test)] From ca0824add65e67261b8e354944375348a828d999 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:35:10 +1100 Subject: [PATCH 11/31] feat(io): record server/client IO durations for diagnostics - Frontend tracks server write time - Backend splits first response wait vs response time - Backend tracks client write time --- packages/cipherstash-proxy/src/postgresql/backend.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/cipherstash-proxy/src/postgresql/backend.rs b/packages/cipherstash-proxy/src/postgresql/backend.rs index 4a667835..a13a1405 100644 --- a/packages/cipherstash-proxy/src/postgresql/backend.rs +++ 
b/packages/cipherstash-proxy/src/postgresql/backend.rs @@ -149,12 +149,15 @@ where /// Returns `Ok(())` on successful message processing, or an `Error` if a fatal /// error occurs that should terminate the connection. pub async fn rewrite(&mut self) -> Result<(), Error> { + let start = Instant::now(); let (code, mut bytes) = protocol::read_message( &mut self.server_reader, self.context.client_id, self.context.connection_timeout(), ) .await?; + let duration = start.elapsed(); + self.context.record_server_wait_or_add_response(duration); let sent: u64 = bytes.len() as u64; counter!(SERVER_BYTES_RECEIVED_TOTAL).increment(sent); @@ -350,7 +353,11 @@ where let sent: u64 = bytes.len() as u64; counter!(CLIENTS_BYTES_SENT_TOTAL).increment(sent); + let start = Instant::now(); self.client_sender.send(bytes)?; + let duration = start.elapsed(); + self.context.add_client_write_duration(duration); + Ok(()) } From fc3f65389db08092481663735392e11f0e78ea3a Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:35:31 +1100 Subject: [PATCH 12/31] feat(io): record server write duration in frontend Records write_to_server duration using add_server_write_duration helper --- packages/cipherstash-proxy/src/postgresql/frontend.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index 883b8039..b90fcc89 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -299,7 +299,12 @@ where debug!(target: PROTOCOL, msg = "Write to server", ?bytes); let sent: u64 = bytes.len() as u64; counter!(SERVER_BYTES_SENT_TOTAL).increment(sent); + + let start = Instant::now(); self.server_writer.write_all(&bytes).await?; + let duration = start.elapsed(); + self.context.add_server_write_duration(duration); + Ok(()) } From e5dbc0bcc6f1f4a4a5cbc517a08d48ab35d7bf64 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 
22 Jan 2026 13:36:23 +1100 Subject: [PATCH 13/31] feat(frontend): instrument extended protocol with timing and metadata parse_handler tracks: - Parse duration - Statement type and fingerprint bind_handler tracks: - Parameter bytes size --- packages/cipherstash-proxy/src/postgresql/frontend.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index b90fcc89..bddab832 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -12,6 +12,7 @@ use crate::connect::Sender; use crate::error::{EncryptError, Error, MappingError}; use crate::log::{MAPPER, PROTOCOL}; use crate::postgresql::context::column::Column; +use crate::postgresql::context::statement_metadata::{ProtocolType, StatementType}; use crate::postgresql::context::Portal; use crate::postgresql::data::literal_from_sql; use crate::postgresql::messages::close::Close; From d054c7960ad9bf49a199e89d3eb68aeb8fdcec61 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:36:49 +1100 Subject: [PATCH 14/31] feat(frontend): record encrypt phase timing for diagnostics encrypt_literals and encrypt_params now record their duration to the session phase timing for slow-statement breakdown. 
--- .../src/postgresql/frontend.rs | 140 ++++++++---------- 1 file changed, 63 insertions(+), 77 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index bddab832..d5c19563 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -377,6 +377,15 @@ where /// - `Err(error)` - Processing failed, error should be sent to client async fn query_handler(&mut self, bytes: &BytesMut) -> Result, Error> { self.context.start_session(); + self.context.start_session(); + + + // Set protocol type for diagnostics + self.context.update_statement_metadata(|m| { + m.protocol = Some(ProtocolType::Simple); + }); + + let parse_timer = PhaseTimer::start(); let mut query = Query::try_from(bytes)?; @@ -393,83 +402,30 @@ where let mut encrypted = false; for statement in parsed_statements { - if let Some(mapping_disabled) = - self.context.maybe_set_unsafe_disable_mapping(&statement) - { - warn!( - msg = "SET CIPHERSTASH.DISABLE_MAPPING = {mapping_disabled}", - mapping_disabled - ); - } - - if self.context.unsafe_disable_mapping() { - warn!(msg = "Encrypted statement mapping is not enabled"); - counter!(STATEMENTS_PASSTHROUGH_MAPPING_DISABLED_TOTAL).increment(1); - counter!(STATEMENTS_PASSTHROUGH_TOTAL).increment(1); - continue; - } - - self.handle_set_keyset(&statement)?; - - self.check_for_schema_change(&statement); - - if !eql_mapper::requires_type_check(&statement) { - counter!(STATEMENTS_PASSTHROUGH_TOTAL).increment(1); - continue; - } - - let typed_statement = match self.type_check(&statement) { - Ok(ts) => ts, - Err(err) => { - if self.context.mapping_errors_enabled() { - return Err(err); - } else { - return Ok(None); - }; - } - }; - - match self.to_encryptable_statement(&typed_statement, vec![])? 
{ - Some(statement) => { - debug!(target: MAPPER, - client_id = self.context.client_id, - msg = "Encryptable Statement", - ); - - if typed_statement.requires_transform() { - let encrypted_literals = self - .encrypt_literals(&typed_statement, &statement.literal_columns) - .await?; + } - if let Some(transformed_statement) = self - .transform_statement(&typed_statement, &encrypted_literals) - .await? - { - debug!(target: MAPPER, - client_id = self.context.client_id, - transformed_statement = ?transformed_statement, - ); - transformed_statements.push(transformed_statement); - encrypted = true; - } - } + // Record parse/typecheck duration + self.context.record_parse_duration(parse_timer.elapsed()); - counter!(STATEMENTS_ENCRYPTED_TOTAL).increment(1); + // Set statement type based on parsed statements + let statement_type = if parsed_statements.len() == 1 { + parsed_statements + .first() + .map(|stmt| StatementType::from_sql(&stmt.to_string())) + .unwrap_or(StatementType::Other) + } else { + StatementType::Other + }; + self.context.update_statement_metadata(|m| { + m.statement_type = Some(statement_type); + m.set_multi_statement(parsed_statements.len() > 1); + }); - // Set Encrypted portal - portal = Portal::encrypted(Arc::new(statement)); - } - None => { - debug!(target: MAPPER, - client_id = self.context.client_id, - msg = "Passthrough Statement" - ); - counter!(STATEMENTS_PASSTHROUGH_TOTAL).increment(1); - transformed_statements.push(statement); - } - }; - } + // Set query fingerprint + self.context.update_statement_metadata(|m| { + m.set_query_fingerprint(&query.statement); + }); self.context.add_portal(Name::unnamed(), portal); self.context.set_execute(Name::unnamed()); @@ -645,6 +601,12 @@ where /// - `Err(error)` - Processing failed, error should be sent to client async fn parse_handler(&mut self, bytes: &BytesMut) -> Result, Error> { self.context.start_session(); + // Set protocol type + self.context.update_statement_metadata(|m| { + m.protocol = 
Some(ProtocolType::Extended); + }); + + let parse_timer = Instant::now(); let mut message = Parse::try_from(bytes)?; @@ -729,6 +691,15 @@ where } } + // Record parse duration + self.context.record_parse_duration(parse_timer.elapsed()); + + // Set statement type and fingerprint + self.context.update_statement_metadata(|m| { + m.statement_type = Some(StatementType::from_sql(&message.statement)); + m.set_query_fingerprint(&message.statement); + }); + if message.requires_rewrite() { let bytes = BytesMut::try_from(message)?; @@ -880,6 +851,12 @@ where debug!(target: PROTOCOL, client_id = self.context.client_id, bind = ?bind); + // Track param bytes for diagnostics + let param_bytes: usize = bind.param_values.iter().map(|p| p.bytes.len()).sum(); + self.context.update_statement_metadata(|m| { + m.set_param_bytes(param_bytes); + }); + let mut portal = Portal::passthrough(); if let Some(statement) = self.context.get_statement(&bind.prepared_statement) { @@ -942,14 +919,23 @@ where counter!(ENCRYPTION_ERROR_TOTAL).increment(1); })?; + let duration = Instant::now().duration_since(start); + + // Add to phase timing diagnostics (accumulate) + self.context.add_encrypt_duration(duration); + // Avoid the iter calculation if we can if self.context.prometheus_enabled() { - let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count() as u64; + let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); - counter!(ENCRYPTION_REQUESTS_TOTAL).increment(1); - counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted_count); + // Update metadata + self.context.update_statement_metadata(|m| { + m.encrypted = true; + m.set_encrypted_values_count(encrypted_count); + }); - let duration = Instant::now().duration_since(start); + counter!(ENCRYPTION_REQUESTS_TOTAL).increment(1); + counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted_count as u64); histogram!(ENCRYPTION_DURATION_SECONDS).record(duration); } From d8695a494753a7e00e979b2425ff8d75ab47831b Mon Sep 17 00:00:00 2001 
From: Toby Hede Date: Thu, 22 Jan 2026 13:45:37 +1100 Subject: [PATCH 15/31] feat(frontend): instrument query_handler with parse timing and metadata Tracks: - Parse/typecheck duration for phase breakdown - Statement type (INSERT/UPDATE/DELETE/SELECT/OTHER) - Protocol type (simple) - Query fingerprint - Multi-statement flag for simple queries --- .../src/postgresql/frontend.rs | 127 ++++++++++++------ 1 file changed, 83 insertions(+), 44 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index d5c19563..0c8ae052 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -1,4 +1,5 @@ -use super::context::{Context, Statement}; +use super::context::{Context, Statement, PhaseTiming}; +use super::context::phase_timing::PhaseTimer; use super::error_handler::PostgreSqlErrorHandler; use super::messages::bind::Bind; use super::messages::describe::Describe; @@ -300,12 +301,7 @@ where debug!(target: PROTOCOL, msg = "Write to server", ?bytes); let sent: u64 = bytes.len() as u64; counter!(SERVER_BYTES_SENT_TOTAL).increment(sent); - - let start = Instant::now(); self.server_writer.write_all(&bytes).await?; - let duration = start.elapsed(); - self.context.add_server_write_duration(duration); - Ok(()) } @@ -377,8 +373,6 @@ where /// - `Err(error)` - Processing failed, error should be sent to client async fn query_handler(&mut self, bytes: &BytesMut) -> Result, Error> { self.context.start_session(); - self.context.start_session(); - // Set protocol type for diagnostics self.context.update_statement_metadata(|m| { @@ -401,9 +395,84 @@ where let mut portal = Portal::passthrough(); let mut encrypted = false; - for statement in parsed_statements { - } + for statement in &parsed_statements { + if let Some(mapping_disabled) = + self.context.maybe_set_unsafe_disable_mapping(&statement) + { + warn!( + msg = "SET 
CIPHERSTASH.DISABLE_MAPPING = {mapping_disabled}", + mapping_disabled + ); + } + if self.context.unsafe_disable_mapping() { + warn!(msg = "Encrypted statement mapping is not enabled"); + counter!(STATEMENTS_PASSTHROUGH_MAPPING_DISABLED_TOTAL).increment(1); + counter!(STATEMENTS_PASSTHROUGH_TOTAL).increment(1); + continue; + } + + self.handle_set_keyset(&statement)?; + + self.check_for_schema_change(&statement); + + if !eql_mapper::requires_type_check(&statement) { + counter!(STATEMENTS_PASSTHROUGH_TOTAL).increment(1); + continue; + } + + let typed_statement = match self.type_check(&statement) { + Ok(ts) => ts, + Err(err) => { + if self.context.mapping_errors_enabled() { + return Err(err); + } else { + return Ok(None); + }; + } + }; + + match self.to_encryptable_statement(&typed_statement, vec![])? { + Some(statement) => { + debug!(target: MAPPER, + client_id = self.context.client_id, + msg = "Encryptable Statement", + ); + + if typed_statement.requires_transform() { + let encrypted_literals = self + .encrypt_literals(&typed_statement, &statement.literal_columns) + .await?; + + if let Some(transformed_statement) = self + .transform_statement(&typed_statement, &encrypted_literals) + .await? 
+ { + debug!(target: MAPPER, + client_id = self.context.client_id, + transformed_statement = ?transformed_statement, + ); + + transformed_statements.push(transformed_statement); + encrypted = true; + } + } + + counter!(STATEMENTS_ENCRYPTED_TOTAL).increment(1); + + // Set Encrypted portal + portal = Portal::encrypted(Arc::new(statement)); + } + None => { + debug!(target: MAPPER, + client_id = self.context.client_id, + msg = "Passthrough Statement" + ); + counter!(STATEMENTS_PASSTHROUGH_TOTAL).increment(1); + transformed_statements.push(statement.clone()); + } + }; + } // Record parse/typecheck duration self.context.record_parse_duration(parse_timer.elapsed()); @@ -601,12 +670,6 @@ where /// - `Err(error)` - Processing failed, error should be sent to client async fn parse_handler(&mut self, bytes: &BytesMut) -> Result, Error> { self.context.start_session(); - // Set protocol type - self.context.update_statement_metadata(|m| { - m.protocol = Some(ProtocolType::Extended); - }); - - let parse_timer = Instant::now(); let mut message = Parse::try_from(bytes)?; @@ -691,15 +754,6 @@ where } } - // Record parse duration - self.context.record_parse_duration(parse_timer.elapsed()); - - // Set statement type and fingerprint - self.context.update_statement_metadata(|m| { - m.statement_type = Some(StatementType::from_sql(&message.statement)); - m.set_query_fingerprint(&message.statement); - }); - if message.requires_rewrite() { let bytes = BytesMut::try_from(message)?; @@ -851,12 +905,6 @@ where debug!(target: PROTOCOL, client_id = self.context.client_id, bind = ?bind); - // Track param bytes for diagnostics - let param_bytes: usize = bind.param_values.iter().map(|p| p.bytes.len()).sum(); - self.context.update_statement_metadata(|m| { - m.set_param_bytes(param_bytes); - }); - let mut portal = Portal::passthrough(); if let Some(statement) = self.context.get_statement(&bind.prepared_statement) { @@ -919,23 +967,14 @@ where counter!(ENCRYPTION_ERROR_TOTAL).increment(1); })?; - let 
duration = Instant::now().duration_since(start); - - // Add to phase timing diagnostics (accumulate) - self.context.add_encrypt_duration(duration); - // Avoid the iter calculation if we can if self.context.prometheus_enabled() { - let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); - - // Update metadata - self.context.update_statement_metadata(|m| { - m.encrypted = true; - m.set_encrypted_values_count(encrypted_count); - }); + let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count() as u64; counter!(ENCRYPTION_REQUESTS_TOTAL).increment(1); - counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted_count as u64); + counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted_count); + + let duration = Instant::now().duration_since(start); histogram!(ENCRYPTION_DURATION_SECONDS).record(duration); } From 9f497e16de2831ad5c0ac5ab62dc6f8789c02b83 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:48:28 +1100 Subject: [PATCH 16/31] feat(frontend): complete instrumentation for diagnostics - Task 8: add_encrypt_duration and update_statement_metadata in encrypt_literals and encrypt_params - Task 9: query_handler already instrumented - Task 10: parse_handler with Extended protocol timing/metadata, bind_handler with param_bytes - Task 10a: write_to_server with add_server_write_duration --- .../src/postgresql/frontend.rs | 62 ++++++++++++++++--- 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index 0c8ae052..10100d29 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -1,4 +1,4 @@ -use super::context::{Context, Statement, PhaseTiming}; +use super::context::{Context, Statement}; use super::context::phase_timing::PhaseTimer; use super::error_handler::PostgreSqlErrorHandler; use super::messages::bind::Bind; @@ -301,7 +301,12 @@ where 
debug!(target: PROTOCOL, msg = "Write to server", ?bytes); let sent: u64 = bytes.len() as u64; counter!(SERVER_BYTES_SENT_TOTAL).increment(sent); + + let start = Instant::now(); self.server_writer.write_all(&bytes).await?; + let duration = start.elapsed(); + self.context.add_server_write_duration(duration); + Ok(()) } @@ -576,10 +581,20 @@ where ?encrypted ); - counter!(ENCRYPTION_REQUESTS_TOTAL).increment(1); - counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted.len() as u64); - let duration = Instant::now().duration_since(start); + + // Add to phase timing diagnostics (accumulate) + self.context.add_encrypt_duration(duration); + + // Update metadata with encrypted values count + let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); + self.context.update_statement_metadata(|m| { + m.encrypted = true; + m.set_encrypted_values_count(encrypted_count); + }); + + counter!(ENCRYPTION_REQUESTS_TOTAL).increment(1); + counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted_count as u64); histogram!(ENCRYPTION_DURATION_SECONDS).record(duration); Ok(encrypted) @@ -671,6 +686,13 @@ where async fn parse_handler(&mut self, bytes: &BytesMut) -> Result, Error> { self.context.start_session(); + // Set protocol type + self.context.update_statement_metadata(|m| { + m.protocol = Some(ProtocolType::Extended); + }); + + let parse_timer = PhaseTimer::start(); + let mut message = Parse::try_from(bytes)?; debug!( @@ -754,6 +776,15 @@ where } } + // Record parse duration + self.context.record_parse_duration(parse_timer.elapsed()); + + // Set statement type and fingerprint + self.context.update_statement_metadata(|m| { + m.statement_type = Some(StatementType::from_sql(&message.statement)); + m.set_query_fingerprint(&message.statement); + }); + if message.requires_rewrite() { let bytes = BytesMut::try_from(message)?; @@ -903,6 +934,12 @@ where let mut bind = Bind::try_from(bytes)?; + // Track param bytes for diagnostics + let param_bytes: usize = 
bind.param_values.iter().map(|p| p.bytes.len()).sum(); + self.context.update_statement_metadata(|m| { + m.set_param_bytes(param_bytes); + }); + debug!(target: PROTOCOL, client_id = self.context.client_id, bind = ?bind); let mut portal = Portal::passthrough(); @@ -967,14 +1004,23 @@ where counter!(ENCRYPTION_ERROR_TOTAL).increment(1); })?; + let duration = Instant::now().duration_since(start); + + // Add to phase timing diagnostics (accumulate) + self.context.add_encrypt_duration(duration); + // Avoid the iter calculation if we can if self.context.prometheus_enabled() { - let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count() as u64; + let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); - counter!(ENCRYPTION_REQUESTS_TOTAL).increment(1); - counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted_count); + // Update metadata + self.context.update_statement_metadata(|m| { + m.encrypted = true; + m.set_encrypted_values_count(encrypted_count); + }); - let duration = Instant::now().duration_since(start); + counter!(ENCRYPTION_REQUESTS_TOTAL).increment(1); + counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted_count as u64); histogram!(ENCRYPTION_DURATION_SECONDS).record(duration); } From 6683afab6a37121b3d31fca0eee88e4f788e3ba8 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:52:33 +1100 Subject: [PATCH 17/31] docs: add SLOW_STATEMENTS.md for performance troubleshooting Documents: - Configuration options for slow statement logging - Slow statement log structure and interpretation - Prometheus metric labels and example queries - Common symptom/cause mapping --- docs/SLOW_STATEMENTS.md | 84 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 docs/SLOW_STATEMENTS.md diff --git a/docs/SLOW_STATEMENTS.md b/docs/SLOW_STATEMENTS.md new file mode 100644 index 00000000..953d44f2 --- /dev/null +++ b/docs/SLOW_STATEMENTS.md @@ -0,0 +1,84 @@ +# Slow Statement Logging + +CipherStash Proxy 
includes built-in slow statement logging for troubleshooting performance issues.
+
+## Configuration
+
+Enable slow statement logging via environment variables:
+
+```bash
+# Enable slow statement logging (required)
+CS_LOG__SLOW_STATEMENTS=true
+
+# Optional: Set minimum duration threshold (default: 2000ms)
+CS_LOG__SLOW_STATEMENT_MIN_DURATION_MS=500
+
+# Optional: Set log level (default: warn when enabled)
+CS_LOG__SLOW_STATEMENTS_LEVEL=warn
+
+# Recommended: Use structured logging for parsing
+CS_LOG__FORMAT=structured
+```
+
+## Slow Statement Logs
+
+When a statement exceeds the threshold, the proxy logs a detailed breakdown:
+
+```json
+{
+  "client_id": 1,
+  "duration_ms": 10500,
+  "statement_type": "INSERT",
+  "protocol": "extended",
+  "encrypted": true,
+  "encrypted_values_count": 3,
+  "param_bytes": 1024,
+  "query_fingerprint": "a1b2c3d4",
+  "keyset_id": "uuid",
+  "mapping_disabled": false,
+  "breakdown": {
+    "parse_ms": 5,
+    "encrypt_ms": 450,
+    "server_write_ms": 12,
+    "server_wait_ms": 9800,
+    "server_response_ms": 233
+  }
+}
+```
+
+## Prometheus Metrics
+
+### Labeled Histograms
+
+Duration histograms now include labels for filtering:
+
+- `statement_type`: insert, update, delete, select, other
+- `protocol`: simple, extended
+- `mapped`: true, false
+- `multi_statement`: true, false
+
+Example queries:
+```promql
+# Median (p50) INSERT duration
+histogram_quantile(0.5, rate(cipherstash_proxy_statements_session_duration_seconds_bucket{statement_type="insert"}[5m]))
+
+# p99 duration for encrypted statements (use mapped="false" for passthrough)
+histogram_quantile(0.99, rate(cipherstash_proxy_statements_session_duration_seconds_bucket{mapped="true"}[5m]))
+```
+
+### ZeroKMS Cipher Init
+
+```
+cipherstash_proxy_keyset_cipher_init_duration_seconds
+```
+
+Measures time for cipher initialization including ZeroKMS network call. High values indicate ZeroKMS connectivity issues. 
+ +## Interpreting Results + +| Symptom | Likely Cause | +|---------|--------------| +| High `encrypt_ms` | ZeroKMS latency or large payload | +| High `server_wait_ms` | Database latency | +| High `cipher_init_duration` | ZeroKMS cold start or network | +| High `parse_ms` | Complex SQL or schema lookup | From b356deca377b61d86abd2df7ee77fcf23e5e8d93 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 13:54:59 +1100 Subject: [PATCH 18/31] feat(metrics): add statement_type, protocol, mapped labels and slow statement logging Existing duration histograms now include labels: - statement_type: insert/update/delete/select/other - protocol: simple/extended - mapped: true/false - multi_statement: true/false Slow statement logging with phase breakdown when enabled. Enables Grafana filtering by statement type and encryption status. --- .../src/postgresql/context/mod.rs | 86 +++++++++++++++++-- packages/cipherstash-proxy/src/prometheus.rs | 5 ++ 2 files changed, 86 insertions(+), 5 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index 606ada12..d07007b8 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -13,13 +13,14 @@ use super::{ use crate::{ config::TandemConfig, error::{EncryptError, Error}, - log::CONTEXT, - prometheus::{STATEMENTS_EXECUTION_DURATION_SECONDS, STATEMENTS_SESSION_DURATION_SECONDS}, + log::{CONTEXT, SLOW_STATEMENTS}, + prometheus::{STATEMENTS_EXECUTION_DURATION_SECONDS, STATEMENTS_SESSION_DURATION_SECONDS, SLOW_STATEMENTS_TOTAL}, proxy::{EncryptConfig, EncryptionService, ReloadCommand, ReloadSender}, }; use cipherstash_client::IdentifiedBy; use eql_mapper::{Schema, TableResolver}; -use metrics::histogram; +use metrics::{counter, histogram}; +use serde_json::json; use sqltk::parser::ast::{Expr, Ident, ObjectName, ObjectNamePart, Set, Value, ValueWithSpan}; use std::{ 
collections::{HashMap, VecDeque}, @@ -166,7 +167,62 @@ where debug!(target: CONTEXT, client_id = self.client_id, msg = "Session Metrics finished"); if let Some(session) = self.get_session_metrics() { - histogram!(STATEMENTS_SESSION_DURATION_SECONDS).record(session.duration()); + let duration = session.duration(); + let metadata = &session.metadata; + + // Get labels for metrics + let statement_type = metadata.statement_type + .map(|t| t.as_label()) + .unwrap_or("unknown"); + let protocol = metadata.protocol + .map(|p| p.as_label()) + .unwrap_or("unknown"); + let mapped = if metadata.encrypted { "true" } else { "false" }; + let multi_statement = if metadata.multi_statement { "true" } else { "false" }; + + // Record with labels + histogram!( + STATEMENTS_SESSION_DURATION_SECONDS, + "statement_type" => statement_type, + "protocol" => protocol, + "mapped" => mapped, + "multi_statement" => multi_statement + ).record(duration); + + // Log slow statements when enabled + if self.config.slow_statements_enabled() && duration > self.config.slow_statement_min_duration() { + let timing = &session.phase_timing; + + // Increment slow statements counter + counter!(SLOW_STATEMENTS_TOTAL).increment(1); + + let breakdown = json!({ + "parse_ms": timing.parse_duration.map(|d| d.as_millis()), + "encrypt_ms": timing.encrypt_duration.map(|d| d.as_millis()), + "server_write_ms": timing.server_write_duration.map(|d| d.as_millis()), + "server_wait_ms": timing.server_wait_duration.map(|d| d.as_millis()), + "server_response_ms": timing.server_response_duration.map(|d| d.as_millis()), + "client_write_ms": timing.client_write_duration.map(|d| d.as_millis()), + "decrypt_ms": timing.decrypt_duration.map(|d| d.as_millis()), + }); + + warn!( + target: SLOW_STATEMENTS, + client_id = self.client_id, + duration_ms = duration.as_millis() as u64, + statement_type = statement_type, + protocol = protocol, + encrypted = metadata.encrypted, + multi_statement = metadata.multi_statement, + 
encrypted_values_count = metadata.encrypted_values_count, + param_bytes = metadata.param_bytes, + query_fingerprint = ?metadata.query_fingerprint, + keyset_id = ?self.keyset_identifier(), + mapping_disabled = self.mapping_disabled(), + breakdown = %breakdown, + msg = "Slow statement detected" + ); + } } let _ = self @@ -198,7 +254,27 @@ where debug!(target: CONTEXT, client_id = self.client_id, msg = "Execute complete"); if let Some(execute) = self.get_execute() { - histogram!(STATEMENTS_EXECUTION_DURATION_SECONDS).record(execute.duration()); + // Get labels from current session metadata + let (statement_type, protocol, mapped, multi_statement) = if let Some(session) = self.get_session_metrics() { + let metadata = &session.metadata; + ( + metadata.statement_type.map(|t| t.as_label()).unwrap_or("unknown"), + metadata.protocol.map(|p| p.as_label()).unwrap_or("unknown"), + if metadata.encrypted { "true" } else { "false" }, + if metadata.multi_statement { "true" } else { "false" }, + ) + } else { + ("unknown", "unknown", "false", "false") + }; + + histogram!( + STATEMENTS_EXECUTION_DURATION_SECONDS, + "statement_type" => statement_type, + "protocol" => protocol, + "mapped" => mapped, + "multi_statement" => multi_statement + ).record(execute.duration()); + if execute.name.is_unnamed() { self.close_portal(&execute.name); } diff --git a/packages/cipherstash-proxy/src/prometheus.rs b/packages/cipherstash-proxy/src/prometheus.rs index 24b56b3a..72be613c 100644 --- a/packages/cipherstash-proxy/src/prometheus.rs +++ b/packages/cipherstash-proxy/src/prometheus.rs @@ -26,6 +26,7 @@ pub const STATEMENTS_SESSION_DURATION_SECONDS: &str = "cipherstash_proxy_statements_session_duration_seconds"; pub const STATEMENTS_EXECUTION_DURATION_SECONDS: &str = "cipherstash_proxy_statements_execution_duration_seconds"; +pub const SLOW_STATEMENTS_TOTAL: &str = "cipherstash_proxy_slow_statements_total"; pub const ROWS_TOTAL: &str = "cipherstash_proxy_rows_total"; pub const ROWS_ENCRYPTED_TOTAL: 
&str = "cipherstash_proxy_rows_encrypted_total"; @@ -116,6 +117,10 @@ pub fn start(host: String, port: u16) -> Result<(), Error> { Unit::Seconds, "Duration of time the proxied database spent executing SQL statements" ); + describe_counter!( + SLOW_STATEMENTS_TOTAL, + "Total number of statements exceeding slow statement threshold" + ); describe_counter!(ROWS_TOTAL, "Total number of rows returned to clients"); describe_counter!( From e98eac56c3dd7320e1e915a314a7fcfb3808450c Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 14:01:50 +1100 Subject: [PATCH 19/31] test: add automated diagnostics log and metrics label checks - Integration test scrapes /metrics and asserts statement labels - Adds diagnostics module for testing metrics endpoint - Adds reqwest HTTP client dependency for metrics scraping --- .../cipherstash-proxy-integration/Cargo.toml | 1 + .../src/diagnostics.rs | 51 +++++++++++++++++++ .../cipherstash-proxy-integration/src/lib.rs | 1 + 3 files changed, 53 insertions(+) create mode 100644 packages/cipherstash-proxy-integration/src/diagnostics.rs diff --git a/packages/cipherstash-proxy-integration/Cargo.toml b/packages/cipherstash-proxy-integration/Cargo.toml index 44e3d9c0..a7f1dc61 100644 --- a/packages/cipherstash-proxy-integration/Cargo.toml +++ b/packages/cipherstash-proxy-integration/Cargo.toml @@ -27,3 +27,4 @@ tokio-postgres-rustls = "0.13.0" tracing = { workspace = true } tracing-subscriber = { workspace = true } uuid = { version = "1.11.0", features = ["serde", "v4"] } +reqwest = { version = "0.12", features = ["rustls-tls"] } diff --git a/packages/cipherstash-proxy-integration/src/diagnostics.rs b/packages/cipherstash-proxy-integration/src/diagnostics.rs new file mode 100644 index 00000000..2cab7a22 --- /dev/null +++ b/packages/cipherstash-proxy-integration/src/diagnostics.rs @@ -0,0 +1,51 @@ +#[cfg(test)] +mod tests { + use crate::common::{clear, connect_with_tls, PROXY}; + + #[tokio::test] + async fn 
metrics_include_statement_labels() { + let client = connect_with_tls(PROXY).await; + + clear().await; + + // Insert a value to generate metrics + client + .execute( + "INSERT INTO plaintext (id, plaintext) VALUES ($1, $2)", + &[&1i64, &"metrics test"], + ) + .await + .unwrap(); + + // Select a value to generate metrics + let _rows = client + .query("SELECT * FROM plaintext LIMIT 1", &[]) + .await + .unwrap(); + + // Give the metrics some time to be written + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Fetch metrics from the /metrics endpoint + let body = reqwest::get("http://localhost:9930/metrics") + .await + .unwrap() + .text() + .await + .unwrap(); + + // Assert that the metrics include the expected labels + assert!( + body.contains("statement_type=\"insert\""), + "Metrics should include insert statement_type label" + ); + assert!( + body.contains("statement_type=\"select\""), + "Metrics should include select statement_type label" + ); + assert!( + body.contains("multi_statement=\"false\""), + "Metrics should include multi_statement=false label" + ); + } +} diff --git a/packages/cipherstash-proxy-integration/src/lib.rs b/packages/cipherstash-proxy-integration/src/lib.rs index 611e4b3a..b91ce725 100644 --- a/packages/cipherstash-proxy-integration/src/lib.rs +++ b/packages/cipherstash-proxy-integration/src/lib.rs @@ -1,5 +1,6 @@ mod common; mod decrypt; +mod diagnostics; mod disable_mapping; mod empty_result; mod encryption_sanity; From da5c1937dad02cb7da6f07e2eb787896d8f36092 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 14:11:51 +1100 Subject: [PATCH 20/31] chore: update Cargo.lock with reqwest dependency --- Cargo.lock | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 167 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5524e4d9..24615ac6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -925,6 +925,7 @@ dependencies = [ "hex", "postgres-types", "rand 0.9.0", + "reqwest", 
"rustls", "serde", "serde_json", @@ -1101,6 +1102,16 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.0" @@ -1500,6 +1511,15 @@ dependencies = [ "serde", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "eql-mapper" version = "1.0.0" @@ -1615,6 +1635,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "find-msvc-tools" version = "0.1.5" @@ -1649,6 +1675,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1991,6 +2032,22 @@ dependencies = [ "webpki-roots 0.26.11", ] +[[package]] +name = "hyper-tls" +version = 
"0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -2544,6 +2601,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nom" version = "7.1.3" @@ -2691,6 +2765,32 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "openssl-probe" version = "0.1.6" @@ -3366,18 +3466,22 @@ dependencies = [ "async-compression", "base64", "bytes", + "encoding_rs", "futures-core", "futures-util", + "h2", "http", "http-body", "http-body-util", "hyper", "hyper-rustls", + "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -3389,7 +3493,9 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper", + "system-configuration", 
"tokio", + "tokio-native-tls", "tokio-rustls", "tokio-util", "tower", @@ -3610,7 +3716,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.2.0", ] [[package]] @@ -3637,7 +3743,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5467026f437b4cb2a533865eaa73eb840019a0916f4b9ec563c6e617e086c9" dependencies = [ - "core-foundation", + "core-foundation 0.10.0", "core-foundation-sys", "jni", "log", @@ -3646,7 +3752,7 @@ dependencies = [ "rustls-native-certs", "rustls-platform-verifier-android", "rustls-webpki", - "security-framework", + "security-framework 3.2.0", "security-framework-sys", "webpki-root-certs", "windows-sys 0.59.0", @@ -3718,6 +3824,19 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework" version = "3.2.0" @@ -3725,7 +3844,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ "bitflags 2.9.0", - "core-foundation", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -4121,6 +4240,27 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.9.4", + "system-configuration-sys", 
+] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -4142,6 +4282,19 @@ dependencies = [ "parking_lot 0.12.3", ] +[[package]] +name = "tempfile" +version = "3.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" +dependencies = [ + "fastrand", + "getrandom 0.3.2", + "once_cell", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "terminal_size" version = "0.4.2" @@ -4317,6 +4470,16 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.13" From 41e1349b659cdc848c9cf451adb01cd31e3a5773 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 14:47:24 +1100 Subject: [PATCH 21/31] fix(metrics): correct diagnostics timing and metadata recording - Record parse duration before encryption work starts to avoid double-counting - Update encrypt_params metadata unconditionally for slow-statement logging - Add decrypt duration recording via add_decrypt_duration() call - Remove unused re-exports from context/mod.rs - Fix needless_borrow clippy warnings in query_handler loop These changes ensure phase timings accurately reflect actual work: - parse_ms captures SQL parsing + type-checking only - encrypt_ms and decrypt_ms are tracked separately without overlap - Statement metadata updates regardless of Prometheus status --- .../src/postgresql/backend.rs | 9 ++- .../src/postgresql/context/mod.rs | 62 ++++++++++++------- 
.../src/postgresql/frontend.rs | 55 ++++++++++------ 3 files changed, 81 insertions(+), 45 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/backend.rs b/packages/cipherstash-proxy/src/postgresql/backend.rs index a13a1405..4f4b95aa 100644 --- a/packages/cipherstash-proxy/src/postgresql/backend.rs +++ b/packages/cipherstash-proxy/src/postgresql/backend.rs @@ -458,7 +458,12 @@ where counter!(DECRYPTION_ERROR_TOTAL).increment(1); })?; - // Avoid the iter calculation if we can + let duration = Instant::now().duration_since(start); + + // Always record for slow-statement diagnostics + self.context.add_decrypt_duration(duration); + + // Prometheus metrics remain gated if self.context.prometheus_enabled() { let decrypted_count = plaintexts @@ -467,8 +472,6 @@ where counter!(DECRYPTION_REQUESTS_TOTAL).increment(1); counter!(DECRYPTED_VALUES_TOTAL).increment(decrypted_count); - - let duration = Instant::now().duration_since(start); histogram!(DECRYPTION_DURATION_SECONDS).record(duration); } diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index d07007b8..41aeb2b9 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -3,8 +3,7 @@ pub mod phase_timing; pub mod portal; pub mod statement; pub mod statement_metadata; -pub use self::{phase_timing::{PhaseTiming, PhaseTimer}, portal::Portal, statement::Statement}; -pub use statement_metadata::{StatementMetadata, StatementType, ProtocolType}; +pub use self::{phase_timing::PhaseTiming, portal::Portal, statement::Statement}; use super::{ column_mapper::ColumnMapper, messages::{describe::Describe, Name, Target}, @@ -14,7 +13,10 @@ use crate::{ config::TandemConfig, error::{EncryptError, Error}, log::{CONTEXT, SLOW_STATEMENTS}, - prometheus::{STATEMENTS_EXECUTION_DURATION_SECONDS, STATEMENTS_SESSION_DURATION_SECONDS, SLOW_STATEMENTS_TOTAL}, + prometheus::{ + 
SLOW_STATEMENTS_TOTAL, STATEMENTS_EXECUTION_DURATION_SECONDS, + STATEMENTS_SESSION_DURATION_SECONDS, + }, proxy::{EncryptConfig, EncryptionService, ReloadCommand, ReloadSender}, }; use cipherstash_client::IdentifiedBy; @@ -22,6 +24,7 @@ use eql_mapper::{Schema, TableResolver}; use metrics::{counter, histogram}; use serde_json::json; use sqltk::parser::ast::{Expr, Ident, ObjectName, ObjectNamePart, Set, Value, ValueWithSpan}; +pub use statement_metadata::StatementMetadata; use std::{ collections::{HashMap, VecDeque}, sync::{Arc, LazyLock, RwLock}, @@ -171,14 +174,17 @@ where let metadata = &session.metadata; // Get labels for metrics - let statement_type = metadata.statement_type + let statement_type = metadata + .statement_type .map(|t| t.as_label()) .unwrap_or("unknown"); - let protocol = metadata.protocol - .map(|p| p.as_label()) - .unwrap_or("unknown"); + let protocol = metadata.protocol.map(|p| p.as_label()).unwrap_or("unknown"); let mapped = if metadata.encrypted { "true" } else { "false" }; - let multi_statement = if metadata.multi_statement { "true" } else { "false" }; + let multi_statement = if metadata.multi_statement { + "true" + } else { + "false" + }; // Record with labels histogram!( @@ -187,10 +193,13 @@ where "protocol" => protocol, "mapped" => mapped, "multi_statement" => multi_statement - ).record(duration); + ) + .record(duration); // Log slow statements when enabled - if self.config.slow_statements_enabled() && duration > self.config.slow_statement_min_duration() { + if self.config.slow_statements_enabled() + && duration > self.config.slow_statement_min_duration() + { let timing = &session.phase_timing; // Increment slow statements counter @@ -255,17 +264,25 @@ where if let Some(execute) = self.get_execute() { // Get labels from current session metadata - let (statement_type, protocol, mapped, multi_statement) = if let Some(session) = self.get_session_metrics() { - let metadata = &session.metadata; - ( - metadata.statement_type.map(|t| 
t.as_label()).unwrap_or("unknown"), - metadata.protocol.map(|p| p.as_label()).unwrap_or("unknown"), - if metadata.encrypted { "true" } else { "false" }, - if metadata.multi_statement { "true" } else { "false" }, - ) - } else { - ("unknown", "unknown", "false", "false") - }; + let (statement_type, protocol, mapped, multi_statement) = + if let Some(session) = self.get_session_metrics() { + let metadata = &session.metadata; + ( + metadata + .statement_type + .map(|t| t.as_label()) + .unwrap_or("unknown"), + metadata.protocol.map(|p| p.as_label()).unwrap_or("unknown"), + if metadata.encrypted { "true" } else { "false" }, + if metadata.multi_statement { + "true" + } else { + "false" + }, + ) + } else { + ("unknown", "unknown", "false", "false") + }; histogram!( STATEMENTS_EXECUTION_DURATION_SECONDS, @@ -273,7 +290,8 @@ where "protocol" => protocol, "mapped" => mapped, "multi_statement" => multi_statement - ).record(execute.duration()); + ) + .record(execute.duration()); if execute.name.is_unnamed() { self.close_portal(&execute.name); diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index 10100d29..55fdeba3 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -1,5 +1,5 @@ -use super::context::{Context, Statement}; use super::context::phase_timing::PhaseTimer; +use super::context::{Context, Statement}; use super::error_handler::PostgreSqlErrorHandler; use super::messages::bind::Bind; use super::messages::describe::Describe; @@ -399,10 +399,10 @@ where let mut portal = Portal::passthrough(); let mut encrypted = false; + let mut parse_duration_recorded = false; for statement in &parsed_statements { - if let Some(mapping_disabled) = - self.context.maybe_set_unsafe_disable_mapping(&statement) + if let Some(mapping_disabled) = self.context.maybe_set_unsafe_disable_mapping(statement) { warn!( msg = "SET 
CIPHERSTASH.DISABLE_MAPPING = {mapping_disabled}", @@ -417,16 +417,16 @@ where continue; } - self.handle_set_keyset(&statement)?; + self.handle_set_keyset(statement)?; - self.check_for_schema_change(&statement); + self.check_for_schema_change(statement); - if !eql_mapper::requires_type_check(&statement) { + if !eql_mapper::requires_type_check(statement) { counter!(STATEMENTS_PASSTHROUGH_TOTAL).increment(1); continue; } - let typed_statement = match self.type_check(&statement) { + let typed_statement = match self.type_check(statement) { Ok(ts) => ts, Err(err) => { if self.context.mapping_errors_enabled() { @@ -445,6 +445,12 @@ where ); if typed_statement.requires_transform() { + // Record parse duration before encryption work starts + if !parse_duration_recorded { + self.context.record_parse_duration(parse_timer.elapsed()); + parse_duration_recorded = true; + } + let encrypted_literals = self .encrypt_literals(&typed_statement, &statement.literal_columns) .await?; @@ -479,8 +485,10 @@ where }; } - // Record parse/typecheck duration - self.context.record_parse_duration(parse_timer.elapsed()); + // Record parse/typecheck duration (if not already recorded before encryption) + if !parse_duration_recorded { + self.context.record_parse_duration(parse_timer.elapsed()); + } // Set statement type based on parsed statements let statement_type = if parsed_statements.len() == 1 { @@ -741,9 +749,15 @@ where // These override the underlying column type let param_types = message.param_types.clone(); + let mut parse_duration_recorded = false; + match self.to_encryptable_statement(&typed_statement, param_types)? 
{ Some(statement) => { if typed_statement.requires_transform() { + // Record parse duration before encryption work starts + self.context.record_parse_duration(parse_timer.elapsed()); + parse_duration_recorded = true; + let encrypted_literals = self .encrypt_literals(&typed_statement, &statement.literal_columns) .await?; @@ -776,8 +790,10 @@ where } } - // Record parse duration - self.context.record_parse_duration(parse_timer.elapsed()); + // Record parse duration (if not already recorded before encryption) + if !parse_duration_recorded { + self.context.record_parse_duration(parse_timer.elapsed()); + } // Set statement type and fingerprint self.context.update_statement_metadata(|m| { @@ -1009,16 +1025,15 @@ where // Add to phase timing diagnostics (accumulate) self.context.add_encrypt_duration(duration); - // Avoid the iter calculation if we can - if self.context.prometheus_enabled() { - let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); - - // Update metadata - self.context.update_statement_metadata(|m| { - m.encrypted = true; - m.set_encrypted_values_count(encrypted_count); - }); + // Always update metadata for slow-statement logging + let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); + self.context.update_statement_metadata(|m| { + m.encrypted = true; + m.set_encrypted_values_count(encrypted_count); + }); + // Prometheus metrics remain gated + if self.context.prometheus_enabled() { counter!(ENCRYPTION_REQUESTS_TOTAL).increment(1); counter!(ENCRYPTED_VALUES_TOTAL).increment(encrypted_count as u64); histogram!(ENCRYPTION_DURATION_SECONDS).record(duration); From 47af9db4884a00e30193e28413d4df73e7d8f7dd Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 14:47:40 +1100 Subject: [PATCH 22/31] style: apply cargo fmt formatting Auto-formatting applied by cargo fmt to various files. 
--- .../cipherstash-proxy/src/config/tandem.rs | 13 ++++++-- .../src/postgresql/context/phase_timing.rs | 5 +++- .../postgresql/context/statement_metadata.rs | 30 +++++++++++++++---- packages/cipherstash-proxy/src/prometheus.rs | 3 +- .../src/proxy/zerokms/zerokms.rs | 10 +++++-- 5 files changed, 48 insertions(+), 13 deletions(-) diff --git a/packages/cipherstash-proxy/src/config/tandem.rs b/packages/cipherstash-proxy/src/config/tandem.rs index fbd426bf..a88a3b0d 100644 --- a/packages/cipherstash-proxy/src/config/tandem.rs +++ b/packages/cipherstash-proxy/src/config/tandem.rs @@ -873,10 +873,15 @@ mod tests { ("CS_LOG__SLOW_STATEMENT_MIN_DURATION_MS", Some("1000")), ], || { - let config = TandemConfig::build_path("tests/config/cipherstash-proxy-test.toml").unwrap(); + let config = + TandemConfig::build_path("tests/config/cipherstash-proxy-test.toml") + .unwrap(); assert!(config.slow_statements_enabled()); - assert_eq!(config.slow_statement_min_duration(), std::time::Duration::from_millis(1000)); + assert_eq!( + config.slow_statement_min_duration(), + std::time::Duration::from_millis(1000) + ); }, ); }); @@ -891,7 +896,9 @@ mod tests { ("CS_LOG__SLOW_STATEMENT_MIN_DURATION_MS", Some("500")), ], || { - let config = TandemConfig::build_path("tests/config/cipherstash-proxy-test.toml").unwrap(); + let config = + TandemConfig::build_path("tests/config/cipherstash-proxy-test.toml") + .unwrap(); assert!(config.log.slow_statements); assert_eq!(config.log.slow_statement_min_duration_ms, 500); diff --git a/packages/cipherstash-proxy/src/postgresql/context/phase_timing.rs b/packages/cipherstash-proxy/src/postgresql/context/phase_timing.rs index 3c0b6a89..8385023e 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/phase_timing.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/phase_timing.rs @@ -171,7 +171,10 @@ mod tests { timing.add_server_write(Duration::from_millis(3)); timing.add_server_write(Duration::from_millis(7)); - 
assert_eq!(timing.server_write_duration, Some(Duration::from_millis(10))); + assert_eq!( + timing.server_write_duration, + Some(Duration::from_millis(10)) + ); } #[test] diff --git a/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs b/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs index 40084a84..25eeacaf 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs @@ -125,12 +125,30 @@ mod tests { #[test] fn statement_type_from_sql() { - assert_eq!(StatementType::from_sql("INSERT INTO foo VALUES (1)"), StatementType::Insert); - assert_eq!(StatementType::from_sql(" insert into foo"), StatementType::Insert); - assert_eq!(StatementType::from_sql("UPDATE foo SET bar = 1"), StatementType::Update); - assert_eq!(StatementType::from_sql("DELETE FROM foo"), StatementType::Delete); - assert_eq!(StatementType::from_sql("SELECT * FROM foo"), StatementType::Select); - assert_eq!(StatementType::from_sql("CREATE TABLE foo"), StatementType::Other); + assert_eq!( + StatementType::from_sql("INSERT INTO foo VALUES (1)"), + StatementType::Insert + ); + assert_eq!( + StatementType::from_sql(" insert into foo"), + StatementType::Insert + ); + assert_eq!( + StatementType::from_sql("UPDATE foo SET bar = 1"), + StatementType::Update + ); + assert_eq!( + StatementType::from_sql("DELETE FROM foo"), + StatementType::Delete + ); + assert_eq!( + StatementType::from_sql("SELECT * FROM foo"), + StatementType::Select + ); + assert_eq!( + StatementType::from_sql("CREATE TABLE foo"), + StatementType::Other + ); } #[test] diff --git a/packages/cipherstash-proxy/src/prometheus.rs b/packages/cipherstash-proxy/src/prometheus.rs index 72be613c..d14f8322 100644 --- a/packages/cipherstash-proxy/src/prometheus.rs +++ b/packages/cipherstash-proxy/src/prometheus.rs @@ -40,7 +40,8 @@ pub const SERVER_BYTES_RECEIVED_TOTAL: &str = "cipherstash_proxy_server_bytes_re 
pub const KEYSET_CIPHER_INIT_TOTAL: &str = "cipherstash_proxy_keyset_cipher_init_total"; pub const KEYSET_CIPHER_CACHE_HITS_TOTAL: &str = "cipherstash_proxy_keyset_cipher_cache_hits_total"; -pub const KEYSET_CIPHER_INIT_DURATION_SECONDS: &str = "cipherstash_proxy_keyset_cipher_init_duration_seconds"; +pub const KEYSET_CIPHER_INIT_DURATION_SECONDS: &str = + "cipherstash_proxy_keyset_cipher_init_duration_seconds"; pub fn start(host: String, port: u16) -> Result<(), Error> { let address = format!("{host}:{port}"); diff --git a/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs b/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs index 70152799..dc6b17dd 100644 --- a/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs +++ b/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs @@ -3,7 +3,10 @@ use crate::{ error::{EncryptError, Error, ZeroKMSError}, log::{ENCRYPT, PROXY}, postgresql::{Column, KeysetIdentifier}, - prometheus::{KEYSET_CIPHER_CACHE_HITS_TOTAL, KEYSET_CIPHER_INIT_DURATION_SECONDS, KEYSET_CIPHER_INIT_TOTAL}, + prometheus::{ + KEYSET_CIPHER_CACHE_HITS_TOTAL, KEYSET_CIPHER_INIT_DURATION_SECONDS, + KEYSET_CIPHER_INIT_TOTAL, + }, proxy::EncryptionService, }; use cipherstash_client::{ @@ -12,7 +15,10 @@ use cipherstash_client::{ }; use metrics::{counter, histogram}; use moka::future::Cache; -use std::{sync::Arc, time::{Duration, Instant}}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use tracing::{debug, info, warn}; use uuid::Uuid; From 45193f12fc447a17c379b463e147ba0007176bac Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 17:49:01 +1100 Subject: [PATCH 23/31] test(metrics): update prometheus assertions to be label-aware Updates duration metric assertions to use regex matching that supports labeled metrics, ensuring quantile matches work regardless of other labels present. 
--- tests/tasks/test/integration/prometheus.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/tasks/test/integration/prometheus.sh b/tests/tasks/test/integration/prometheus.sh index 36253db3..ad2685b8 100755 --- a/tests/tasks/test/integration/prometheus.sh +++ b/tests/tasks/test/integration/prometheus.sh @@ -33,13 +33,13 @@ if [[ $response != *"cipherstash_proxy_rows_total 1"* ]]; then exit 1 fi -if [[ $response != *"cipherstash_proxy_statements_execution_duration_seconds{quantile=\"1\"} 0."* ]]; then - echo "error: did not see string in output: \"cipherstash_proxy_statements_execution_duration_seconds{quantile=\"1\"} 0.\"" +if [[ ! $response =~ cipherstash_proxy_statements_execution_duration_seconds\{.*quantile=\"1\"\} ]]; then + echo "error: did not see execution duration metric with quantile=\"1\" in output" exit 1 fi -if [[ $response != *"cipherstash_proxy_statements_session_duration_seconds{quantile=\"1\"} 0."* ]]; then - echo "error: did not see string in output: \"cipherstash_proxy_statements_session_duration_seconds{quantile=\"1\"} 0.\"" +if [[ ! 
$response =~ cipherstash_proxy_statements_session_duration_seconds\{.*quantile=\"1\"\} ]]; then + echo "error: did not see session duration metric with quantile=\"1\" in output" exit 1 fi From 13c49c40ef012361ec768618be9ad7327ea7bd7e Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 18:43:37 +1100 Subject: [PATCH 24/31] Fix diagnostics attribution for pipelined sessions --- .../src/postgresql/backend.rs | 6 +- .../src/postgresql/context/mod.rs | 266 ++++++++++++------ .../src/postgresql/context/portal.rs | 27 +- .../src/postgresql/frontend.rs | 112 +++++--- 4 files changed, 276 insertions(+), 135 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/backend.rs b/packages/cipherstash-proxy/src/postgresql/backend.rs index 4f4b95aa..8c9c6cff 100644 --- a/packages/cipherstash-proxy/src/postgresql/backend.rs +++ b/packages/cipherstash-proxy/src/postgresql/backend.rs @@ -157,7 +157,7 @@ where ) .await?; let duration = start.elapsed(); - self.context.record_server_wait_or_add_response(duration); + self.context.record_execute_server_timing(duration); let sent: u64 = bytes.len() as u64; counter!(SERVER_BYTES_RECEIVED_TOTAL).increment(sent); @@ -356,7 +356,7 @@ where let start = Instant::now(); self.client_sender.send(bytes)?; let duration = start.elapsed(); - self.context.add_client_write_duration(duration); + self.context.add_client_write_duration_for_execute(duration); Ok(()) } @@ -461,7 +461,7 @@ where let duration = Instant::now().duration_since(start); // Always record for slow-statement diagnostics - self.context.add_decrypt_duration(duration); + self.context.add_decrypt_duration_for_execute(duration); // Prometheus metrics remain gated if self.context.prometheus_enabled() { diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index 41aeb2b9..c6354c26 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ 
b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -27,7 +27,10 @@ use sqltk::parser::ast::{Expr, Ident, ObjectName, ObjectNamePart, Set, Value, Va pub use statement_metadata::StatementMetadata; use std::{ collections::{HashMap, VecDeque}, - sync::{Arc, LazyLock, RwLock}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, LazyLock, RwLock, + }, time::{Duration, Instant}, }; use tokio::sync::oneshot; @@ -39,6 +42,9 @@ type ExecuteQueue = Queue; type SessionMetricsQueue = Queue; type PortalQueue = Queue>; +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct SessionId(u64); + #[derive(Clone, Debug, PartialEq)] pub struct KeysetIdentifier(pub IdentifiedBy); @@ -60,6 +66,7 @@ where reload_sender: ReloadSender, column_mapper: ColumnMapper, statements: Arc>>>, + statement_sessions: Arc>>, portals: Arc>>, describe: Arc>, execute: Arc>, @@ -68,37 +75,66 @@ where table_resolver: Arc, unsafe_disable_mapping: bool, keyset_id: Arc>>, + session_id_counter: Arc, } #[derive(Clone, Debug)] pub struct ExecuteContext { name: Name, start: Instant, + session_id: Option, + server_wait_duration: Option, + server_response_duration: Duration, } impl ExecuteContext { - fn new(name: Name) -> ExecuteContext { + fn new(name: Name, session_id: Option) -> ExecuteContext { ExecuteContext { name, start: Instant::now(), + session_id, + server_wait_duration: None, + server_response_duration: Duration::from_secs(0), } } fn duration(&self) -> Duration { Instant::now().duration_since(self.start) } + + fn record_server_wait_or_add_response(&mut self, duration: Duration) { + if self.server_wait_duration.is_none() { + self.server_wait_duration = Some(duration); + } else { + self.server_response_duration += duration; + } + } + + fn server_wait_duration(&self) -> Option { + self.server_wait_duration + } + + fn server_response_duration(&self) -> Duration { + self.server_response_duration + } + + fn session_id(&self) -> Option { + self.session_id + } } #[derive(Clone, Debug)] pub struct 
SessionMetricsContext { + id: SessionId, start: Instant, pub phase_timing: PhaseTiming, pub metadata: StatementMetadata, } impl SessionMetricsContext { - fn new() -> SessionMetricsContext { + fn new(id: SessionId) -> SessionMetricsContext { SessionMetricsContext { + id, start: Instant::now(), phase_timing: PhaseTiming::new(), metadata: StatementMetadata::new(), @@ -108,6 +144,10 @@ impl SessionMetricsContext { fn duration(&self) -> Duration { Instant::now().duration_since(self.start) } + + fn id(&self) -> SessionId { + self.id + } } #[derive(Clone, Debug)] @@ -131,6 +171,7 @@ where Context { statements: Arc::new(RwLock::new(HashMap::new())), + statement_sessions: Arc::new(RwLock::new(HashMap::new())), portals: Arc::new(RwLock::new(HashMap::new())), describe: Arc::new(RwLock::from(Queue::new())), execute: Arc::new(RwLock::from(Queue::new())), @@ -145,6 +186,7 @@ where reload_sender, unsafe_disable_mapping: false, keyset_id: Arc::new(RwLock::new(None)), + session_id_counter: Arc::new(AtomicU64::new(1)), } } @@ -161,9 +203,11 @@ where let _ = self.describe.write().map(|mut queue| queue.complete()); } - pub fn start_session(&mut self) { - let ctx = SessionMetricsContext::new(); + pub fn start_session(&mut self) -> SessionId { + let id = SessionId(self.session_id_counter.fetch_add(1, Ordering::Relaxed)); + let ctx = SessionMetricsContext::new(id); let _ = self.session_metrics.write().map(|mut queue| queue.add(ctx)); + id } pub fn finish_session(&mut self) { @@ -240,10 +284,10 @@ where .map(|mut queue| queue.complete()); } - pub fn set_execute(&mut self, name: Name) { + pub fn set_execute(&mut self, name: Name, session_id: Option) { debug!(target: CONTEXT, client_id = self.client_id, execute = ?name); - let ctx = ExecuteContext::new(name); + let ctx = ExecuteContext::new(name, session_id); let _ = self.execute.write().map(|mut queue| queue.add(ctx)); } @@ -263,6 +307,16 @@ where debug!(target: CONTEXT, client_id = self.client_id, msg = "Execute complete"); if let 
Some(execute) = self.get_execute() { + if let Some(session_id) = execute.session_id() { + if let Some(wait) = execute.server_wait_duration() { + self.record_server_wait_duration(session_id, wait); + } + let response = execute.server_response_duration(); + if !response.is_zero() { + self.add_server_response_duration(session_id, response); + } + } + // Get labels from current session metadata let (statement_type, protocol, mapped, multi_statement) = if let Some(session) = self.get_session_metrics() { @@ -316,6 +370,11 @@ where .statements .write() .map(|mut guarded| guarded.remove(name)); + + let _ = self + .statement_sessions + .write() + .map(|mut guarded| guarded.remove(name)); } pub fn add_portal(&mut self, name: Name, portal: Portal) { @@ -334,6 +393,18 @@ where statements.get(name).cloned() } + pub fn set_statement_session(&mut self, name: Name, session_id: SessionId) { + let _ = self + .statement_sessions + .write() + .map(|mut sessions| sessions.insert(name, session_id)); + } + + pub fn get_statement_session(&self, name: &Name) -> Option { + let sessions = self.statement_sessions.read().ok()?; + sessions.get(name).copied() + } + /// /// Close the portal identified by `name` /// Portal is removed from queue @@ -364,10 +435,17 @@ where match portal.as_ref() { Portal::Encrypted { statement, .. } => Some(statement.clone()), - Portal::Passthrough => None, + Portal::Passthrough { .. 
} => None, } } + pub fn get_portal_session_id(&self, name: &Name) -> Option { + let portals = self.portals.read().ok()?; + let queue = portals.get(name)?; + let portal = queue.next()?; + portal.session_id() + } + pub fn get_statement_for_row_decription(&self) -> Option> { if let Some(statement) = self.get_statement_from_describe() { return Some(statement.clone()); @@ -733,118 +811,128 @@ where &self.config } - /// Record parse phase duration for the current session (first write wins) - pub fn record_parse_duration(&mut self, duration: Duration) { + fn with_session_metrics_mut(&mut self, session_id: SessionId, f: F) + where + F: FnOnce(&mut SessionMetricsContext), + { if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.record_parse(duration); + if let Some(session) = queue + .queue + .iter_mut() + .find(|session| session.id() == session_id) + { + f(session); } } } - /// Add encrypt phase duration for the current session (accumulate) - pub fn add_encrypt_duration(&mut self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.add_encrypt(duration); - } - } + pub fn latest_session_id(&self) -> Option { + let queue = self.session_metrics.read().ok()?; + queue.queue.back().map(|session| session.id()) + } + + /// Record parse phase duration for the session (first write wins) + pub fn record_parse_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.record_parse(duration); + }); + } + + /// Add encrypt phase duration for the session (accumulate) + pub fn add_encrypt_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.add_encrypt(duration); + }); } /// Record server write phase duration - pub fn record_server_write_duration(&mut 
self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.record_server_write(duration); - } - } + pub fn record_server_write_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.record_server_write(duration); + }); } /// Add server write phase duration (accumulate) - pub fn add_server_write_duration(&mut self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.add_server_write(duration); - } - } + pub fn add_server_write_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.add_server_write(duration); + }); } /// Record server wait phase duration (time to first response byte) - pub fn record_server_wait_duration(&mut self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.record_server_wait(duration); - } - } + pub fn record_server_wait_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.record_server_wait(duration); + }); } /// Record server response phase duration - pub fn record_server_response_duration(&mut self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.record_server_response(duration); - } - } + pub fn record_server_response_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.record_server_response(duration); + }); } /// Add server response phase duration (accumulate) - pub fn add_server_response_duration(&mut 
self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.add_server_response(duration); - } - } + pub fn add_server_response_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.add_server_response(duration); + }); } /// Record client write phase duration - pub fn record_client_write_duration(&mut self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.record_client_write(duration); - } - } + pub fn record_client_write_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.record_client_write(duration); + }); } /// Add client write phase duration (accumulate) - pub fn add_client_write_duration(&mut self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.add_client_write(duration); - } - } + pub fn add_client_write_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.add_client_write(duration); + }); } /// Add decrypt phase duration (accumulate) - pub fn add_decrypt_duration(&mut self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - session.phase_timing.add_decrypt(duration); - } - } + pub fn add_decrypt_duration(&mut self, session_id: SessionId, duration: Duration) { + self.with_session_metrics_mut(session_id, |session| { + session.phase_timing.add_decrypt(duration); + }); } - /// Update statement metadata for the current session - pub fn update_statement_metadata(&mut self, f: F) + /// Update statement metadata for a session + pub fn 
update_statement_metadata(&mut self, session_id: SessionId, f: F) where F: FnOnce(&mut StatementMetadata), { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - f(&mut session.metadata); + self.with_session_metrics_mut(session_id, |session| { + f(&mut session.metadata); + }); + } + + /// Record server wait for first response; otherwise accumulate response time for the current execute + pub fn record_execute_server_timing(&mut self, duration: Duration) { + if let Ok(mut queue) = self.execute.write() { + if let Some(execute) = queue.current_mut() { + execute.record_server_wait_or_add_response(duration); } } } - /// Record server wait for first response; otherwise accumulate response time - pub fn record_server_wait_or_add_response(&mut self, duration: Duration) { - if let Ok(mut queue) = self.session_metrics.write() { - if let Some(session) = queue.current_mut() { - if session.phase_timing.server_wait_duration.is_none() { - session.phase_timing.record_server_wait(duration); - } else { - session.phase_timing.add_server_response(duration); - } - } + /// Add decrypt phase duration for the current execute session (if any) + pub fn add_decrypt_duration_for_execute(&mut self, duration: Duration) { + let session_id = self.get_execute().and_then(|execute| execute.session_id()); + if let Some(session_id) = session_id { + self.add_decrypt_duration(session_id, duration); + } + } + + /// Add client write duration for the current execute session (if any) + pub fn add_client_write_duration_for_execute(&mut self, duration: Duration) { + let session_id = self.get_execute().and_then(|execute| execute.session_id()); + if let Some(session_id) = session_id { + self.add_client_write_duration(session_id, duration); } } } @@ -947,7 +1035,7 @@ mod tests { } fn portal(statement: &Arc) -> Portal { - Portal::encrypted_with_format_codes(statement.clone(), vec![]) + Portal::encrypted_with_format_codes(statement.clone(), vec![], None) } fn 
get_statement(portal: Arc) -> Arc { @@ -1001,7 +1089,7 @@ mod tests { context.add_portal(portal_name.clone(), portal(&statement)); // Add statement name to execute context - context.set_execute(portal_name.clone()); + context.set_execute(portal_name.clone(), None); // Portal statement should be the right statement let portal = context.get_portal_from_execute().unwrap(); @@ -1047,8 +1135,8 @@ mod tests { context.add_portal(portal_name.clone(), portal(&statement_2)); // Execute both portals - context.set_execute(portal_name.clone()); - context.set_execute(portal_name.clone()); + context.set_execute(portal_name.clone(), None); + context.set_execute(portal_name.clone(), None); // Portal should point to first statement let portal = context.get_portal_from_execute().unwrap(); @@ -1100,9 +1188,9 @@ mod tests { context.add_portal(portal_name_3.clone(), portal(&statement_3)); // Add portals to execute context - context.set_execute(portal_name_1.clone()); - context.set_execute(portal_name_2.clone()); - context.set_execute(portal_name_3.clone()); + context.set_execute(portal_name_1.clone(), None); + context.set_execute(portal_name_2.clone(), None); + context.set_execute(portal_name_3.clone(), None); // Multiple calls return the portal for the first Execution context let portal = context.get_portal_from_execute().unwrap(); diff --git a/packages/cipherstash-proxy/src/postgresql/context/portal.rs b/packages/cipherstash-proxy/src/postgresql/context/portal.rs index c6cc6585..ad6b033b 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/portal.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/portal.rs @@ -1,4 +1,4 @@ -use super::{super::format_code::FormatCode, Column}; +use super::{super::format_code::FormatCode, Column, SessionId}; use crate::postgresql::context::statement::Statement; use std::sync::Arc; @@ -7,38 +7,44 @@ pub enum Portal { Encrypted { format_codes: Vec, statement: Arc, + session_id: Option, + }, + Passthrough { + session_id: Option, }, - 
Passthrough, } impl Portal { pub fn encrypted_with_format_codes( statement: Arc, format_codes: Vec, + session_id: Option, ) -> Portal { Portal::Encrypted { statement, format_codes, + session_id, } } - pub fn encrypted(statement: Arc) -> Portal { + pub fn encrypted(statement: Arc, session_id: Option) -> Portal { let format_codes = vec![]; Portal::Encrypted { statement, format_codes, + session_id, } } - pub fn passthrough() -> Portal { - Portal::Passthrough + pub fn passthrough(session_id: Option) -> Portal { + Portal::Passthrough { session_id } } pub fn projection_columns(&self) -> &Vec> { static EMPTY: Vec> = vec![]; match self { Portal::Encrypted { statement, .. } => &statement.projection_columns, - _ => &EMPTY, + Portal::Passthrough { .. } => &EMPTY, } } @@ -60,9 +66,16 @@ impl Portal { } _ => format_codes.clone(), }, - Portal::Passthrough => { + Portal::Passthrough { .. } => { unreachable!() } } } + + pub fn session_id(&self) -> Option { + match self { + Portal::Encrypted { session_id, .. 
} => *session_id, + Portal::Passthrough { session_id } => *session_id, + } + } } diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index 55fdeba3..72d9c814 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -1,5 +1,5 @@ use super::context::phase_timing::PhaseTimer; -use super::context::{Context, Statement}; +use super::context::{Context, SessionId, Statement}; use super::error_handler::PostgreSqlErrorHandler; use super::messages::bind::Bind; use super::messages::describe::Describe; @@ -305,7 +305,9 @@ where let start = Instant::now(); self.server_writer.write_all(&bytes).await?; let duration = start.elapsed(); - self.context.add_server_write_duration(duration); + if let Some(session_id) = self.context.latest_session_id() { + self.context.add_server_write_duration(session_id, duration); + } Ok(()) } @@ -340,7 +342,9 @@ where async fn execute_handler(&mut self, bytes: &BytesMut) -> Result<(), Error> { let execute = Execute::try_from(bytes)?; debug!(target: PROTOCOL, client_id = self.context.client_id, ?execute); - self.context.set_execute(execute.portal.to_owned()); + let session_id = self.context.get_portal_session_id(&execute.portal); + self.context + .set_execute(execute.portal.to_owned(), session_id); Ok(()) } @@ -377,10 +381,10 @@ where /// - `Ok(None)` - No transformation needed, forward original query /// - `Err(error)` - Processing failed, error should be sent to client async fn query_handler(&mut self, bytes: &BytesMut) -> Result, Error> { - self.context.start_session(); + let session_id = self.context.start_session(); // Set protocol type for diagnostics - self.context.update_statement_metadata(|m| { + self.context.update_statement_metadata(session_id, |m| { m.protocol = Some(ProtocolType::Simple); }); @@ -397,7 +401,7 @@ where statements = parsed_statements.len(), ); - let mut portal = 
Portal::passthrough(); + let mut portal = Portal::passthrough(Some(session_id)); let mut encrypted = false; let mut parse_duration_recorded = false; @@ -447,12 +451,17 @@ where if typed_statement.requires_transform() { // Record parse duration before encryption work starts if !parse_duration_recorded { - self.context.record_parse_duration(parse_timer.elapsed()); + self.context + .record_parse_duration(session_id, parse_timer.elapsed()); parse_duration_recorded = true; } let encrypted_literals = self - .encrypt_literals(&typed_statement, &statement.literal_columns) + .encrypt_literals( + session_id, + &typed_statement, + &statement.literal_columns, + ) .await?; if let Some(transformed_statement) = self @@ -471,8 +480,11 @@ where counter!(STATEMENTS_ENCRYPTED_TOTAL).increment(1); - // Set Encrypted portal - portal = Portal::encrypted(Arc::new(statement)); + // Set Encrypted portal and mark as mapped + portal = Portal::encrypted(Arc::new(statement), Some(session_id)); + self.context.update_statement_metadata(session_id, |m| { + m.encrypted = true; + }); } None => { debug!(target: MAPPER, @@ -487,30 +499,32 @@ where // Record parse/typecheck duration (if not already recorded before encryption) if !parse_duration_recorded { - self.context.record_parse_duration(parse_timer.elapsed()); + self.context + .record_parse_duration(session_id, parse_timer.elapsed()); } // Set statement type based on parsed statements let statement_type = if parsed_statements.len() == 1 { parsed_statements .first() - .map(|stmt| StatementType::from_sql(&stmt.to_string())) + .map(StatementType::from_statement) .unwrap_or(StatementType::Other) } else { StatementType::Other }; - self.context.update_statement_metadata(|m| { + self.context.update_statement_metadata(session_id, |m| { m.statement_type = Some(statement_type); m.set_multi_statement(parsed_statements.len() > 1); }); // Set query fingerprint - self.context.update_statement_metadata(|m| { + self.context.update_statement_metadata(session_id, 
|m| { m.set_query_fingerprint(&query.statement); }); self.context.add_portal(Name::unnamed(), portal); - self.context.set_execute(Name::unnamed()); + self.context + .set_execute(Name::unnamed(), Some(session_id)); if encrypted { let transformed_statement = transformed_statements @@ -559,6 +573,7 @@ where /// literals that don't require encryption and `Some(EqlEncrypted)` for encrypted values. async fn encrypt_literals( &mut self, + session_id: SessionId, typed_statement: &TypeCheckedStatement<'_>, literal_columns: &Vec>, ) -> Result>, Error> { @@ -592,11 +607,11 @@ where let duration = Instant::now().duration_since(start); // Add to phase timing diagnostics (accumulate) - self.context.add_encrypt_duration(duration); + self.context.add_encrypt_duration(session_id, duration); // Update metadata with encrypted values count let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); - self.context.update_statement_metadata(|m| { + self.context.update_statement_metadata(session_id, |m| { m.encrypted = true; m.set_encrypted_values_count(encrypted_count); }); @@ -692,16 +707,18 @@ where /// - `Ok(None)` - No transformation needed, forward original message /// - `Err(error)` - Processing failed, error should be sent to client async fn parse_handler(&mut self, bytes: &BytesMut) -> Result, Error> { - self.context.start_session(); + let session_id = self.context.start_session(); // Set protocol type - self.context.update_statement_metadata(|m| { + self.context.update_statement_metadata(session_id, |m| { m.protocol = Some(ProtocolType::Extended); }); let parse_timer = PhaseTimer::start(); let mut message = Parse::try_from(bytes)?; + self.context + .set_statement_session(message.name.to_owned(), session_id); debug!( target: PROTOCOL, @@ -755,12 +772,17 @@ where Some(statement) => { if typed_statement.requires_transform() { // Record parse duration before encryption work starts - self.context.record_parse_duration(parse_timer.elapsed()); + self.context + 
.record_parse_duration(session_id, parse_timer.elapsed()); parse_duration_recorded = true; - let encrypted_literals = self - .encrypt_literals(&typed_statement, &statement.literal_columns) - .await?; + let encrypted_literals = self + .encrypt_literals( + session_id, + &typed_statement, + &statement.literal_columns, + ) + .await?; if let Some(transformed_statement) = self .transform_statement(&typed_statement, &encrypted_literals) @@ -792,15 +814,17 @@ where // Record parse duration (if not already recorded before encryption) if !parse_duration_recorded { - self.context.record_parse_duration(parse_timer.elapsed()); + self.context + .record_parse_duration(session_id, parse_timer.elapsed()); } // Set statement type and fingerprint - self.context.update_statement_metadata(|m| { - m.statement_type = Some(StatementType::from_sql(&message.statement)); + self.context.update_statement_metadata(session_id, |m| { + m.statement_type = Some(StatementType::from_statement(&statement)); m.set_query_fingerprint(&message.statement); }); + if message.requires_rewrite() { let bytes = BytesMut::try_from(message)?; @@ -950,28 +974,39 @@ where let mut bind = Bind::try_from(bytes)?; + let session_id = self + .context + .get_statement_session(&bind.prepared_statement) + .or_else(|| self.context.latest_session_id()); + // Track param bytes for diagnostics let param_bytes: usize = bind.param_values.iter().map(|p| p.bytes.len()).sum(); - self.context.update_statement_metadata(|m| { - m.set_param_bytes(param_bytes); - }); + if let Some(session_id) = session_id { + self.context + .update_statement_metadata(session_id, |m| m.set_param_bytes(param_bytes)); + } debug!(target: PROTOCOL, client_id = self.context.client_id, bind = ?bind); - let mut portal = Portal::passthrough(); + let mut portal = Portal::passthrough(session_id); if let Some(statement) = self.context.get_statement(&bind.prepared_statement) { debug!(target:MAPPER, client_id = self.context.client_id, ?statement); if 
statement.has_params() { - let encrypted = self.encrypt_params(&bind, &statement).await?; + let encrypted = self.encrypt_params(session_id, &bind, &statement).await?; bind.rewrite(encrypted)?; } if statement.has_projection() { portal = Portal::encrypted_with_format_codes( statement, bind.result_columns_format_codes.to_owned(), + session_id, ); + if let Some(session_id) = session_id { + self.context + .update_statement_metadata(session_id, |m| m.encrypted = true); + } } }; @@ -1002,6 +1037,7 @@ where /// async fn encrypt_params( &mut self, + session_id: Option, bind: &Bind, statement: &Statement, ) -> Result>, Error> { @@ -1023,14 +1059,18 @@ where let duration = Instant::now().duration_since(start); // Add to phase timing diagnostics (accumulate) - self.context.add_encrypt_duration(duration); + if let Some(session_id) = session_id { + self.context.add_encrypt_duration(session_id, duration); + } // Always update metadata for slow-statement logging let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); - self.context.update_statement_metadata(|m| { - m.encrypted = true; - m.set_encrypted_values_count(encrypted_count); - }); + if let Some(session_id) = session_id { + self.context.update_statement_metadata(session_id, |m| { + m.encrypted = true; + m.set_encrypted_values_count(encrypted_count); + }); + } // Prometheus metrics remain gated if self.context.prometheus_enabled() { From 7e5007a178e2c5a747607a057d72591e198f65b9 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 18:58:11 +1100 Subject: [PATCH 25/31] Improve diagnostics metadata and integration metrics fetch --- .../src/common.rs | 1 + .../src/diagnostics.rs | 40 ++++++++++---- packages/cipherstash-proxy/src/config/log.rs | 7 ++- .../postgresql/context/statement_metadata.rs | 53 +++++++++++-------- 4 files changed, 66 insertions(+), 35 deletions(-) diff --git a/packages/cipherstash-proxy-integration/src/common.rs b/packages/cipherstash-proxy-integration/src/common.rs index 
c4e085ad..79704876 100644 --- a/packages/cipherstash-proxy-integration/src/common.rs +++ b/packages/cipherstash-proxy-integration/src/common.rs @@ -12,6 +12,7 @@ use tracing::info; use tracing_subscriber::{filter::Directive, EnvFilter, FmtSubscriber}; pub const PROXY: u16 = 6432; +pub const PROXY_METRICS_PORT: u16 = 9930; pub const PG_PORT: u16 = 5532; pub const PG_TLS_PORT: u16 = 5617; diff --git a/packages/cipherstash-proxy-integration/src/diagnostics.rs b/packages/cipherstash-proxy-integration/src/diagnostics.rs index 2cab7a22..83995fe1 100644 --- a/packages/cipherstash-proxy-integration/src/diagnostics.rs +++ b/packages/cipherstash-proxy-integration/src/diagnostics.rs @@ -1,6 +1,32 @@ #[cfg(test)] mod tests { - use crate::common::{clear, connect_with_tls, PROXY}; + use crate::common::{clear, connect_with_tls, PROXY, PROXY_METRICS_PORT}; + + /// Fetch metrics with retry logic to handle CI timing variability. + async fn fetch_metrics_with_retry(max_retries: u32, delay_ms: u64) -> String { + let url = format!("http://localhost:{}/metrics", PROXY_METRICS_PORT); + let mut last_error = None; + + for attempt in 0..max_retries { + if attempt > 0 { + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + } + + match reqwest::get(&url).await { + Ok(response) => match response.text().await { + Ok(body) => return body, + Err(e) => last_error = Some(format!("Failed to read response: {}", e)), + }, + Err(e) => last_error = Some(format!("Failed to fetch metrics: {}", e)), + } + } + + panic!( + "Failed to fetch metrics after {} retries: {}", + max_retries, + last_error.unwrap_or_else(|| "unknown error".to_string()) + ); + } #[tokio::test] async fn metrics_include_statement_labels() { @@ -23,16 +49,8 @@ mod tests { .await .unwrap(); - // Give the metrics some time to be written - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - // Fetch metrics from the /metrics endpoint - let body = reqwest::get("http://localhost:9930/metrics") - .await - 
.unwrap() - .text() - .await - .unwrap(); + // Fetch metrics with retry logic for CI robustness + let body = fetch_metrics_with_retry(5, 200).await; // Assert that the metrics include the expected labels assert!( diff --git a/packages/cipherstash-proxy/src/config/log.rs b/packages/cipherstash-proxy/src/config/log.rs index ce1e2fb2..6f5d10ac 100644 --- a/packages/cipherstash-proxy/src/config/log.rs +++ b/packages/cipherstash-proxy/src/config/log.rs @@ -125,8 +125,13 @@ impl LogConfig { LogLevel::Info } + /// Default threshold for slow statement logging (2 seconds). + /// + /// This value represents a reasonable baseline for identifying slow queries in most + /// PostgreSQL workloads. Queries exceeding this duration are likely candidates for + /// optimization. Operators can adjust via CS_LOG__SLOW_STATEMENT_MIN_DURATION_MS. pub const fn default_slow_statement_min_duration_ms() -> u64 { - 2000 // 2 seconds + 2000 } } diff --git a/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs b/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs index 25eeacaf..a4735016 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/statement_metadata.rs @@ -1,4 +1,5 @@ use serde::Serialize; +use sqltk::parser::ast::Statement; /// Statement type classification for metrics labels #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] @@ -12,19 +13,14 @@ pub enum StatementType { } impl StatementType { - /// Create from SQL statement string - pub fn from_sql(sql: &str) -> Self { - let trimmed = sql.trim_start().to_uppercase(); - if trimmed.starts_with("INSERT") { - StatementType::Insert - } else if trimmed.starts_with("UPDATE") { - StatementType::Update - } else if trimmed.starts_with("DELETE") { - StatementType::Delete - } else if trimmed.starts_with("SELECT") { - StatementType::Select - } else { - StatementType::Other + /// Create from parsed AST statement + pub 
fn from_statement(stmt: &Statement) -> Self { + match stmt { + Statement::Insert(_) => StatementType::Insert, + Statement::Update { .. } => StatementType::Update, + Statement::Delete(_) => StatementType::Delete, + Statement::Query(_) => StatementType::Select, + _ => StatementType::Other, } } @@ -104,6 +100,11 @@ impl StatementMetadata { self.param_bytes = bytes; } + /// Set query fingerprint from SQL statement. + /// + /// Fingerprints are session-local identifiers for correlating log entries within a single + /// proxy instance. They are NOT stable across Rust versions or deployments and should not + /// be used for cross-session correlation or persistent storage. pub fn set_query_fingerprint(&mut self, sql: &str) { use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; @@ -122,31 +123,37 @@ impl StatementMetadata { #[cfg(test)] mod tests { use super::*; + use sqltk::parser::dialect::PostgreSqlDialect; + use sqltk::parser::parser::Parser; + + fn parse(sql: &str) -> Statement { + Parser::new(&PostgreSqlDialect {}) + .try_with_sql(sql) + .unwrap() + .parse_statement() + .unwrap() + } #[test] - fn statement_type_from_sql() { - assert_eq!( - StatementType::from_sql("INSERT INTO foo VALUES (1)"), - StatementType::Insert - ); + fn statement_type_from_statement() { assert_eq!( - StatementType::from_sql(" insert into foo"), + StatementType::from_statement(&parse("INSERT INTO foo VALUES (1)")), StatementType::Insert ); assert_eq!( - StatementType::from_sql("UPDATE foo SET bar = 1"), + StatementType::from_statement(&parse("UPDATE foo SET bar = 1")), StatementType::Update ); assert_eq!( - StatementType::from_sql("DELETE FROM foo"), + StatementType::from_statement(&parse("DELETE FROM foo")), StatementType::Delete ); assert_eq!( - StatementType::from_sql("SELECT * FROM foo"), + StatementType::from_statement(&parse("SELECT * FROM foo")), StatementType::Select ); assert_eq!( - StatementType::from_sql("CREATE TABLE foo"), + 
StatementType::from_statement(&parse("CREATE TABLE foo (id INT)")), StatementType::Other ); } From 6959c1f2a68508cca9c9155d847b52bbac14412c Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 19:06:43 +1100 Subject: [PATCH 26/31] Format diagnostics frontend changes --- .../cipherstash-proxy/src/postgresql/frontend.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index 72d9c814..53005269 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -523,8 +523,7 @@ where }); self.context.add_portal(Name::unnamed(), portal); - self.context - .set_execute(Name::unnamed(), Some(session_id)); + self.context.set_execute(Name::unnamed(), Some(session_id)); if encrypted { let transformed_statement = transformed_statements @@ -776,13 +775,9 @@ where .record_parse_duration(session_id, parse_timer.elapsed()); parse_duration_recorded = true; - let encrypted_literals = self - .encrypt_literals( - session_id, - &typed_statement, - &statement.literal_columns, - ) - .await?; + let encrypted_literals = self + .encrypt_literals(session_id, &typed_statement, &statement.literal_columns) + .await?; if let Some(transformed_statement) = self .transform_statement(&typed_statement, &encrypted_literals) @@ -824,7 +819,6 @@ where m.set_query_fingerprint(&message.statement); }); - if message.requires_rewrite() { let bytes = BytesMut::try_from(message)?; From 321701f4dafb1a1f9ec75e48ba8a08a0fdb9123e Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 20:05:24 +1100 Subject: [PATCH 27/31] test(integration): expose prometheus port for proxy-tls Adds port 9930 to the proxy-tls service in docker-compose.yml to enable metrics collection during TLS-enabled integration tests. 
--- tests/docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index aecc812f..a56cc303 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -92,6 +92,7 @@ services: container_name: proxy-tls ports: - 6432:6432 + - 9930:9930 environment: - CS_DATABASE__NAME=${CS_DATABASE__NAME} - CS_DATABASE__USERNAME=${CS_DATABASE__USERNAME} From 9f07ad11062aec19f4f8177419401ad68a3d79fc Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 21:57:22 +1100 Subject: [PATCH 28/31] docs(context): document two-phase timing pattern in ExecuteContext Add doc comments explaining why timing data is accumulated in ExecuteContext during backend message processing and transferred to SessionMetricsContext on completion. This pattern exists because the backend operates on the execute queue without direct access to the session metrics queue. --- .../src/postgresql/context/mod.rs | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index c6354c26..4ad5f984 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -78,12 +78,22 @@ where session_id_counter: Arc, } +/// Context for tracking an in-flight Execute operation. +/// +/// Timing data is accumulated here during backend message processing because +/// the backend operates on the execute queue rather than having direct access +/// to the session metrics queue. On completion via `complete_execution()`, +/// timing is transferred to the associated SessionMetricsContext. #[derive(Clone, Debug)] pub struct ExecuteContext { name: Name, start: Instant, session_id: Option, + /// Server wait duration (time to first response byte). + /// Accumulated here during execution, transferred to SessionMetricsContext on completion. 
server_wait_duration: Option, + /// Server response duration (time spent receiving response data after first byte). + /// Accumulated here during execution, transferred to SessionMetricsContext on completion. server_response_duration: Duration, } @@ -292,17 +302,23 @@ where let _ = self.execute.write().map(|mut queue| queue.add(ctx)); } + /// Marks the current Execution as Complete. /// - /// Marks the current Execution as Complete + /// Transfers accumulated timing data from ExecuteContext to SessionMetricsContext.phase_timing: + /// - `server_wait_duration` (time to first response byte) is recorded to the session + /// - `server_response_duration` (time receiving response data) is added to the session /// - /// If the associated portal is Unnamed, it is closed + /// This two-phase timing pattern exists because the backend operates on the execute queue + /// rather than having direct access to the session. Timing is accumulated in ExecuteContext + /// during message processing, then transferred to the correct SessionMetricsContext here. + /// + /// If the associated portal is Unnamed, it is closed. /// /// From the PostgreSQL Extended Query docs: /// If successfully created, a named portal object lasts till the end of the current transaction, unless explicitly destroyed. 
/// An unnamed portal is destroyed at the end of the transaction, or as soon as the next Bind statement specifying the unnamed portal as destination is issued /// /// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY - /// pub fn complete_execution(&mut self) { debug!(target: CONTEXT, client_id = self.client_id, msg = "Execute complete"); From 78084625e02e4d536521f8ce291a162c2a0f9041 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 21:57:32 +1100 Subject: [PATCH 29/31] refactor(diagnostics): extract retry configuration to named constants Replace magic numbers in fetch_metrics_with_retry with documented constants: - METRICS_FETCH_MAX_RETRIES (5) - ~1 second total wait with 200ms delay - METRICS_FETCH_RETRY_DELAY_MS (200) - balance responsiveness with scrape time --- .../src/diagnostics.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/cipherstash-proxy-integration/src/diagnostics.rs b/packages/cipherstash-proxy-integration/src/diagnostics.rs index 83995fe1..423dbdc9 100644 --- a/packages/cipherstash-proxy-integration/src/diagnostics.rs +++ b/packages/cipherstash-proxy-integration/src/diagnostics.rs @@ -2,6 +2,16 @@ mod tests { use crate::common::{clear, connect_with_tls, PROXY, PROXY_METRICS_PORT}; + /// Maximum number of retry attempts for fetching metrics. + /// 5 retries with 200ms delay gives ~1 second total wait time, + /// sufficient for Prometheus scrape interval in CI environments. + const METRICS_FETCH_MAX_RETRIES: u32 = 5; + + /// Delay between retry attempts in milliseconds. + /// 200ms provides a reasonable balance between responsiveness and allowing + /// sufficient time for metrics to be published by the Prometheus client. + const METRICS_FETCH_RETRY_DELAY_MS: u64 = 200; + /// Fetch metrics with retry logic to handle CI timing variability. 
async fn fetch_metrics_with_retry(max_retries: u32, delay_ms: u64) -> String { let url = format!("http://localhost:{}/metrics", PROXY_METRICS_PORT); @@ -50,7 +60,8 @@ mod tests { .unwrap(); // Fetch metrics with retry logic for CI robustness - let body = fetch_metrics_with_retry(5, 200).await; + let body = + fetch_metrics_with_retry(METRICS_FETCH_MAX_RETRIES, METRICS_FETCH_RETRY_DELAY_MS).await; // Assert that the metrics include the expected labels assert!( From 061bd297f947728b84d0b01c81b3c64ea2a658f0 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 21:57:42 +1100 Subject: [PATCH 30/31] refactor(frontend): improve session ID handling and reduce duplication Address code review suggestions: - Add warning log when bind_handler falls back to latest_session_id() - Thread session_id explicitly through write_to_server signature - Combine duplicate Option guards in encrypt_params --- .../src/postgresql/frontend.rs | 51 ++++++++++++------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index 53005269..e75b4bf5 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -11,7 +11,7 @@ use super::parser::SqlParser; use super::protocol::{self}; use crate::connect::Sender; use crate::error::{EncryptError, Error, MappingError}; -use crate::log::{MAPPER, PROTOCOL}; +use crate::log::{CONTEXT, MAPPER, PROTOCOL}; use crate::postgresql::context::column::Column; use crate::postgresql::context::statement_metadata::{ProtocolType, StatementType}; use crate::postgresql::context::Portal; @@ -170,7 +170,7 @@ where counter!(CLIENTS_BYTES_RECEIVED_TOTAL).increment(sent); if self.context.mapping_disabled() { - self.write_to_server(bytes).await?; + self.write_to_server(bytes, None).await?; return Ok(()); } @@ -293,11 +293,15 @@ where } } - self.write_to_server(bytes).await?; + 
self.write_to_server(bytes, None).await?; Ok(()) } - pub async fn write_to_server(&mut self, bytes: BytesMut) -> Result<(), Error> { + pub async fn write_to_server( + &mut self, + bytes: BytesMut, + session_id: Option, + ) -> Result<(), Error> { debug!(target: PROTOCOL, msg = "Write to server", ?bytes); let sent: u64 = bytes.len() as u64; counter!(SERVER_BYTES_SENT_TOTAL).increment(sent); @@ -305,7 +309,8 @@ where let start = Instant::now(); self.server_writer.write_all(&bytes).await?; let duration = start.elapsed(); - if let Some(session_id) = self.context.latest_session_id() { + let session_to_record = session_id.or_else(|| self.context.latest_session_id()); + if let Some(session_id) = session_to_record { self.context.add_server_write_duration(session_id, duration); } @@ -315,7 +320,7 @@ where pub async fn terminate(&mut self) -> Result<(), Error> { debug!(target: PROTOCOL, msg = "Terminate server connection"); let bytes = Terminate::message(); - self.write_to_server(bytes).await?; + self.write_to_server(bytes, None).await?; Ok(()) } @@ -968,10 +973,21 @@ where let mut bind = Bind::try_from(bytes)?; - let session_id = self - .context - .get_statement_session(&bind.prepared_statement) - .or_else(|| self.context.latest_session_id()); + let session_id = match self.context.get_statement_session(&bind.prepared_statement) { + Some(id) => Some(id), + None => { + let fallback = self.context.latest_session_id(); + if fallback.is_some() { + warn!( + target: CONTEXT, + client_id = self.context.client_id, + prepared_statement = ?bind.prepared_statement, + msg = "Session lookup failed for prepared statement, using latest session" + ); + } + fallback + } + }; // Track param bytes for diagnostics let param_bytes: usize = bind.param_values.iter().map(|p| p.bytes.len()).sum(); @@ -1052,15 +1068,14 @@ where let duration = Instant::now().duration_since(start); - // Add to phase timing diagnostics (accumulate) - if let Some(session_id) = session_id { - 
self.context.add_encrypt_duration(session_id, duration); - } - - // Always update metadata for slow-statement logging + // Record timing and metadata for this encryption operation let encrypted_count = encrypted.iter().filter(|e| e.is_some()).count(); - if let Some(session_id) = session_id { - self.context.update_statement_metadata(session_id, |m| { + if let Some(sid) = session_id { + // Add to phase timing diagnostics (accumulate) + self.context.add_encrypt_duration(sid, duration); + + // Always update metadata for slow-statement logging + self.context.update_statement_metadata(sid, |m| { m.encrypted = true; m.set_encrypted_values_count(encrypted_count); }); From e6bf1deda8228ff954a56f80a86399f04b23e238 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Thu, 22 Jan 2026 22:50:22 +1100 Subject: [PATCH 31/31] refactor(frontend): remove unused session_id parameter from write_to_server All callers passed None, making the parameter obsolete. Simplified to always use latest_session_id() directly. 
--- .../cipherstash-proxy/src/postgresql/frontend.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index e75b4bf5..ed5a86ea 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -170,7 +170,7 @@ where counter!(CLIENTS_BYTES_RECEIVED_TOTAL).increment(sent); if self.context.mapping_disabled() { - self.write_to_server(bytes, None).await?; + self.write_to_server(bytes).await?; return Ok(()); } @@ -293,15 +293,11 @@ where } } - self.write_to_server(bytes, None).await?; + self.write_to_server(bytes).await?; Ok(()) } - pub async fn write_to_server( - &mut self, - bytes: BytesMut, - session_id: Option, - ) -> Result<(), Error> { + pub async fn write_to_server(&mut self, bytes: BytesMut) -> Result<(), Error> { debug!(target: PROTOCOL, msg = "Write to server", ?bytes); let sent: u64 = bytes.len() as u64; counter!(SERVER_BYTES_SENT_TOTAL).increment(sent); @@ -309,8 +305,7 @@ where let start = Instant::now(); self.server_writer.write_all(&bytes).await?; let duration = start.elapsed(); - let session_to_record = session_id.or_else(|| self.context.latest_session_id()); - if let Some(session_id) = session_to_record { + if let Some(session_id) = self.context.latest_session_id() { self.context.add_server_write_duration(session_id, duration); } @@ -320,7 +315,7 @@ where pub async fn terminate(&mut self) -> Result<(), Error> { debug!(target: PROTOCOL, msg = "Terminate server connection"); let bytes = Terminate::message(); - self.write_to_server(bytes, None).await?; + self.write_to_server(bytes).await?; Ok(()) }