Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions replication/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,19 @@ const (
ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota
ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION
)

// Binlog flags, from include/mysql.h
const (
USE_HEARTBEAT_EVENT_V2 = 1 << 1
MYSQL_RPL_SKIP_TAGGED_GTIDS = 1 << 2
MYSQL_RPL_GTID = 1 << 16
MYSQL_RPL_SKIP_HEARTBEAT = 1 << 17
)

// On-The-Wire HeartBeat fields
// See enum mysql::binlog::event::codecs::binary::Heartbeat::fields in MySQL
const (
OTW_HB_HEADER_END_MARK = iota
OTW_HB_LOG_FILENAME_FIELD
OTW_HB_LOG_POSITION_FIELD
)
64 changes: 64 additions & 0 deletions replication/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,3 +945,67 @@ func (i *IntVarEvent) Dump(w io.Writer) {
fmt.Fprintf(w, "Type: %d\n", i.Type)
fmt.Fprintf(w, "Value: %d\n", i.Value)
}

// HeartbeatEvent is a HEARTBEAT_EVENT or HEARTBEAT_LOG_EVENT_V2
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_heartbeat
type HeartbeatEvent struct {
// Event version, either 1 for HEARTBEAT_EVENT or 2 for HEARTBEAT_LOG_EVENT_V2
Version int

// Filename of the binary log
Filename string

// Offset is the offset in the binlog file
Offset uint64
}

// Decode is decoding a heartbeat event payload (excluding event header and checksum)
func (h *HeartbeatEvent) Decode(data []byte) error {
switch h.Version {
case 1:
// Also known as HEARTBEAT_EVENT
h.Filename = string(data)
case 2:
// Also known as HEARTBEAT_LOG_EVENT_V2
//
// The server sends this in the binlog stream if the following is set:
// DumpCommandFlag: replication.USE_HEARTBEAT_EVENT_V2
pos := 0
for pos < len(data) {
switch data[pos] {
case OTW_HB_LOG_FILENAME_FIELD:
pos++
nameLength := int(data[pos])
pos++
h.Filename = string(data[pos : pos+nameLength])
pos += nameLength
case OTW_HB_LOG_POSITION_FIELD:
pos++
offsetLength := int(data[pos])
pos++
var n int
h.Offset, _, n = mysql.LengthEncodedInt(data[pos : pos+offsetLength])
if n != offsetLength {
return errors.New("failed to read binary log offset")
}
pos += offsetLength
case OTW_HB_HEADER_END_MARK:
pos++
default:
return errors.New("unknown heartbeatv2 field")
}
}
default:
return errors.New("unknown heartbeat version")
}

return nil
}

func (h *HeartbeatEvent) Dump(w io.Writer) {
fmt.Fprintf(w,
"Heartbeat Event Version: %d\nBinlog File Name: %s\nBinlog Offset: %d\n",
h.Version, h.Filename, h.Offset,
)
fmt.Fprintln(w)
}
59 changes: 59 additions & 0 deletions replication/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,62 @@ func TestPreviousGTIDEvent(t *testing.T) {
require.Equal(t, tc.GTIDSets, e.GTIDSets)
}
}

func TestHeartbeatEvent(t *testing.T) {
testcases := []struct {
err bool
version int
input []byte // make sure to strip the 4 byte checksum
hbEvent HeartbeatEvent
}{
{
false,
2,
[]byte{0x1, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x1, 0x9e, 0x0},
HeartbeatEvent{Version: 2, Filename: "binlog.000001", Offset: 158},
},
{
true,
2,
// 0x3 is not a valid field type
[]byte{0x3, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x1, 0x9e, 0x0},
HeartbeatEvent{},
},
{
true,
2,
// 0x2, 0x9e is not a valid length encoded integer
[]byte{0x1, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x2, 0x9e, 0x0},
HeartbeatEvent{Version: 2, Filename: "binlog.000001", Offset: 158},
},
{
false,
1,
[]byte{0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31},
HeartbeatEvent{Version: 1, Filename: "binlog.000001"},
},
{
true,
3, // invalid heartbeat version
[]byte{},
HeartbeatEvent{},
},
{
false,
2,
[]byte{0x1, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x4, 0xfd, 0xbb, 0xaf, 0x27, 0x0},
HeartbeatEvent{Version: 2, Filename: "binlog.000001", Offset: 2600891},
},
}

for _, tc := range testcases {
e := HeartbeatEvent{Version: tc.version}
err := e.Decode(tc.input)
if tc.err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.hbEvent, e)
}
}
}
3 changes: 0 additions & 3 deletions replication/generic_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,3 @@ func (e *GenericEvent) Decode(data []byte) error {
// MessageLength uint8
// Message []byte
// }

// type HeartbeatEvent struct {
// }
4 changes: 4 additions & 0 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
e = &IntVarEvent{}
case TRANSACTION_PAYLOAD_EVENT:
e = p.newTransactionPayloadEvent()
case HEARTBEAT_EVENT:
e = &HeartbeatEvent{Version: 1}
case HEARTBEAT_LOG_EVENT_V2:
e = &HeartbeatEvent{Version: 2}
default:
e = &GenericEvent{}
}
Expand Down
Loading