由于工作需要,研究了一下 Java 解析 MySQL 的 binlog,使用第三方封装的库,可以获取实时插入的信息数据,满足要求。总体看下来使用还是很简单的。
- 引入 binlog 的解析库
pom.xml1 2 3 4 5
| <dependency> <groupId>com.zendesk</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.28.0</version> </dependency>
|
Github:https://github.com/osheroff/mysql-binlog-connector-java
- 编码
1 2 3 4 5 6 7 8
| mysql: binlog: hostname: localhost port: 3306 schema: tb_exp username: root password: root table-list: exp_user
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Data @Component @ConfigurationProperties(prefix = "mysql.binlog") public class BinlogProperty {
private String hostname; private String port; private String username; private String password;
private String schema;
private List<String> tableList; }
|

|
@Component public class BinlogUtil1 { private static final Logger log = LoggerFactory.getLogger(BinlogUtil.class); @Autowired private BinlogProperty binlogProperty;
private static final Map<Long, String> TABLE_MAP = new HashMap<>();
@PostConstruct public void handle() { ThreadUtil.execAsync(() -> { binlogListening();
log.info("正在监听 <{}:{}:{}> binlog……", binlogProperty.getHostname(), binlogProperty.getSchema(), binlogProperty.getTableList()); }); }
public void binlogListening() { BinaryLogClient client = new BinaryLogClient(binlogProperty.getHostname(), Integer.valueOf(binlogProperty.getPort()), binlogProperty.getUsername(), binlogProperty.getPassword()); client.setServerId(1); client.registerEventListener(event -> { EventData data = event.getData(); EventType eventType = event.getHeader().getEventType();
if (Objects.equals(EventType.TABLE_MAP, eventType)) { TableMapEventData tableMapEventData = (TableMapEventData) data; String database = tableMapEventData.getDatabase(); String tableName = tableMapEventData.getTable(); if (Objects.equals(binlogProperty.getSchema(), database) && binlogProperty.getTableList().contains(tableName)) { TABLE_MAP.put(tableMapEventData.getTableId(), tableName); } } if (EventType.isWrite(eventType)) { WriteRowsEventData wr = (WriteRowsEventData) data; String tableName = TABLE_MAP.get(wr.getTableId()); if (Objects.nonNull(tableName)) { log.info("{} =============== INSERT", tableName); System.out.println(wr.toString()); } } if (EventType.isUpdate(eventType)) { UpdateRowsEventData ur = (UpdateRowsEventData) data; String tableName = TABLE_MAP.get(ur.getTableId()); if (Objects.nonNull(tableName)) { log.info("{} =============== UPDATE", tableName); System.out.println(ur.toString()); } } }); connectServer(client); }
public static String getValueByIndex(EventData data, int index) { if (data instanceof WriteRowsEventData) { WriteRowsEventData wr = (WriteRowsEventData) data; BitSet includedColumns = wr.getIncludedColumns(); if (includedColumns.get(index)) { List<Serializable[]> rows = wr.getRows(); Serializable[] arr = rows.get(0); if (Objects.nonNull(arr[index])) { return arr[index].toString(); } } } if (data instanceof UpdateRowsEventData) { UpdateRowsEventData ur = (UpdateRowsEventData) data; BitSet includedColumns = ur.getIncludedColumns(); if (includedColumns.get(index)) { Map.Entry<Serializable[], Serializable[]> entry = ur.getRows().get(0); Serializable[] newValue = entry.getValue(); if (newValue[index] != null) { return newValue[index].toString(); } } } return null; }
private static List<Map<String, Object>> getValueByIndex(EventData data, int index, String key) { List<Map<String, Object>> res = new ArrayList<>(); if (data instanceof WriteRowsEventData) { WriteRowsEventData wr = (WriteRowsEventData) data; BitSet includedColumns = wr.getIncludedColumns(); if (includedColumns.get(index)) { List<Serializable[]> rows = wr.getRows(); rows.forEach(r -> { Serializable value = r[index]; if (Objects.nonNull(value)) { Map<String, Object> map = new HashMap<>(); map.put(key, value); res.add(map); } }); } } if (data instanceof UpdateRowsEventData) { UpdateRowsEventData ur = (UpdateRowsEventData) data; BitSet includedColumns = ur.getIncludedColumns(); if (includedColumns.get(index)) { ur.getRows().forEach(u -> { Serializable[] newValue = u.getValue(); Serializable obj = newValue[index]; if (Objects.nonNull(obj)) { Map<String, Object> map = new HashMap<>(); map.put(key, obj); res.add(map); } }); } } return res; }
private static void connectServer(BinaryLogClient client) { try { client.connect(); } catch (IOException e) { log.error("连接失败:{}", e.getMessage()); throw new RuntimeException(e); } } }
|