日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

實現(xiàn)MySQL與elasticsearch的數(shù)據(jù)同步的代碼示例

瀏覽:37日期:2023-07-05 19:48:16
目錄原數(shù)據(jù)庫的同步問題解決思路及方案調(diào)整架構(gòu)改進數(shù)據(jù)庫成果展示前后對比方案實施細節(jié)1. MySQL配置2. Maxwell 配置3. 安裝 Logstash4. 全量同步原數(shù)據(jù)庫的同步問題

由于傳統(tǒng)的 mysql 數(shù)據(jù)庫并不擅長海量數(shù)據(jù)的檢索,當數(shù)據(jù)量到達一定規(guī)模時(估算單表兩千萬左右),查詢和插入的耗時會明顯增加。同樣,當需要對這些數(shù)據(jù)進行模糊查詢或是數(shù)據(jù)分析時,MySQL作為事務型關(guān)系數(shù)據(jù)庫很難提供良好的性能支持。使用適合的數(shù)據(jù)庫來實現(xiàn)模糊查詢是解決這個問題的關(guān)鍵。

但是,切換數(shù)據(jù)庫會迎來兩個問題,一是已有的服務對現(xiàn)在的 MySQL 重度依賴,二是 MySQL 的事務能力和軟件生態(tài)仍然不可替代,直接遷移數(shù)據(jù)庫的成本過大。我們綜合考慮了下,決定同時使用多個數(shù)據(jù)庫的方案,不同的數(shù)據(jù)庫應用于不同的使用場景。而在支持模糊查詢功能的數(shù)據(jù)庫中,elasticsearch 自然是首選的查詢數(shù)據(jù)庫。這樣后續(xù)對業(yè)務需求的切換也會非常靈活。

那具體該如何實現(xiàn)呢?在又拍云以往的項目中,也有遇到相似的問題。之前采用的方法是在業(yè)務中編寫代碼,然后同步到 elasticsearch 中。具體是這樣實施的:每個系統(tǒng)編寫特定的代碼,修改 MySQL 數(shù)據(jù)庫后,再將更新的數(shù)據(jù)直接推送到需要同步的數(shù)據(jù)庫中,或推送到隊列由消費程序來寫入到數(shù)據(jù)庫中。

但這個方案有一些明顯的缺點:

系統(tǒng)高耦合,侵入式代碼,使得業(yè)務邏輯復雜度增加

方案不通用,每一套同步都需要額外定制,不僅增加業(yè)務處理時間,還會提升軟件復復雜度

工作量和復雜度增加

在業(yè)務中編寫同步方案,雖然在項目早期比較方便,但隨著數(shù)據(jù)量和系統(tǒng)的發(fā)展壯大,往往最后會成為業(yè)務的大痛點。

解決思路及方案調(diào)整架構(gòu)

既然以往的方案有明顯的缺點,那我們?nèi)绾蝸斫鉀Q它呢?優(yōu)秀的解決方案往往是 “通過架構(gòu)來解決問題“,那么能不能通過架構(gòu)的思想來解決問題呢?

答案是可以的。我們可以將程序偽裝成 “從數(shù)據(jù)庫”,主庫的增量變化會傳遞到從庫,那這個偽裝成 “從數(shù)據(jù)庫” 的程序就能實時獲取到數(shù)據(jù)變化,然后將增量的變化推送到消息隊列 MQ,后續(xù)消費者消耗 MQ 的數(shù)據(jù),然后經(jīng)過處理之后再推送到各自需要的數(shù)據(jù)庫。

這個架構(gòu)的核心是通過監(jiān)聽 MySQL 的 binlog 來同步增量數(shù)據(jù),通過基于 query 的查詢舊表來同步舊數(shù)據(jù),這就是本文要講的一種異構(gòu)數(shù)據(jù)庫同步的實踐。

改進數(shù)據(jù)庫

經(jīng)過深度的調(diào)研,成功得到了一套異構(gòu)數(shù)據(jù)庫同步方案,并且成功將公司生產(chǎn)環(huán)境下的 robin/logs 的表同步到了 elasticsearch 上。

首先對 MySQL 開啟 binlog,但是由于 maxwell 需要的 binlog_format=row 原本的生產(chǎn)環(huán)境的數(shù)據(jù)庫不宜修改。這里請教了海楊前輩,他提供了”從庫聯(lián)級“的思路,在從庫中監(jiān)聽 binlog 繞過了操作生產(chǎn)環(huán)境重啟主庫的操作,大大降低了系統(tǒng)風險。

后續(xù)操作比較順利,啟動 maxwell 監(jiān)聽從庫變化,然后將增量變化推送到 kafka ,最后配置 logstash 消費 kafka中的數(shù)據(jù)變化事件信息,將結(jié)果推送到 elasticsearch。配置 logstash需要結(jié)合表結(jié)構(gòu),這是整套方案實施的重點。

這套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 與 kafka已經(jīng)在生產(chǎn)環(huán)境中有部署,所以無需單獨部署維護。而 logstash 與 maxwell 只需要修改配置文件和啟動命令即可快速上線。整套方案的意義不僅在于成本低,而且可以大規(guī)模使用,公司內(nèi)有 MySQL 同步到其它數(shù)據(jù)庫的需求時,都可以上任。

成果展示前后對比

使用該方案同步和業(yè)務實現(xiàn)同步的對比

寫入到 elasticsearch 性能對比 (8核4G內(nèi)存)

經(jīng)過對比測試,800w 數(shù)據(jù)量全量同步,使用 logstash 寫到 elasticsearch,實際需要大概 3 小時,而舊方案的寫入時間需要 2.5 天。

方案實施細節(jié)

接下來,我們來看看具體是如何實現(xiàn)的。

本方案無需編寫額外代碼,非侵入式的,實現(xiàn) MySQL 數(shù)據(jù)與 elasticsearch 數(shù)據(jù)庫的同步。

下列是本次方案需要使用所有的組件:

MySQL

Kafka

Maxwell(監(jiān)聽 binlog)

Logstash(將數(shù)據(jù)同步給 elasticsearch)

Elasticsearch

1. MySQL配置

本次使用 MySQL 5.5 作示范,其他版本的配置可能稍許不同需要

首先我們需要增加一個數(shù)據(jù)庫只讀的用戶,如果已有的可以跳過。

-- 創(chuàng)建一個 用戶名為 maxwell 密碼為 xxxxxx 的用戶CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

開啟數(shù)據(jù)庫的 binlog,修改 mysql 配置文件,注意 maxwell 需要的 binlog 格式必須是row。

# /etc/mysql/my.cnf[mysqld]# maxwell 需要的 binlog 格式必須是 rowbinlog_format=row# 指定 server_id 此配置關(guān)系到主從同步需要按情況設置,# 由于此mysql沒有開啟主從同步,這邊默認設置為 1server_id=1# logbin 輸出的文件名, 按需配置log-bin=master

重啟 MySQL 并查看配置是否生效:

sudo systemctl restart mysqldselect @@log_bin;-- 正確結(jié)果是 1select @@binlog_format;-- 正確結(jié)果是 ROW

如果要監(jiān)聽的數(shù)據(jù)庫開啟了主從同步,并且不是主數(shù)據(jù)庫,需要再從數(shù)據(jù)庫開啟 binlog 聯(lián)級同步。

# /etc/my.cnflog_slave_updates = 1

需要被同步到 elasticsearch 的表結(jié)構(gòu)。

-- robin.logsshow create table robin.logs;-- 表結(jié)構(gòu)CREATE TABLE `logs` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `content` text NOT NULL, `user_id` int(11) NOT NULL, `status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL, `type` varchar(20) DEFAULT '', `meta` text, `created_at` bigint(15) NOT NULL, `idx_host` varchar(255) DEFAULT '', `idx_domain_id` int(11) unsigned DEFAULT NULL, `idx_record_value` varchar(255) DEFAULT '', `idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL, `idx_orig_record_value` varchar(255) DEFAULT '', PRIMARY KEY (`id`), KEY `created_at` (`created_at`)) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf82. Maxwell 配置

本次使用 maxwell-1.39.2 作示范, 確保機器中包含 java 環(huán)境, 推薦 openjdk11

下載 maxwell 程序

wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gztar zxvf maxwell-1.39.2.tar.gz **&&** cd maxwell-1.39.2

maxwell 使用了兩個數(shù)據(jù)庫:

一個是需要被監(jiān)聽binlog的數(shù)據(jù)庫(只需要讀權(quán)限)

另一個是記錄maxwell服務狀態(tài)的數(shù)據(jù)庫,當前這兩個數(shù)據(jù)庫可以是同一個

重要參數(shù)說明:

host 需要監(jiān)聽binlog的數(shù)據(jù)庫地址

port 需要監(jiān)聽binlog的數(shù)據(jù)庫端口

user 需要監(jiān)聽binlog的數(shù)據(jù)庫用戶名

password 需要監(jiān)聽binlog的密碼

replication_host 記錄maxwell服務的數(shù)據(jù)庫地址

replication_port 記錄maxwell服務的數(shù)據(jù)庫端口

replication_user 記錄maxwell服務的數(shù)據(jù)庫用戶名

filter 用于監(jiān)聽binlog數(shù)據(jù)時過濾不需要的數(shù)據(jù)庫數(shù)據(jù)或指定需要的數(shù)據(jù)庫

producer 將監(jiān)聽到的增量變化數(shù)據(jù)提交給的消費者 (如 stdout、kafka)

kafka.bootstrap.servers kafka 服務地址

kafka_version kafka 版本

kafka_topic 推送到kafka的主題

啟動 maxwell

注意,如果 kafka 配置了禁止自動創(chuàng)建主題,需要先自行在 kafka 上創(chuàng)建主題,kafka_version 需要根據(jù)情況指定, 此次使用了兩張不同的庫

./bin/maxwell --host=mysql-maxwell.mysql.svc.cluster.fud3 --port=3306 --user=root --password=password --replication_host=192.168.5.38 --replication_port=3306 --replication_user=cloner --replication_password=password--filter='exclude: *.*, include: robin.logs' --producer=kafka --kafka.bootstrap.servers=192.168.30.10:9092 --kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.13. 安裝 Logstash

Logstash 包中已經(jīng)包含了 openjdk,無需額外安裝。

wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gztar zxvf logstash-8.5.0-linux-x86_64.tar.gz

刪除不需要的配置文件。

rm config/logstash.yml

修改 logstash 配置文件

# config/logstash-sample.confinput { kafka { bootstrap_servers => '192.168.30.10:9092' group_id => 'main' topics => ['maxwell-robinlogs'] }}filter { json { source => 'message' } # 將maxwell的事件類型轉(zhuǎn)化為es的事件類型 # 如增加 -> index 修改-> update translate { source => '[type]' target => '[action]' dictionary => { 'insert' => 'index' 'bootstrap-insert' => 'index' 'update' => 'update' 'delete' => 'delete' } fallback => 'unknown' } # 過濾無效的數(shù)據(jù) if ([action] == 'unknown') { drop {} } # 處理數(shù)據(jù)格式 if [data][idx_host] { mutate { add_field => { 'idx_host' => '%{[data][idx_host]}' } } } else { mutate { add_field => { 'idx_host' => '' } } } if [data][idx_domain_id] { mutate { add_field => { 'idx_domain_id' => '%{[data][idx_domain_id]}' } } } else { mutate { add_field => { 'idx_domain_id' => '' } } } if [data][idx_record_value] { mutate { add_field => { 'idx_record_value' => '%{[data][idx_record_value]}' } } } else { mutate { add_field => { 'idx_record_value' => '' } } } if [data][idx_record_opt] { mutate { add_field => { 'idx_record_opt' => '%{[data][idx_record_opt]}' } } } else { mutate { add_field => { 'idx_record_opt' => '' } } } if [data][idx_orig_record_value] { mutate { add_field => { 'idx_orig_record_value' => '%{[data][idx_orig_record_value]}' } } } else { mutate { add_field => { 'idx_orig_record_value' => '' } } } if [data][type] { mutate { replace => { 'type' => '%{[data][type]}' } } } else { mutate { replace => { 'type' => '' } } } mutate { add_field => { 'id' => '%{[data][id]}' 'content' => '%{[data][content]}' 'user_id' => '%{[data][user_id]}' 'status' => '%{[data][status]}' 'meta' => '%{[data][meta]}' 'created_at' => '%{[data][created_at]}' } remove_field => ['data'] } mutate { convert => { 'id' => 'integer' 'user_id' => 'integer' 'idx_domain_id' => 'integer' 'created_at' => 'integer' } } # 只提煉需要的字段 mutate { remove_field => [ 'message', 'original', '@version', '@timestamp', 'event', 'database', 'table', 'ts', 'xid', 'commit', 'tags' ] }}output { # 結(jié)果寫到es elasticsearch { hosts => ['http://es-zico2.service.upyun:9500'] index => 'robin_logs' action => '%{action}' document_id => '%{id}' document_type => 'robin_logs' } # 結(jié)果打印到標準輸出 stdout { codec => rubydebug }}

執(zhí)行程序:

# 測試配置文件*bin/logstash -f config/logstash-sample.conf --config.test_and_exit# 啟動*bin/logstash -f config/logstash-sample.conf --config.reload.automatic4. 全量同步

完成啟動后,后續(xù)的增量數(shù)據(jù) maxwell 會自動推送給 logstash 最終推送到 elasticsearch ,而之前的舊數(shù)據(jù)可以通過 maxwell 的 bootstrap 來同步,往下面表中插入一條任務,那么 maxwell 會自動將所有符合條件的 where_clause 的數(shù)據(jù)推送更新。

INSERT INTO maxwell.bootstrap ( database_name, table_name, where_clause, client_id ) values ( 'robin', 'logs', 'id > 1', 'maxwell' );

后續(xù)可以在 elasticsearch 檢測數(shù)據(jù)是否同步完成,可以先查看數(shù)量是否一致,然后抽樣對比詳細數(shù)據(jù)。

# 檢測 elasticsearch 中的數(shù)據(jù)量GET robin_logs/robin_logs/_count

以上就是實現(xiàn)MySQL與elasticsearch的數(shù)據(jù)同步的代碼示例的詳細內(nèi)容,更多關(guān)于MySQ與elasticsearch數(shù)據(jù)同步的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
亚洲精品九九| 亚洲福利专区| 免费在线观看成人| 午夜一区在线| 亚洲日本国产| 国产欧美日韩视频在线| 日韩不卡一区二区三区| 国产精品99久久免费| 韩日一区二区| 久久婷婷久久| 亚洲欧美不卡| 国产剧情在线观看一区| 日本一区二区高清不卡| 欧美日一区二区| 影院欧美亚洲| 日韩高清不卡一区| 精品国产乱码久久久| 999精品在线| 免费在线观看精品| 欧美激情久久久久久久久久久| 国产激情在线播放| 99亚洲精品| 欧美激情视频一区二区三区在线播放| 蜜桃av.网站在线观看| 巨乳诱惑日韩免费av| 精品国产鲁一鲁****| 日韩午夜高潮| 国产一区二区三区久久久久久久久| 日韩在线不卡| 亚洲精品日本| 日韩精品一区二区三区免费观影| 日本一区福利在线| 一本大道色婷婷在线| 欧美日韩xxxx| 性色一区二区| 久久精品91| 九九99久久精品在免费线bt| 99香蕉国产精品偷在线观看 | 中文字幕av亚洲精品一部二部 | 久久激情中文| 久久一区视频| 欧美另类中文字幕| 国产一级一区二区| 久久黄色影院| 国产精品毛片一区二区在线看| 婷婷精品在线| 亚洲综合不卡| 99精品在线观看| 91亚洲人成网污www| 久久麻豆视频| 欧美日韩一视频区二区| 久久国产精品毛片| 亚州av乱码久久精品蜜桃| 黄毛片在线观看| 国产盗摄——sm在线视频| 久久xxx视频| 国产精品亚洲综合久久| 青草久久视频| 国产精品久久久一区二区| 日韩不卡在线观看日韩不卡视频| 鲁大师成人一区二区三区| 99国产精品视频免费观看一公开| 婷婷综合亚洲| 免费日韩av片| 日本欧美在线| 久久影院资源站| 国产精品sm| 成人影视亚洲图片在线| 国产第一亚洲| 欧美综合另类| 视频一区日韩| 美女国产一区二区三区| 国产精品二区不卡| 免费精品国产的网站免费观看| 精品美女视频| 日韩精品一区二区三区中文| 久久国产成人| 一区二区91| 99日韩精品| 欧美在线看片| 日韩在线视频一区二区三区| 精品国产乱码久久久久久樱花| 久久国产麻豆精品| 精品日韩一区| www.九色在线| 免费视频久久| 国产精品一线| 久久中文字幕二区| 性欧美精品高清| 国产视频一区在线观看一区免费| 日韩av一区二| 一区二区三区视频免费观看 | 日韩一区二区三区高清在线观看| 麻豆久久一区| 爽爽淫人综合网网站| 日韩精品免费视频人成| 最新中文字幕在线播放| 日av在线不卡| 日韩精品1区| 国产精品久久久久久久久久齐齐| 久久精品亚洲人成影院 | 亚洲福利精品| 久久久久久亚洲精品美女| 99视频一区| 日韩国产在线| 鲁大师精品99久久久| 蜜臀国产一区二区三区在线播放| www.九色在线| 日韩高清在线不卡| 欧美网站在线| 999国产精品| 超级白嫩亚洲国产第一| 91午夜精品| 男女性色大片免费观看一区二区| 久久久久久久欧美精品| 神马午夜在线视频| 美女尤物国产一区| 亚洲毛片一区| 国产一区二区高清| 日韩精品免费一区二区在线观看| 日韩毛片一区| 精品久久久亚洲| 欧美中文一区| 免费人成网站在线观看欧美高清| 免费毛片在线不卡| 一本一道久久a久久| 欧美激情一区| 国产精品www994| 久久99视频| 久久香蕉网站| sm久久捆绑调教精品一区| 精品视频一区二区三区在线观看 | 亚州av一区| 四虎精品一区二区免费| 免费观看在线综合| 色狠狠一区二区三区| 国产日韩视频在线| 日韩在线黄色| 91成人在线精品视频| 麻豆精品少妇| 国产在线一区不卡| 蜜桃视频在线网站| 久久视频国产| 日韩影院精彩在线| 青青伊人久久| 精品理论电影在线| 99tv成人| 在线视频免费在线观看一区二区| av不卡免费看| 国产精品亚洲综合在线观看| 精品视频免费| 亚洲大片在线| 91精品美女| 精品成人18| 香蕉久久精品| 国产日韩欧美一区二区三区 | 国产精品久久久久久久久久妞妞| 精品高清久久| 最新亚洲激情| 欧美亚洲二区| 日韩欧美一区二区三区免费观看| 亚洲激情国产| 麻豆91在线播放| 久久国产精品毛片| 国产一区二区三区网| 日本aⅴ亚洲精品中文乱码 | 亚洲精品一区三区三区在线观看| 99国产精品久久久久久久| 久热re这里精品视频在线6| 欧美二区视频| 久久中文字幕导航| 国产精品99久久免费观看| 美女网站视频一区| 久久久久国产精品一区三寸| 蜜臀av一区二区在线免费观看| 亚洲aⅴ网站| 日韩在线麻豆| 一区在线免费观看| 欧美久久久网站| 欧美在线日韩| 欧美男人天堂| 美女精品网站| 免费日韩视频| 国产欧洲在线| 欧美日韩亚洲一区| 国产精品字幕| 国产一级久久| 正在播放日韩精品| 日韩国产高清在线| 久久影院一区二区三区| 视频二区不卡| 欧美成人精品午夜一区二区| 免播放器亚洲一区| 国产福利一区二区精品秒拍 | 免费一级欧美在线观看视频| 国产一区二区三区黄网站| 久久久水蜜桃av免费网站| 荡女精品导航| 国产精品66| 夜夜嗨网站十八久久|