由于工作需要,研究了一下 Java 解析 MySQL 的 binlog,使用第三方封装的库,可以获取实时插入的信息数据,满足要求。总体看下来使用还是很简单的。
- 引入 binlog 的解析库
pom.xml
<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.28.0</version>
</dependency>
Github:https://github.com/osheroff/mysql-binlog-connector-java
- 编码
mysql:
  binlog:
    hostname: localhost
    port: 3306
    schema: tb_exp
    username: root
    password: root
    table-list: exp_user
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Data @Component @ConfigurationProperties(prefix = "mysql.binlog") public class BinlogProperty {
private String hostname; private String port; private String username; private String password;
private String schema;
private List<String> tableList; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
|
/**
 * Listens to the MySQL binlog and logs row inserts and updates of the configured tables.
 *
 * <p>After construction, {@link #handle()} starts a background task that connects a
 * {@link BinaryLogClient} to the server described by {@link BinlogProperty}. TABLE_MAP events are
 * used to remember which table ids belong to monitored tables; write/update row events carrying
 * one of those ids are then logged.
 */
@Component
public class BinlogUtil1 {

    private static final Logger log = LoggerFactory.getLogger(BinlogUtil1.class);

    /**
     * Table id (as announced by TABLE_MAP events) mapped to the table name, for monitored tables
     * only. It is read and written only by the registered event listener.
     */
    private static final Map<Long, String> TABLE_MAP = new HashMap<>();

    @Autowired
    private BinlogProperty binlogProperty;

    /**
     * Starts listening to the binlog asynchronously once the bean has been constructed.
     */
    @PostConstruct
    public void handle() {
        ThreadUtil.execAsync(() -> {
            // Log before connecting: BinaryLogClient.connect() blocks until the client
            // disconnects, so a message placed after binlogListening() would only be printed
            // once listening has already ended.
            log.info("正在监听 <{}:{}:{}> binlog……", binlogProperty.getHostname(), binlogProperty.getSchema(), binlogProperty.getTableList());
            binlogListening();
        });
    }

    /**
     * Creates a binlog client from the configured connection settings, registers the event
     * listener and connects. This call blocks the calling thread while the client is connected.
     *
     * @throws NumberFormatException if the configured port is not a valid integer
     * @throws RuntimeException if the connection fails with an {@link IOException}
     */
    public void binlogListening() {
        BinaryLogClient client = new BinaryLogClient(
                binlogProperty.getHostname(),
                Integer.parseInt(binlogProperty.getPort()),
                binlogProperty.getUsername(),
                binlogProperty.getPassword());
        // NOTE(review): the server id is hard-coded to 1. It presumably has to differ from the
        // id of the MySQL server and of any other replication client - confirm and consider
        // making it configurable.
        client.setServerId(1);
        client.registerEventListener(event -> handleEvent(event.getHeader().getEventType(), event.getData()));
        connectServer(client);
    }

    /**
     * Dispatches a single binlog event: records table ids of monitored tables and logs
     * inserts/updates that refer to a recorded table id.
     */
    private void handleEvent(EventType eventType, EventData data) {
        if (eventType == EventType.TABLE_MAP) {
            rememberTable((TableMapEventData) data);
        } else if (EventType.isWrite(eventType)) {
            WriteRowsEventData wr = (WriteRowsEventData) data;
            logRowChange(TABLE_MAP.get(wr.getTableId()), "INSERT", wr);
        } else if (EventType.isUpdate(eventType)) {
            UpdateRowsEventData ur = (UpdateRowsEventData) data;
            logRowChange(TABLE_MAP.get(ur.getTableId()), "UPDATE", ur);
        }
    }

    /**
     * Stores the table id of a monitored table; drops the id if it now refers to a table that is
     * not monitored, so that a stale mapping cannot attribute rows to the wrong table.
     */
    private void rememberTable(TableMapEventData tableMap) {
        if (isMonitored(tableMap.getDatabase(), tableMap.getTable())) {
            TABLE_MAP.put(tableMap.getTableId(), tableMap.getTable());
        } else {
            TABLE_MAP.remove(tableMap.getTableId());
        }
    }

    /** Returns whether the given table belongs to the configured schema and table list. */
    private boolean isMonitored(String database, String tableName) {
        List<String> tables = binlogProperty.getTableList();
        return tables != null
                && Objects.equals(binlogProperty.getSchema(), database)
                && tables.contains(tableName);
    }

    /** Logs a row event of a monitored table; events of unknown tables are ignored. */
    private static void logRowChange(String tableName, String operation, EventData data) {
        if (tableName == null) {
            return;
        }
        if ("INSERT".equals(operation)) {
            log.info("{} =============== INSERT", tableName);
        } else {
            log.info("{} =============== UPDATE", tableName);
        }
        log.info("{}", data);
    }

    /**
     * Returns the textual value of one column taken from the first row of a row event.
     *
     * <p>For an insert event the value is read from the inserted row, for an update event from
     * the new (after-update) values.
     *
     * @param data  the event payload; anything other than a write/update rows event yields
     *              {@code null}
     * @param index zero-based position in the row array
     * @return the value's {@code toString()}, or {@code null} if the column is not included, the
     *         event carries no rows, the index is out of range or the value itself is null
     */
    public static String getValueByIndex(EventData data, int index) {
        if (index < 0) {
            return null;
        }
        Serializable[] row = null;
        if (data instanceof WriteRowsEventData) {
            WriteRowsEventData wr = (WriteRowsEventData) data;
            List<Serializable[]> rows = wr.getRows();
            if (wr.getIncludedColumns().get(index) && !rows.isEmpty()) {
                row = rows.get(0);
            }
        } else if (data instanceof UpdateRowsEventData) {
            UpdateRowsEventData ur = (UpdateRowsEventData) data;
            List<Map.Entry<Serializable[], Serializable[]>> rows = ur.getRows();
            if (ur.getIncludedColumns().get(index) && !rows.isEmpty()) {
                row = rows.get(0).getValue();
            }
        }
        if (row == null || index >= row.length || row[index] == null) {
            return null;
        }
        return row[index].toString();
    }

    /**
     * Collects the value of one column from every row of a row event.
     *
     * @param data  the event payload; anything other than a write/update rows event yields an
     *              empty list
     * @param index zero-based position in the row array
     * @param key   map key under which each value is stored
     * @return one single-entry map per row that has a non-null value at {@code index}; never
     *         {@code null}
     */
    private static List<Map<String, Object>> getValueByIndex(EventData data, int index, String key) {
        List<Map<String, Object>> res = new ArrayList<>();
        if (index < 0) {
            return res;
        }
        if (data instanceof WriteRowsEventData) {
            WriteRowsEventData wr = (WriteRowsEventData) data;
            if (wr.getIncludedColumns().get(index)) {
                for (Serializable[] row : wr.getRows()) {
                    addIfPresent(res, row, index, key);
                }
            }
        } else if (data instanceof UpdateRowsEventData) {
            UpdateRowsEventData ur = (UpdateRowsEventData) data;
            if (ur.getIncludedColumns().get(index)) {
                for (Map.Entry<Serializable[], Serializable[]> change : ur.getRows()) {
                    // The entry value holds the row contents after the update.
                    addIfPresent(res, change.getValue(), index, key);
                }
            }
        }
        return res;
    }

    /** Appends {@code {key: row[index]}} to {@code target} when that value exists and is non-null. */
    private static void addIfPresent(List<Map<String, Object>> target, Serializable[] row, int index, String key) {
        if (row == null || index >= row.length || row[index] == null) {
            return;
        }
        Map<String, Object> entry = new HashMap<>();
        entry.put(key, row[index]);
        target.add(entry);
    }

    /**
     * Connects the client, converting a connection failure into an unchecked exception.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised by the client
     */
    private static void connectServer(BinaryLogClient client) {
        try {
            client.connect();
        } catch (IOException e) {
            // Pass the exception as the last argument so the stack trace is logged as well.
            log.error("连接失败:{}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }
}
|