MySQL/MariaDB binlog change data capture (CDC) connector for .NET
Acts as a replication client streaming binary log events in real-time.
Designed for reactive push-model applications, event sourcing or derived data systems.
NuGet feed: MySqlCdc
Transaction log events are immutable and appended in strictly sequential order. This simplifies your concurrency model and allows you to avoid distributed locks that handle race conditions from parallel database requests.
- Event sourcing.
- Cache invalidation.
- OLAP. Analytics. Reporting. Data Warehouse.
- Real-time chat/messenger using web sockets.
- Synchronizing web/mobile client state with backend.
- Replicating MySQL database to Memcached/Redis cache.
- Replicating MySQL database to NoSQL/Elasticsearch. Denormalization. Derived data system.
Be careful when working with binary log event streaming.
- Binlog stream includes changes made to all databases on the master server including sql queries with sensitive information and you may leak data from the databases. Consider deploying your database to an isolated instance.
- Transaction log represents a sequence of append-only files. It includes changes for databases/tables that you deleted and then recreated. Make sure you don't replay the phantom events in your application.
Please note the lib currently has the following limitations:
- Supports only standard auth plugins
mysql_native_passwordandcaching_sha2_password. - Currently, the library doesn't fully support SSL encryption.
Please make sure the following requirements are met:
-
The user is granted
REPLICATION SLAVE,REPLICATION CLIENTprivileges. -
Binary logging is enabled(it's done by default in MySQL 8). To enable binary logging configure the following settings on the master server and restart the service:
binlog_format = row binlog_row_image = fullMySQL 5.6/5.7 and MariaDB 10.1 also require the following line:
server-id = 1 -
Optionally you can enable logging table metadata in MySQL(like column names, see
TableMetadataclass). Note the metadata is not supported in MariaDB.binlog_row_metadata = full -
Optionally you can enable logging SQL queries that precede row based events and listen to
RowsQueryEvent.MySQL
binlog_rows_query_log_events = onMariaDB
binlog_annotate_row_events = on -
Also note that there are
expire_logs_days,binlog_expire_logs_secondssettings that control how long binlog files live. By default MySQL/MariaDB have expiration time set and delete expired binlog files. You can disable automatic purging of binlog files this way:expire_logs_days = 0
You have to obtain columns ordinal position of the table that you are interested in. The library is not responsible for this so you have to do it using another tool.
select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION
from INFORMATION_SCHEMA.COLUMNS
where TABLE_NAME='AspNetUsers' and TABLE_SCHEMA='Identity'
order by ORDINAL_POSITION;Alternatively, in MySQL 5.6 and newer(but not in MariaDB) you can obtain column names by logging full metadata (see TableMetadataEvent.Metadata).
This way the metadata is logged with each TableMapEvent which impacts bandwidth.
binlog_row_metadata = full
Data is stored in Cells property of row events in the same order. See the C# sample project.
var client = new BinlogClient(options =>
{
options.Port = 3306;
options.UseSsl = false;
options.Username = "root";
options.Password = "Qwertyu1";
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.Blocking = true;
// Start replication from MariaDB GTID. Recommended.
options.Binlog = BinlogOptions.FromGtid(GtidList.Parse("0-1-270"));
// Start replication from MySQL GTID. Recommended.
var gtidSet = "d4c17f0c-4f11-11ea-93e3-325d3e1cd1c8:1-107, f442510a-2881-11ea-b1dd-27916133dbb2:1-7";
options.Binlog = BinlogOptions.FromGtid(GtidSet.Parse(gtidSet));
// Start replication from the master binlog filename and position
options.Binlog = BinlogOptions.FromPosition("mysql-bin.000008", 195);
// Start replication from the master last binlog filename and position.
options.Binlog = BinlogOptions.FromEnd();
// Start replication from the master first available(not purged) binlog filename and position.
options.Binlog = BinlogOptions.FromStart();
});
await client.ReplicateAsync(async (binlogEvent) =>
{
var state = client.State;
if (binlogEvent is TableMapEvent tableMap)
{
await HandleTableMapEvent(tableMap);
}
else if (binlogEvent is WriteRowsEvent writeRows)
{
await HandleWriteRowsEvent(writeRows);
}
else if (binlogEvent is UpdateRowsEvent updateRows)
{
await HandleUpdateRowsEvent(updateRows);
}
else if (binlogEvent is DeleteRowsEvent deleteRows)
{
await HandleDeleteRowsEvent(deleteRows);
}
else await PrintEventAsync(binlogEvent);
});A typical transaction has the following structure.
GtidEventif gtid mode is enabled.- One or many
TableMapEventevents.- One or many
WriteRowsEventevents. - One or many
UpdateRowsEventevents. - One or many
DeleteRowsEventevents.
- One or many
XidEventindicating commit of the transaction.
It's best practice to use GTID replication with the FromGtid method. Using the approach you can correctly perform replication failover.
Note that in GTID mode FromGtid has the following behavior:
FromGtid(@@gtid_purged)acts likeFromStart()FromGtid(@@gtid_executed)acts likeFromEnd()
In some cases you will need to read binlog files offline from the file system.
This can be done using BinlogReader class.
using (Stream stream = File.OpenRead("mariadb-bin.000002"))
{
var reader = new BinlogReader(new MariaDbEventDeserializer(), stream);
while (true)
{
var @event = await reader.ReadEventAsync();
if (@event != null)
{
await PrintEventAsync(@event);
}
else
{
break;
}
}
}| MySQL Type | .NET type |
|---|---|
| GEOMETRY | byte[] |
| JSON (MySQL) | byte[], see below |
| JSON (MariaDB) | byte[], see below |
| BIT | bool[] |
| TINYINT | byte |
| SMALLINT | short |
| MEDIUMINT | int(3), see below |
| INT | int |
| BIGING | long |
| FLOAT | float |
| DOUBLE | double |
| DECIMAL | string |
| VARCHAR, VARBINARY | string |
| CHAR | string |
| ENUM | int |
| SET | long |
| YEAR | int |
| DATE | Nullable<DateTime> |
| DATETIME | Nullable<DateTime> |
| TIME | TimeSpan |
| TIMESTAMP | DateTimeOffset |
| BLOB types | byte[] |
-
Invalid DATE, DATETIME values(0000-00-00) are parsed as DateTime null.
-
TIME, DATETIME, TIMESTAMP (MySQL 5.6.4+) will lose microseconds when converted to .NET types as MySQL types have bigger fractional part than corresponding .NET types can store.
-
Signedness of numeric columns cannot be determined in MariaDB/MySQL 5.5. The library stores all numeric columns as CLS-compliant types from the table above. The client has the information and should manually cast to
sbyte,ushort,uintorulongif necessary. The only exception is 3-byteunsigned mediumintwhich must be casted this way:// casting unsigned mediumint columns uint cellValue = (uint)(int)row.Cells[0]; uint mediumint = (cellValue << 8) >> 8;
-
JSON columns have different storage format in MariaDB and MySQL:
// MariaDB stores JSON as strings byte[] data = (byte[])row.Cells[0]; string json = Encoding.UTF8.GetString(data); // MySQL stores JSON in binary format that needs to be parsed byte[] data = (byte[])row.Cells[0]; string json = MySqlCdc.Providers.MySql.JsonParser.Parse(data);
-
GEOMETRY type is read as
byte[]but there is no parser that constructs .NET objects. -
DECIMAL type is parsed to string as MySql decimal has bigger range(65 digits) than .NET decimal.
The project is based on mysql-binlog-connector-java library, MariaDB and MySQL documentation.
- Java: https://github.com/shyiko/mysql-binlog-connector-java
- PHP: https://github.com/krowinski/php-mysql-replication
- Python: https://github.com/noplay/python-mysql-replication
MySqlCdc supports both MariaDB & MySQL server.
| MariaDB | Status |
|---|---|
| 10.1 | ✅ Supported |
| 10.2 | ✅ Supported |
| 10.3 | ✅ Supported |
| 10.4 | ✅ Supported |
| MySQL | Status |
|---|---|
| 5.6 | ✅ Supported |
| 5.7 | ✅ Supported |
| 8.0 | ✅ Supported |
Are uncommitted changes written to binlog?
- If you make transactional changes binlog will only include committed transactions in their commit order to provide consistency.
- If you make non-transactional changes binlog will include changes from uncommitted transactions even if the transactions are rolled back.
The library is provided under the MIT License.