时间:2023-10-29来源:系统城装机大师作者:佚名
由于传统的 mysql 数据库并不擅长海量数据的检索,当数据量到达一定规模时(估算单表两千万左右),查询和插入的耗时会明显增加。同样,当需要对这些数据进行模糊查询或是数据分析时,MySQL作为事务型关系数据库很难提供良好的性能支持。使用适合的数据库来实现模糊查询是解决这个问题的关键。
但是,切换数据库会迎来两个问题,一是已有的服务对现在的 MySQL 重度依赖,二是 MySQL 的事务能力和软件生态仍然不可替代,直接迁移数据库的成本过大。我们综合考虑了下,决定同时使用多个数据库的方案,不同的数据库应用于不同的使用场景。而在支持模糊查询功能的数据库中,elasticsearch 自然是首选的查询数据库。这样后续对业务需求的切换也会非常灵活。
那具体该如何实现呢?在又拍云以往的项目中,也有遇到相似的问题。之前采用的方法是在业务中编写代码,然后同步到 elasticsearch 中。具体是这样实施的:每个系统编写特定的代码,修改 MySQL 数据库后,再将更新的数据直接推送到需要同步的数据库中,或推送到队列由消费程序来写入到数据库中。
但这个方案有一些明显的缺点:
系统高耦合,侵入式代码,使得业务逻辑复杂度增加
方案不通用,每一套同步都需要额外定制,不仅增加业务处理时间,还会提升软件复复杂度
工作量和复杂度增加
在业务中编写同步方案,虽然在项目早期比较方便,但随着数据量和系统的发展壮大,往往最后会成为业务的大痛点。
既然以往的方案有明显的缺点,那我们如何来解决它呢?优秀的解决方案往往是 “通过架构来解决问题“,那么能不能通过架构的思想来解决问题呢?
答案是可以的。我们可以将程序伪装成 “从数据库”,主库的增量变化会传递到从库,那这个伪装成 “从数据库” 的程序就能实时获取到数据变化,然后将增量的变化推送到消息队列 MQ,后续消费者消耗 MQ 的数据,然后经过处理之后再推送到各自需要的数据库。
这个架构的核心是通过监听 MySQL 的 binlog 来同步增量数据,通过基于 query 的查询旧表来同步旧数据,这就是本文要讲的一种异构数据库同步的实践。
经过深度的调研,成功得到了一套异构数据库同步方案,并且成功将公司生产环境下的 robin/logs 的表同步到了 elasticsearch 上。
首先对 MySQL 开启 binlog,但是由于 maxwell 需要的 binlog_format=row 原本的生产环境的数据库不宜修改。这里请教了海杨前辈,他提供了”从库联级“的思路,在从库中监听 binlog 绕过了操作生产环境重启主库的操作,大大降低了系统风险。
后续操作比较顺利,启动 maxwell 监听从库变化,然后将增量变化推送到 kafka ,最后配置 logstash 消费 kafka中的数据变化事件信息,将结果推送到 elasticsearch。配置 logstash需要结合表结构,这是整套方案实施的重点。
这套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 与 kafka已经在生产环境中有部署,所以无需单独部署维护。而 logstash 与 maxwell 只需要修改配置文件和启动命令即可快速上线。整套方案的意义不仅在于成本低,而且可以大规模使用,公司内有 MySQL 同步到其它数据库的需求时,都可以上任。
使用该方案同步和业务实现同步的对比
写入到 elasticsearch 性能对比 (8核4G内存)
经过对比测试,800w 数据量全量同步,使用 logstash 写到 elasticsearch,实际需要大概 3 小时,而旧方案的写入时间需要 2.5 天。
接下来,我们来看看具体是如何实现的。
本方案无需编写额外代码,非侵入式的,实现 MySQL 数据与 elasticsearch 数据库的同步。
下列是本次方案需要使用所有的组件:
MySQL
Kafka
Maxwell(监听 binlog)
Logstash(将数据同步给 elasticsearch)
Elasticsearch
本次使用 MySQL 5.5 作示范,其他版本的配置可能稍许不同需要
首先我们需要增加一个数据库只读的用户,如果已有的可以跳过。
1 2 3 4 |
-- 创建一个 用户名为 maxwell 密码为 xxxxxx 的用户 CREATE USER 'maxwell' @ '%' IDENTIFIED BY 'XXXXXX' ; GRANT ALL ON maxwell.* TO 'maxwell' @ 'localhost' ; GRANT SELECT , REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell' @ '%' ; |
开启数据库的 binlog
,修改 mysql
配置文件,注意 maxwell
需要的 binlog
格式必须是row
。
1 2 3 4 5 6 7 8 9 |
# /etc/mysql/my.cnf [mysqld] # maxwell 需要的 binlog 格式必须是 row binlog_format=row # 指定 server_id 此配置关系到主从同步需要按情况设置, # 由于此mysql没有开启主从同步,这边默认设置为 1 server_id=1 # logbin 输出的文件名, 按需配置 log-bin=master |
重启 MySQL 并查看配置是否生效:
1 | sudo systemctl restart mysqld |
1 2 3 4 |
select @@log_bin; -- 正确结果是 1 select @@binlog_format; -- 正确结果是 ROW |
如果要监听的数据库开启了主从同步,并且不是主数据库,需要再从数据库开启 binlog 联级同步。
1 2 |
# /etc/my.cnf log_slave_updates = 1 |
需要被同步到 elasticsearch 的表结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
-- robin.logs show create table robin.logs; -- 表结构 CREATE TABLE `logs` ( `id` int (11) unsigned NOT NULL AUTO_INCREMENT, `content` text NOT NULL , `user_id` int (11) NOT NULL , `status` enum( 'SUCCESS' , 'FAILED' , 'PROCESSING' ) NOT NULL , `type` varchar (20) DEFAULT '' , `meta` text, `created_at` bigint (15) NOT NULL , `idx_host` varchar (255) DEFAULT '' , `idx_domain_id` int (11) unsigned DEFAULT NULL , `idx_record_value` varchar (255) DEFAULT '' , `idx_record_opt` enum( 'DELETE' , 'ENABLED' , 'DISABLED' ) DEFAULT NULL , `idx_orig_record_value` varchar (255) DEFAULT '' , PRIMARY KEY (`id`), KEY `created_at` (`created_at`) ) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8 |
本次使用 maxwell-1.39.2 作示范, 确保机器中包含 java 环境, 推荐 openjdk11
下载 maxwell 程序
1 2 |
wget https: //github .com /zendesk/maxwell/releases/download/v1 .39.2 /maxwell-1 .39.2. tar .gz tar zxvf maxwell-1.39.2. tar .gz **&&** cd maxwell-1.39.2 |
maxwell 使用了两个数据库:
一个是需要被监听binlog的数据库(只需要读权限)
另一个是记录maxwell服务状态的数据库,当前这两个数据库可以是同一个
重要参数说明:
host 需要监听binlog的数据库地址
port 需要监听binlog的数据库端口
user 需要监听binlog的数据库用户名
password 需要监听binlog的密码
replication_host 记录maxwell服务的数据库地址
replication_port 记录maxwell服务的数据库端口
replication_user 记录maxwell服务的数据库用户名
filter 用于监听binlog数据时过滤不需要的数据库数据或指定需要的数据库
producer 将监听到的增量变化数据提交给的消费者 (如 stdout、kafka)
kafka.bootstrap.servers kafka 服务地址
kafka_version kafka 版本
kafka_topic 推送到kafka的主题
启动 maxwell
注意,如果 kafka 配置了禁止自动创建主题,需要先自行在 kafka 上创建主题,kafka_version 需要根据情况指定, 此次使用了两张不同的库
1 2 3 4 5 6 7 8 9 10 11 12 13 |
. /bin/maxwell --host=mysql-maxwell.mysql.svc.cluster.fud3 --port=3306 --user=root --password=password --replication_host=192.168.5.38 --replication_port=3306 --replication_user=cloner --replication_password=password --filter= 'exclude: *.*, include: robin.logs' --producer=kafka --kafka.bootstrap.servers=192.168.30.10:9092 --kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1 |
Logstash 包中已经包含了 openjdk,无需额外安装。
1 2 |
wget https: //artifacts .elastic.co /downloads/logstash/logstash-8 .5.0-linux-x86_64. tar .gz tar zxvf logstash-8.5.0-linux-x86_64. tar .gz |
删除不需要的配置文件。
1 | rm config /logstash .yml |
修改 logstash 配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# config/logstash-sample.conf input { kafka { bootstrap_servers => "192.168.30.10:9092" group_id => "main" topics => [ "maxwell-robinlogs" ] } } filter { json { source => "message" } # 将maxwell的事件类型转化为es的事件类型 # 如增加 -> index 修改-> update translate { source => "[type]" target => "[action]" dictionary => { "insert" => "index" "bootstrap-insert" => "index" "update" => "update" "delete" => "delete" } fallback => "unknown" } # 过滤无效的数据 if ([action] == "unknown" ) { drop {} } # 处理数据格式 if [data][idx_host] { mutate { add_field => { "idx_host" => "%{[data][idx_host]}" } } } else { mutate { add_field => { "idx_host" => "" } } } if [data][idx_domain_id] { mutate { add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" } } } else { mutate { add_field => { "idx_domain_id" => "" } } } if [data][idx_record_value] { mutate { add_field => { "idx_record_value" => "%{[data][idx_record_value]}" } } } else { mutate { add_field => { "idx_record_value" => "" } } } if [data][idx_record_opt] { mutate { add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" } } } else { mutate { add_field => { "idx_record_opt" => "" } } } if [data][idx_orig_record_value] { mutate { add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" } } } else { mutate { add_field => { "idx_orig_record_value" => "" } } } if [data][type] { mutate { replace => { "type" => "%{[data][type]}" } } } else { mutate { replace => { "type" => "" } } } mutate { add_field => { "id" => "%{[data][id]}" "content" => "%{[data][content]}" "user_id" => "%{[data][user_id]}" "status" => "%{[data][status]}" "meta" => "%{[data][meta]}" "created_at" => "%{[data][created_at]}" } remove_field => [ "data" ] } mutate { convert => { "id" => "integer" "user_id" => "integer" "idx_domain_id" => "integer" "created_at" => "integer" } } # 只提炼需要的字段 mutate { remove_field => [ "message" , "original" , "@version" , "@timestamp" , "event" , "database" , "table" , "ts" , "xid" , "commit" , "tags" ] } } output { # 结果写到es elasticsearch { hosts => [ "http://es-zico2.service.upyun:9500" ] index => "robin_logs" action => "%{action}" document_id => "%{id}" document_type => "robin_logs" } # 结果打印到标准输出 stdout { codec => rubydebug } } |
执行程序:
1 2 3 4 |
# 测试配置文件* bin /logstash -f config /logstash-sample .conf --config.test_and_exit # 启动* bin /logstash -f config /logstash-sample .conf --config.reload.automatic |
完成启动后,后续的增量数据 maxwell 会自动推送给 logstash 最终推送到 elasticsearch ,而之前的旧数据可以通过 maxwell 的 bootstrap 来同步,往下面表中插入一条任务,那么 maxwell 会自动将所有符合条件的 where_clause 的数据推送更新。
1 2 3 4 |
INSERT INTO maxwell.bootstrap ( database_name, table_name, where_clause, client_id ) values ( 'robin' , 'logs' , 'id > 1' , 'maxwell' ); |
后续可以在 elasticsearch 检测数据是否同步完成,可以先查看数量是否一致,然后抽样对比详细数据。
1 2 |
# 检测 elasticsearch 中的数据量 GET robin_logs /robin_logs/_count |
以上就是实现MySQL与elasticsearch的数据同步的代码示例的详细内容
2023-10-30
windows上的mysql服务突然消失提示10061 Unkonwn error问题及解决方案2023-10-30
MySQL非常重要的日志bin log详解2023-10-30
详解MySQL事务日志redo log一、单表查询 1、排序 2、聚合函数 3、分组 4、limit 二、SQL约束 1、主键约束 2、非空约束 3、唯一约束 4、外键约束 5、默认值 三、多表查询 1、内连接 1)隐式内连接: 2)显式内连接: 2、外连接 1)左外连接 2)右外连接 四...
2023-10-30
Mysql删除表重复数据 表里存在唯一主键 没有主键时删除重复数据 Mysql删除表中重复数据并保留一条 准备一张表 用的是mysql8 大家自行更改 创建表并添加四条相同的数据...
2023-10-30