Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions cmd/auditloganalyzer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2025 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package main

import (
"database/sql"
"encoding/csv"
"fmt"
"os"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/cmd"
lg "github.com/pingcap/tiproxy/lib/util/logger"
replaycmd "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
"github.com/pingcap/tiproxy/pkg/util/versioninfo"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

const (
formatCSV = "csv"
formatMySQL = "mysql"
)

func main() {
rootCmd := &cobra.Command{
Use: os.Args[0],
Short: "start the analyzer",
Version: fmt.Sprintf("%s, commit %s", versioninfo.TiProxyVersion, versioninfo.TiProxyGitHash),
}
rootCmd.SetOut(os.Stdout)
rootCmd.SetErr(os.Stderr)

input := rootCmd.PersistentFlags().String("input", "", "directory for traffic files")
startTime := rootCmd.PersistentFlags().Time("start-time", time.Time{}, []string{time.RFC3339, time.RFC3339Nano}, "the start time to analyze the audit log.")
endTime := rootCmd.PersistentFlags().Time("end-time", time.Time{}, []string{time.RFC3339, time.RFC3339Nano}, "the end time to analyze the audit log.")
output := rootCmd.PersistentFlags().String("output", "audit_log_analysis_result.csv", "the output path for analysis result.")
db := rootCmd.PersistentFlags().String("db", "", "the target database to analyze. Empty means all databases will be recorded.")
filterCommandWithRetry := rootCmd.PersistentFlags().Bool("filter-command-with-retry", false, "filter out commands that are retries according to the audit log.")
outputFormat := rootCmd.PersistentFlags().String("output-format", "csv", "the output format for analysis result. Currently only 'csv' and 'mysql' is supported.")
outputTableName := rootCmd.PersistentFlags().String("output-table-name", "audit_log_analysis", "the output table name when output format is 'mysql'.")

rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
logger, _, _, err := lg.BuildLogger(&config.Log{
Encoder: "tidb",
LogOnline: config.LogOnline{
Level: "info",
},
})
if err != nil {
return err
}

result, err := replay.Analyze(logger, replaycmd.AnalyzeConfig{
Input: *input,
Start: *startTime,
End: *endTime,
DB: *db,
FilterCommandWithRetry: *filterCommandWithRetry,
})
if err != nil {
return err
}

switch *outputFormat {
case formatCSV:
logger.Info("writing analysis result to CSV", zap.String("output", *output))
return writeAnalyzeResultToCSV(result, *output)
case formatMySQL:
logger.Info("writing analysis result to MySQL", zap.String("output", *output), zap.String("table", *outputTableName))
return writeAnalyzeResultToMySQL(result, *output, *outputTableName)
default:
return fmt.Errorf("unsupported output format: %s", *outputFormat)
}
}

cmd.RunRootCommand(rootCmd)
}

func writeAnalyzeResultToCSV(result replaycmd.AuditLogAnalyzeResult, outputPath string) error {
f, err := os.Create(outputPath)
if err != nil {
return err
}
defer f.Close()
w := csv.NewWriter(f)
for sql, group := range result {
record := []string{
sql,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also output the digests so that it's easier to filter a specific SQL.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And stmt type

fmt.Sprintf("%d", group.ExecutionCount),
fmt.Sprintf("%d", group.TotalCostTime.Microseconds()),
fmt.Sprintf("%d", group.TotalAffectedRows),
group.StmtTypes.String(),
}
if err := w.Write(record); err != nil {
return err
}
}
w.Flush()
if err := w.Error(); err != nil {
return err
}
return nil
}

func writeAnalyzeResultToMySQL(result replaycmd.AuditLogAnalyzeResult, outputPath string, outputTableName string) error {
db, err := sql.Open("mysql", outputPath)
if err != nil {
return err
}
defer db.Close()

createTableSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
sql_text TEXT,
execution_count INT,
total_cost_time BIGINT,
total_affected_rows BIGINT,
statement_types TEXT
);`, outputTableName)
_, err = db.Exec(createTableSQL)
if err != nil {
return err
}

insertSQL := fmt.Sprintf(`INSERT INTO %s (sql_text, execution_count, total_cost_time, total_affected_rows, statement_types) VALUES (?, ?, ?, ?, ?)`, outputTableName)
stmt, err := db.Prepare(insertSQL)
if err != nil {
return err
}
defer stmt.Close()

for sqlText, group := range result {
_, err = stmt.Exec(sqlText, group.ExecutionCount, group.TotalCostTime.Microseconds(), group.TotalAffectedRows, group.StmtTypes.String())
if err != nil {
return err
}
}

return nil
}
215 changes: 215 additions & 0 deletions pkg/sqlreplay/cmd/audit_log_analyzer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright 2025 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"strconv"
"strings"
"time"

"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tiproxy/lib/util/errors"
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
"github.com/siddontang/go/hack"
)

type stmtTypesSet map[string]struct{}

func (s stmtTypesSet) String() string {
var types []string
for stmtType := range s {
types = append(types, stmtType)
}
return strings.Join(types, ",")
}

// AuditLogGroup is the analysis result for a group of similar audit log entries.
type AuditLogGroup struct {
ExecutionCount int
TotalCostTime time.Duration
TotalAffectedRows int64
StmtTypes stmtTypesSet
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why multiple statement types?

}

// AuditLogAnalyzeResult is the result of analyzing audit logs.
type AuditLogAnalyzeResult map[string]AuditLogGroup

func (r AuditLogAnalyzeResult) Merge(other AuditLogAnalyzeResult) {
for sql, group := range other {
finalGroup := r[sql]
finalGroup.ExecutionCount += group.ExecutionCount
finalGroup.TotalCostTime += group.TotalCostTime
finalGroup.TotalAffectedRows += group.TotalAffectedRows
for stmtType := range group.StmtTypes {
if finalGroup.StmtTypes == nil {
finalGroup.StmtTypes = make(map[string]struct{})
}
finalGroup.StmtTypes[stmtType] = struct{}{}
}
r[sql] = finalGroup
}
}

// AnalyzeConfig is the configuration for audit log analysis.
type AnalyzeConfig struct {
Input string
Start time.Time
End time.Time
DB string
FilterCommandWithRetry bool
}

type auditLogAnalyzer struct {
reader LineReader

cfg AnalyzeConfig
connInfo map[uint64]auditLogPluginConnCtx
}

// NewAuditLogAnalyzer creates a new audit log analyzer.
func NewAuditLogAnalyzer(reader LineReader, cfg AnalyzeConfig) *auditLogAnalyzer {
return &auditLogAnalyzer{
reader: reader,
cfg: cfg,
connInfo: make(map[uint64]auditLogPluginConnCtx),
}
}

// Analyze analyzes the audit log and returns the analysis result.
func (a *auditLogAnalyzer) Analyze() (AuditLogAnalyzeResult, error) {
result := make(AuditLogAnalyzeResult)

kvs := make(map[string]string, 25)
for {
line, filename, lineIdx, err := a.reader.ReadLine()
if err != nil {
return result, err
}
clear(kvs)
err = parseLog(kvs, hack.String(line))
if err != nil {
return result, errors.Errorf("%s, line %d: %s", filename, lineIdx, err.Error())
}
// Only analyze the COMPLETED event
event, ok := kvs[auditPluginKeyEvent]
if !ok || event != auditPluginEventEnd {
continue
}

// Only analyze the event within the time range
startTs, endTs, err := parseStartAndEndTs(kvs)
if err != nil {
return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx)
}
if endTs.Before(a.cfg.Start) {
continue
}
if endTs.After(a.cfg.End) {
// Reach the end time, stop analyzing.
return result, nil
}

// Only analyze the `Query` and `Execute` commands
cmdStr := parseCommand(kvs[auditPluginKeyCommand])
if cmdStr != "Query" && cmdStr != "Execute" {
continue
}

// Only analyze the SQL in given database
if len(a.cfg.DB) != 0 {
databases, ok := kvs[auditPluginKeyDatabases]
if !ok {
continue
}

includeTargetDB := false
for _, db := range strings.Split(databases, ",") {
if db == a.cfg.DB {
includeTargetDB = true
}
}
if !includeTargetDB {
continue
}
}

// Try to filter out retried commands
connID, err := strconv.ParseUint(kvs[auditPluginKeyConnID], 10, 64)
if err != nil {
return result, errors.Wrapf(err, "%s, line %d: parse conn id failed: %s", filename, lineIdx, kvs[auditPluginKeyConnID])
}
connInfo := a.connInfo[connID]
if a.cfg.FilterCommandWithRetry {
if retryStr, ok := kvs[auditPluginKeyRetry]; ok {
// If it's a retry command, just skip it.
if retryStr == "true" {
continue
}
}
} else {
sql, err := parseSQL(kvs[auditPluginKeySQL])
if err != nil {
return result, errors.Wrapf(err, "%s, line %d: unquote sql failed: %s", filename, lineIdx, kvs[auditPluginKeySQL])
}
if isDuplicatedWrite(connInfo.lastCmd, kvs, cmdStr, sql, startTs, endTs) {
continue
}
}

sql, err := parseSQL(kvs[auditPluginKeySQL])
if err != nil {
return result, errors.Wrapf(err, "unquote sql failed: %s", kvs[auditPluginKeySQL])
}
normalizedSQL := parser.Normalize(sql, "ON")
group := result[normalizedSQL]

var costTime time.Duration
costTimeStr := kvs[auditPluginKeyCostTime]
if len(costTimeStr) != 0 {
millis, err := strconv.ParseFloat(costTimeStr, 32)
if err != nil {
return result, errors.Errorf("parsing cost time failed: %s", costTimeStr)
}
costTime = time.Duration(millis) * (time.Millisecond)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to convert int to Duration and then convert back

}

var affectedRows int64
affectedRowsStr := kvs[auditPluginKeyRows]
if len(affectedRowsStr) != 0 {
affectedRows, err = strconv.ParseInt(affectedRowsStr, 10, 64)
if err != nil {
return result, errors.Errorf("parsing affected rows failed: %s", affectedRowsStr)
}
}

// Record the last command info for deduplication. We only recorded the needed fields here.
connInfo.lastCmd = &Command{
StartTs: startTs,
EndTs: endTs,
ConnID: connID,
}
switch cmdStr {
case "Query":
connInfo.lastCmd.Type = pnet.ComQuery
connInfo.lastCmd.Payload = append([]byte{pnet.ComQuery.Byte()}, hack.Slice(sql)...)
case "Execute":
connInfo.lastCmd.Type = pnet.ComStmtExecute
connInfo.lastCmd.PreparedStmt = sql
}
connInfo.lastCmd.StmtType = kvs[auditPluginKeyStmtType]
connInfo.lastCmd.kvs = kvs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kvs is cleared before each loop, so saving kvs in it is wrong.

a.connInfo[connID] = connInfo

group.ExecutionCount++
group.TotalCostTime += costTime
group.TotalAffectedRows += affectedRows
if len(kvs[auditPluginKeyStmtType]) != 0 {
if group.StmtTypes == nil {
group.StmtTypes = make(map[string]struct{})
}
group.StmtTypes[kvs[auditPluginKeyStmtType]] = struct{}{}
}
result[normalizedSQL] = group
}
}
Loading