由于工作需要,研究了一下 Java 解析 MySQL 的 binlog,使用第三方封装的库,可以获取实时插入的信息数据,满足要求。总体看下来使用还是很简单的。

  1. 引入 binlog 的解析库
pom.xml
1
2
3
4
5
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.28.0</version>
</dependency>

Github:https://github.com/osheroff/mysql-binlog-connector-java

  1. 编码
1
2
3
4
5
6
7
8
mysql:
binlog:
hostname: localhost
port: 3306
schema: tb_exp
username: root
password: root
table-list: exp_user
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Data
@Component
@ConfigurationProperties(prefix = "mysql.binlog")
public class BinlogProperty {

private String hostname;
private String port;
private String username;
private String password;
/**
* 数据库
*/
private String schema;
/**
* 监听的表集合
*/
private List<String> tableList;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/**
* binlog
*
* @author jhlz
* @since 2023/5/29 9:57
*/
@Component
public class BinlogUtil1 {
private static final Logger log = LoggerFactory.getLogger(BinlogUtil.class);
@Autowired
private BinlogProperty binlogProperty;
/**
* 数据库名称 map
*/
private static final Map<Long, String> TABLE_MAP = new HashMap<>();

/**
* 异步线程处理 binlog,防止阻塞主线程
*/
@PostConstruct
public void handle() {
ThreadUtil.execAsync(() -> {
binlogListening();

log.info("正在监听 <{}:{}:{}> binlog……",
binlogProperty.getHostname(),
binlogProperty.getSchema(),
binlogProperty.getTableList());
});
}

/**
* binlog 监听
*/
public void binlogListening() {
BinaryLogClient client = new BinaryLogClient(binlogProperty.getHostname(),
Integer.valueOf(binlogProperty.getPort()),
binlogProperty.getUsername(),
binlogProperty.getPassword());
client.setServerId(1);
// 注册监听
client.registerEventListener(event -> {
EventData data = event.getData();
EventType eventType = event.getHeader().getEventType();

if (Objects.equals(EventType.TABLE_MAP, eventType)) {
TableMapEventData tableMapEventData = (TableMapEventData) data;
String database = tableMapEventData.getDatabase();
String tableName = tableMapEventData.getTable();
// 目标明确!缓存指定数据库以及指定的表名
if (Objects.equals(binlogProperty.getSchema(), database)
&& binlogProperty.getTableList().contains(tableName)) {
TABLE_MAP.put(tableMapEventData.getTableId(), tableName);
}
}
// 新增的 binlog 数据
if (EventType.isWrite(eventType)) {
WriteRowsEventData wr = (WriteRowsEventData) data;
// 获取表名称
String tableName = TABLE_MAP.get(wr.getTableId());
if (Objects.nonNull(tableName)) {
log.info("{} =============== INSERT", tableName);
System.out.println(wr.toString());
}
}
// 更新的 binlog 数据
if (EventType.isUpdate(eventType)) {
UpdateRowsEventData ur = (UpdateRowsEventData) data;
String tableName = TABLE_MAP.get(ur.getTableId());
if (Objects.nonNull(tableName)) {
log.info("{} =============== UPDATE", tableName);
System.out.println(ur.toString());
}
}
});
// 连接服务
connectServer(client);
}

/**
* 获取指定索引位置的最新值,如果是批量,只拿第一条记录的值
*
* @param data binlog 数据
* @param index 数据库目标字段的索引位置,起始索引 0
* @return 目标索引位置的值或者 null
*/
public static String getValueByIndex(EventData data, int index) {
// 新增事件
if (data instanceof WriteRowsEventData) {
WriteRowsEventData wr = (WriteRowsEventData) data;
BitSet includedColumns = wr.getIncludedColumns();
// 如果该位置有值为 true,否则是 false
if (includedColumns.get(index)) {
List<Serializable[]> rows = wr.getRows();
// 只拿第一条记录的值
Serializable[] arr = rows.get(0);
// 需要判空,如果一条数据中某列的值为 null 则 NPE
if (Objects.nonNull(arr[index])) {
return arr[index].toString();
}
}
}
// 更新事件
if (data instanceof UpdateRowsEventData) {
UpdateRowsEventData ur = (UpdateRowsEventData) data;
BitSet includedColumns = ur.getIncludedColumns();
if (includedColumns.get(index)) {
// 只获取第一条记录
Map.Entry<Serializable[], Serializable[]> entry = ur.getRows().get(0);
// entry 的 key 为旧值,value 为更新值
Serializable[] newValue = entry.getValue();
if (newValue[index] != null) {
return newValue[index].toString();
}
}
}
return null;
}

/**
* 获取事件数据指定索引位置的值
*
* @param data binlog 数据
* @param index 数据库目标字段的索引位置
* @param key 数据库目标字段的名称
* @return map,键为传入的 key 值,值为 binlog 读取的新值
*/
private static List<Map<String, Object>> getValueByIndex(EventData data, int index, String key) {
List<Map<String, Object>> res = new ArrayList<>();
if (data instanceof WriteRowsEventData) {
WriteRowsEventData wr = (WriteRowsEventData) data;
BitSet includedColumns = wr.getIncludedColumns();
if (includedColumns.get(index)) {
// 获取每一行的值,Serializable[] 一条记录的所有值
List<Serializable[]> rows = wr.getRows();
// TODO 批量插入不清楚什么格式,先循环吧,使用 MAP 装载结果,后面出问题再调试
rows.forEach(r -> {
// 拿到索引位的值
Serializable value = r[index];
// 若 value 为 null,则抛出 NPE
if (Objects.nonNull(value)) {
Map<String, Object> map = new HashMap<>();
map.put(key, value);
res.add(map);
}
});
}
}
if (data instanceof UpdateRowsEventData) {
UpdateRowsEventData ur = (UpdateRowsEventData) data;
BitSet includedColumns = ur.getIncludedColumns();
if (includedColumns.get(index)) {
ur.getRows().forEach(u -> {
Serializable[] newValue = u.getValue();
Serializable obj = newValue[index];
if (Objects.nonNull(obj)) {
Map<String, Object> map = new HashMap<>();
map.put(key, obj);
res.add(map);
}
});
}
}
return res;
}

/**
* 连接服务器
*
* @param client 客户端
*/
private static void connectServer(BinaryLogClient client) {
try {
client.connect();
} catch (IOException e) {
log.error("连接失败:{}", e.getMessage());
throw new RuntimeException(e);
}
}
}

本站由 江湖浪子 使用 Stellar 1.28.1 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。