From c592fe8901c66c842f8d6a3c57cbcaeee8a4523d Mon Sep 17 00:00:00 2001
From: Yang Keao
Date: Thu, 4 Dec 2025 18:53:14 +0800
Subject: [PATCH] implement a basic audit log analyzer

Signed-off-by: Yang Keao
---
 cmd/auditloganalyzer/main.go               | 143 ++++++++++++++
 pkg/sqlreplay/cmd/audit_log_analyzer.go    | 215 +++++++++++++++++++++
 pkg/sqlreplay/cmd/audit_log_plugin.go      |  38 ++--
 pkg/sqlreplay/replay/audit_log_analyzer.go |  97 ++++++++++
 4 files changed, 480 insertions(+), 13 deletions(-)
 create mode 100644 cmd/auditloganalyzer/main.go
 create mode 100644 pkg/sqlreplay/cmd/audit_log_analyzer.go
 create mode 100644 pkg/sqlreplay/replay/audit_log_analyzer.go

diff --git a/cmd/auditloganalyzer/main.go b/cmd/auditloganalyzer/main.go
new file mode 100644
index 00000000..609f9661
--- /dev/null
+++ b/cmd/auditloganalyzer/main.go
@@ -0,0 +1,143 @@
+// Copyright 2025 PingCAP, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package main
+
+import (
+	"database/sql"
+	"encoding/csv"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/pingcap/tiproxy/lib/config"
+	"github.com/pingcap/tiproxy/lib/util/cmd"
+	lg "github.com/pingcap/tiproxy/lib/util/logger"
+	replaycmd "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
+	"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
+	"github.com/pingcap/tiproxy/pkg/util/versioninfo"
+	"github.com/spf13/cobra"
+	"go.uber.org/zap"
+)
+
+const (
+	formatCSV   = "csv"
+	formatMySQL = "mysql"
+)
+
+func main() {
+	rootCmd := &cobra.Command{
+		Use:     os.Args[0],
+		Short:   "start the analyzer",
+		Version: fmt.Sprintf("%s, commit %s", versioninfo.TiProxyVersion, versioninfo.TiProxyGitHash),
+	}
+	rootCmd.SetOut(os.Stdout)
+	rootCmd.SetErr(os.Stderr)
+
+	input := rootCmd.PersistentFlags().String("input", "", "the directory of audit log files to analyze")
+	startTime := rootCmd.PersistentFlags().Time("start-time", time.Time{}, []string{time.RFC3339, time.RFC3339Nano}, "the start time to analyze the audit log.")
+	endTime := rootCmd.PersistentFlags().Time("end-time", time.Time{}, []string{time.RFC3339, time.RFC3339Nano}, "the end time to analyze the audit log.")
+	output := rootCmd.PersistentFlags().String("output", "audit_log_analysis_result.csv", "the output path for the analysis result. For the 'mysql' output format, this is the DSN of the target database.")
+	db := rootCmd.PersistentFlags().String("db", "", "the target database to analyze. Empty means all databases will be recorded.")
+	filterCommandWithRetry := rootCmd.PersistentFlags().Bool("filter-command-with-retry", false, "filter out commands that are retries according to the audit log.")
+	outputFormat := rootCmd.PersistentFlags().String("output-format", "csv", "the output format for the analysis result. Currently only 'csv' and 'mysql' are supported.")
+	outputTableName := rootCmd.PersistentFlags().String("output-table-name", "audit_log_analysis", "the output table name when the output format is 'mysql'.")
+
+	rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
+		logger, _, _, err := lg.BuildLogger(&config.Log{
+			Encoder: "tidb",
+			LogOnline: config.LogOnline{
+				Level: "info",
+			},
+		})
+		if err != nil {
+			return err
+		}
+
+		result, err := replay.Analyze(logger, replaycmd.AnalyzeConfig{
+			Input:                  *input,
+			Start:                  *startTime,
+			End:                    *endTime,
+			DB:                     *db,
+			FilterCommandWithRetry: *filterCommandWithRetry,
+		})
+		if err != nil {
+			return err
+		}
+
+		switch *outputFormat {
+		case formatCSV:
+			logger.Info("writing analysis result to CSV", zap.String("output", *output))
+			return writeAnalyzeResultToCSV(result, *output)
+		case formatMySQL:
+			logger.Info("writing analysis result to MySQL", zap.String("output", *output), zap.String("table", *outputTableName))
+			return writeAnalyzeResultToMySQL(result, *output, *outputTableName)
+		default:
+			return fmt.Errorf("unsupported output format: %s", *outputFormat)
+		}
+	}
+
+	cmd.RunRootCommand(rootCmd)
+}
+
+func writeAnalyzeResultToCSV(result replaycmd.AuditLogAnalyzeResult, outputPath string) error {
+	f, err := os.Create(outputPath)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	w := csv.NewWriter(f)
+	for sql, group := range result {
+		// Columns: normalized SQL, execution count, total cost time (in
+		// microseconds), total affected rows, statement types.
+		record := []string{
+			sql,
+			fmt.Sprintf("%d", group.ExecutionCount),
+			fmt.Sprintf("%d", group.TotalCostTime.Microseconds()),
+			fmt.Sprintf("%d", group.TotalAffectedRows),
+			group.StmtTypes.String(),
+		}
+		if err := w.Write(record); err != nil {
+			return err
+		}
+	}
+	w.Flush()
+	return w.Error()
+}
+
+func writeAnalyzeResultToMySQL(result replaycmd.AuditLogAnalyzeResult, outputPath string, outputTableName string) error {
+	// outputPath is a DSN here, e.g. "user:password@tcp(host:4000)/db". The
+	// MySQL driver is registered by the blank import in pkg/sqlreplay/replay.
+	db, err := sql.Open("mysql", outputPath)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+
+	// total_cost_time is stored in microseconds.
+	createTableSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
+		sql_text TEXT,
+		execution_count INT,
+		total_cost_time BIGINT,
+		total_affected_rows BIGINT,
+		statement_types TEXT
+	);`, outputTableName)
+	_, err = db.Exec(createTableSQL)
+	if err != nil {
+		return err
+	}
+
+	insertSQL := fmt.Sprintf(`INSERT INTO %s (sql_text, execution_count, total_cost_time, total_affected_rows, statement_types) VALUES (?, ?, ?, ?, ?)`, outputTableName)
+	stmt, err := db.Prepare(insertSQL)
+	if err != nil {
+		return err
+	}
+	defer stmt.Close()
+
+	for sqlText, group := range result {
+		_, err = stmt.Exec(sqlText, group.ExecutionCount, group.TotalCostTime.Microseconds(), group.TotalAffectedRows, group.StmtTypes.String())
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/sqlreplay/cmd/audit_log_analyzer.go b/pkg/sqlreplay/cmd/audit_log_analyzer.go
new file mode 100644
index 00000000..fe5c9f10
--- /dev/null
+++ b/pkg/sqlreplay/cmd/audit_log_analyzer.go
@@ -0,0 +1,215 @@
+// Copyright 2025 PingCAP, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package cmd
+
+import (
+	"maps"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/pingcap/tidb/pkg/parser"
+	"github.com/pingcap/tiproxy/lib/util/errors"
+	pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
+	"github.com/siddontang/go/hack"
+)
+
+type stmtTypesSet map[string]struct{}
+
+// String returns the statement types as a sorted, comma-separated list so that
+// the output is deterministic.
+func (s stmtTypesSet) String() string {
+	types := make([]string, 0, len(s))
+	for stmtType := range s {
+		types = append(types, stmtType)
+	}
+	sort.Strings(types)
+	return strings.Join(types, ",")
+}
+
+// AuditLogGroup is the analysis result for a group of similar audit log entries.
+type AuditLogGroup struct {
+	ExecutionCount    int
+	TotalCostTime     time.Duration
+	TotalAffectedRows int64
+	StmtTypes         stmtTypesSet
+}
+
+// AuditLogAnalyzeResult is the result of analyzing audit logs.
+type AuditLogAnalyzeResult map[string]AuditLogGroup
+
+// Merge merges another analysis result into this one.
+func (r AuditLogAnalyzeResult) Merge(other AuditLogAnalyzeResult) {
+	for sql, group := range other {
+		finalGroup := r[sql]
+		finalGroup.ExecutionCount += group.ExecutionCount
+		finalGroup.TotalCostTime += group.TotalCostTime
+		finalGroup.TotalAffectedRows += group.TotalAffectedRows
+		for stmtType := range group.StmtTypes {
+			if finalGroup.StmtTypes == nil {
+				finalGroup.StmtTypes = make(map[string]struct{})
+			}
+			finalGroup.StmtTypes[stmtType] = struct{}{}
+		}
+		r[sql] = finalGroup
+	}
+}
+
+// AnalyzeConfig is the configuration for audit log analysis.
+type AnalyzeConfig struct {
+	Input                  string
+	Start                  time.Time
+	End                    time.Time
+	DB                     string
+	FilterCommandWithRetry bool
+}
+
+type auditLogAnalyzer struct {
+	reader LineReader
+
+	cfg      AnalyzeConfig
+	connInfo map[uint64]auditLogPluginConnCtx
+}
+
+// NewAuditLogAnalyzer creates a new audit log analyzer.
+func NewAuditLogAnalyzer(reader LineReader, cfg AnalyzeConfig) *auditLogAnalyzer {
+	return &auditLogAnalyzer{
+		reader:   reader,
+		cfg:      cfg,
+		connInfo: make(map[uint64]auditLogPluginConnCtx),
+	}
+}
+
+// Analyze analyzes the audit log and returns the analysis result.
+func (a *auditLogAnalyzer) Analyze() (AuditLogAnalyzeResult, error) {
+	result := make(AuditLogAnalyzeResult)
+
+	kvs := make(map[string]string, 25)
+	for {
+		line, filename, lineIdx, err := a.reader.ReadLine()
+		if err != nil {
+			return result, err
+		}
+		clear(kvs)
+		err = parseLog(kvs, hack.String(line))
+		if err != nil {
+			return result, errors.Errorf("%s, line %d: %s", filename, lineIdx, err.Error())
+		}
+		// Only analyze the COMPLETED event
+		event, ok := kvs[auditPluginKeyEvent]
+		if !ok || event != auditPluginEventEnd {
+			continue
+		}
+
+		// Only analyze the events within the time range
+		startTs, endTs, err := parseStartAndEndTs(kvs)
+		if err != nil {
+			return result, errors.Wrapf(err, "%s, line %d", filename, lineIdx)
+		}
+		if endTs.Before(a.cfg.Start) {
+			continue
+		}
+		// A zero End means no upper bound, so don't stop early in that case.
+		if !a.cfg.End.IsZero() && endTs.After(a.cfg.End) {
+			// Reached the end time, stop analyzing.
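+			// NOTE: audit log entries are assumed to be ordered by end time,
+			// so no later entry can fall inside the analyzed range either.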
+			return result, nil
+		}
+
+		// Only analyze the `Query` and `Execute` commands
+		cmdStr := parseCommand(kvs[auditPluginKeyCommand])
+		if cmdStr != "Query" && cmdStr != "Execute" {
+			continue
+		}
+
+		// Only analyze the SQL in the given database
+		if len(a.cfg.DB) != 0 {
+			databases, ok := kvs[auditPluginKeyDatabases]
+			if !ok {
+				continue
+			}
+
+			includeTargetDB := false
+			for _, db := range strings.Split(databases, ",") {
+				if db == a.cfg.DB {
+					includeTargetDB = true
+					break
+				}
+			}
+			if !includeTargetDB {
+				continue
+			}
+		}
+
+		// Try to filter out retried commands
+		connID, err := strconv.ParseUint(kvs[auditPluginKeyConnID], 10, 64)
+		if err != nil {
+			return result, errors.Wrapf(err, "%s, line %d: parse conn id failed: %s", filename, lineIdx, kvs[auditPluginKeyConnID])
+		}
+		connInfo := a.connInfo[connID]
+		sql, err := parseSQL(kvs[auditPluginKeySQL])
+		if err != nil {
+			return result, errors.Wrapf(err, "%s, line %d: unquote sql failed: %s", filename, lineIdx, kvs[auditPluginKeySQL])
+		}
+		if a.cfg.FilterCommandWithRetry {
+			// If it's a retried command, just skip it.
+			if kvs[auditPluginKeyRetry] == "true" {
+				continue
+			}
+		} else if isDuplicatedWrite(connInfo.lastCmd, kvs, cmdStr, sql, startTs, endTs) {
+			continue
+		}
+
+		normalizedSQL := parser.Normalize(sql, "ON")
+		group := result[normalizedSQL]
+
+		var costTime time.Duration
+		costTimeStr := kvs[auditPluginKeyCostTime]
+		if len(costTimeStr) != 0 {
+			millis, err := strconv.ParseFloat(costTimeStr, 64)
+			if err != nil {
+				return result, errors.Errorf("parsing cost time failed: %s", costTimeStr)
+			}
+			// Keep the sub-millisecond fraction instead of truncating to whole milliseconds.
+			costTime = time.Duration(millis * float64(time.Millisecond))
+		}
+
+		var affectedRows int64
+		affectedRowsStr := kvs[auditPluginKeyRows]
+		if len(affectedRowsStr) != 0 {
+			affectedRows, err = strconv.ParseInt(affectedRowsStr, 10, 64)
+			if err != nil {
+				return result, errors.Errorf("parsing affected rows failed: %s", affectedRowsStr)
+			}
+		}
+
+		// Record the last command info for deduplication. Only the fields
+		// needed for deduplication are recorded here.
+		connInfo.lastCmd = &Command{
+			StartTs: startTs,
+			EndTs:   endTs,
+			ConnID:  connID,
+		}
+		switch cmdStr {
+		case "Query":
+			connInfo.lastCmd.Type = pnet.ComQuery
+			connInfo.lastCmd.Payload = append([]byte{pnet.ComQuery.Byte()}, hack.Slice(sql)...)
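+			// A COM_QUERY packet payload is the command byte followed by the
+			// query text, mirroring how decoded commands store it.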
+		case "Execute":
+			connInfo.lastCmd.Type = pnet.ComStmtExecute
+			connInfo.lastCmd.PreparedStmt = sql
+		}
+		connInfo.lastCmd.StmtType = kvs[auditPluginKeyStmtType]
+		// kvs is cleared and reused on every iteration, so store a copy rather
+		// than an alias that would be wiped when the next log line is parsed.
+		connInfo.lastCmd.kvs = maps.Clone(kvs)
+		a.connInfo[connID] = connInfo
+
+		group.ExecutionCount++
+		group.TotalCostTime += costTime
+		group.TotalAffectedRows += affectedRows
+		if len(kvs[auditPluginKeyStmtType]) != 0 {
+			if group.StmtTypes == nil {
+				group.StmtTypes = make(map[string]struct{})
+			}
+			group.StmtTypes[kvs[auditPluginKeyStmtType]] = struct{}{}
+		}
+		result[normalizedSQL] = group
+	}
+}
diff --git a/pkg/sqlreplay/cmd/audit_log_plugin.go b/pkg/sqlreplay/cmd/audit_log_plugin.go
index db9a2f67..b64d8cc7 100644
--- a/pkg/sqlreplay/cmd/audit_log_plugin.go
+++ b/pkg/sqlreplay/cmd/audit_log_plugin.go
@@ -30,6 +30,8 @@ const (
 	auditPluginKeyCostTime       = "COST_TIME"
 	auditPluginKeyPreparedStmtID = "PREPARED_STMT_ID"
 	auditPluginKeyRetry          = "RETRY"
+	auditPluginKeyRows           = "ROWS"
+	auditPluginKeyDatabases      = "DATABASES"
 
 	auditPluginClassGeneral     = "GENERAL"
 	auditPluginClassTableAccess = "TABLE_ACCESS"
@@ -603,6 +605,28 @@ func parseStmtID(value string) (uint32, error) {
 
 // Transaction retrials will record the same SQL multiple times in the audit logs, so we need to deduplicate them.
 func (decoder *AuditLogPluginDecoder) isDuplicatedWrite(lastCmd *Command, kvs map[string]string, cmdType, sql string, startTs, endTs time.Time) bool {
+	isDuplicated := isDuplicatedWrite(lastCmd, kvs, cmdType, sql, startTs, endTs)
+	if !isDuplicated {
+		return false
+	}
+
+	// Record the deduplication.
+	decoder.dedup.Lock()
+	dedup := decoder.dedup.Items[lastCmd.StmtType]
+	dedup.Times++
+	dedup.Cost += endTs.Sub(startTs)
+	overlap := lastCmd.EndTs.Sub(startTs)
+	if dedup.MinOverlap == 0 {
+		dedup.MinOverlap = overlap
+	} else if dedup.MinOverlap > overlap {
+		dedup.MinOverlap = overlap
+	}
+	decoder.dedup.Items[lastCmd.StmtType] = dedup
+	decoder.dedup.Unlock()
+	return true
+}
+
+func isDuplicatedWrite(lastCmd *Command, kvs map[string]string, cmdType, sql string, startTs, endTs time.Time) bool {
 	if lastCmd == nil {
 		return false
 	}
@@ -637,19 +661,7 @@ func (decoder *AuditLogPluginDecoder) isDuplicatedWrite(lastCmd *Command, kvs ma
 	default:
 		return false
 	}
-	// Record the deduplication.
-	decoder.dedup.Lock()
-	dedup := decoder.dedup.Items[lastCmd.StmtType]
-	dedup.Times++
-	dedup.Cost += endTs.Sub(startTs)
-	overlap := lastCmd.EndTs.Sub(startTs)
-	if dedup.MinOverlap == 0 {
-		dedup.MinOverlap = overlap
-	} else if dedup.MinOverlap > overlap {
-		dedup.MinOverlap = overlap
-	}
-	decoder.dedup.Items[lastCmd.StmtType] = dedup
-	decoder.dedup.Unlock()
+
 	return true
 }
 
diff --git a/pkg/sqlreplay/replay/audit_log_analyzer.go b/pkg/sqlreplay/replay/audit_log_analyzer.go
new file mode 100644
index 00000000..900775a2
--- /dev/null
+++ b/pkg/sqlreplay/replay/audit_log_analyzer.go
@@ -0,0 +1,97 @@
+// Copyright 2025 PingCAP, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package replay
+
+import (
+	"context"
+	"io"
+	"net/url"
+	"strings"
+
+	_ "github.com/go-sql-driver/mysql" // registers the MySQL driver used by sql.Open in cmd/auditloganalyzer
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tiproxy/lib/util/errors"
+	"github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
+	"github.com/pingcap/tiproxy/pkg/sqlreplay/store"
+	"github.com/pingcap/tiproxy/pkg/util/waitgroup"
+	"go.uber.org/zap"
+)
+
+// Analyze analyzes the audit logs under cfg.Input between cfg.Start and cfg.End
+// and returns the aggregated result grouped by normalized SQL.
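+// Each audit log file is scanned by its own goroutine and the per-file results
+// are merged into a single AuditLogAnalyzeResult at the end.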
+func Analyze(lg *zap.Logger, cfg cmd.AnalyzeConfig) (cmd.AuditLogAnalyzeResult, error) {
+	var storages []storage.ExternalStorage
+	var readers []cmd.LineReader
+	defer func() {
+		for _, r := range readers {
+			r.Close()
+		}
+		for _, s := range storages {
+			s.Close()
+		}
+	}()
+
+	// Name the variable so it doesn't shadow the imported storage package, and
+	// track it in storages so the deferred cleanup closes it as well.
+	inputStorage, err := store.NewStorage(cfg.Input)
+	if err != nil {
+		return nil, errors.Wrapf(err, "invalid input %s", cfg.Input)
+	}
+	storages = append(storages, inputStorage)
+
+	parsedURL, err := url.Parse(cfg.Input)
+	if err != nil {
+		return nil, err
+	}
+	watcher := store.NewDirWatcher(lg.Named("dir_watcher"), strings.TrimLeft(parsedURL.Path, "/"), func(fileName string) error {
+		fileURL, err := url.Parse(cfg.Input)
+		if err != nil {
+			return err
+		}
+		fileURL.Path = fileName
+		s, err := store.NewStorage(fileURL.String())
+		if err != nil {
+			return err
+		}
+		storages = append(storages, s)
+
+		readerCfg := store.ReaderCfg{
+			Format:             cmd.FormatAuditLogPlugin,
+			Dir:                fileName,
+			FileNameFilterTime: cfg.Start,
+		}
+		reader, err := store.NewReader(lg.Named("loader"), s, readerCfg)
+		if err != nil {
+			return err
+		}
+		readers = append(readers, reader)
+		return nil
+	}, inputStorage)
+
+	err = watcher.WalkFiles(context.Background())
+	if err != nil {
+		return nil, err
+	}
+
+	// Analyze each file concurrently and merge the per-file results.
+	var wg waitgroup.WaitGroup
+	resultCh := make(chan cmd.AuditLogAnalyzeResult, len(readers))
+	for _, reader := range readers {
+		analyzer := cmd.NewAuditLogAnalyzer(reader, cfg)
+		wg.Run(func() {
+			result, err := analyzer.Analyze()
+			// io.EOF only means the reader drained its files; other errors are
+			// logged and the partial result is still merged.
+			if err != nil && !errors.Is(err, io.EOF) {
+				lg.Error("analyze audit log failed", zap.Error(err))
+			}
+			resultCh <- result
+		}, lg)
+	}
+	go func() {
+		wg.Wait()
+		close(resultCh)
+	}()
+
+	finalResult := make(cmd.AuditLogAnalyzeResult)
+	for res := range resultCh {
+		finalResult.Merge(res)
+	}
+
+	return finalResult, nil
+}
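
---
Usage sketch (not part of the patch): assuming the binary built from
cmd/auditloganalyzer is named `auditloganalyzer`, an invocation could look
like:

    auditloganalyzer --input /path/to/audit/logs \
        --start-time 2025-12-04T00:00:00+08:00 \
        --end-time 2025-12-04T12:00:00+08:00 \
        --output-format mysql \
        --output 'user:password@tcp(127.0.0.1:4000)/test' \
        --output-table-name audit_log_analysis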