緩存一致性 每次逢年過節(jié)的時候搶票非常艱難,放票的時候那么多人同時去搶票,如果所有人查詢、購票等都去訪問數(shù)據(jù)庫,那數(shù)據(jù)庫的壓力得有多大,這時候很多都會引入緩存, 把車票信息放入緩存,這樣可以減少數(shù)據(jù)庫壓力。當(dāng)乘客購買成功之后,數(shù)據(jù)庫發(fā)生了變化,需要及時更新緩存中的數(shù)據(jù),以便于其他乘客能從緩存中及時獲取最新車票信息。這就是緩存一致性。
解決數(shù)據(jù)庫與緩存一致性主要思路:
1、同步雙寫:也就是修改db的時候同時修改一下緩存,這種模式下會出現(xiàn)無法保證數(shù)據(jù)庫與緩存的原子性。
如果出現(xiàn)多線程同時修改db的情況,網(wǎng)絡(luò)延遲導(dǎo)致數(shù)據(jù)庫修改順序與請求順序錯位
例如:A 先操作數(shù)據(jù)庫修改 x=1
B也修改數(shù)據(jù)庫 x=2
但是網(wǎng)絡(luò)延遲導(dǎo)致
B先修改緩存 x=2
A再修改緩存 x=1
這樣就導(dǎo)致了數(shù)據(jù)庫中x=2,而緩存中則是 x=1,導(dǎo)致數(shù)據(jù)庫與緩存不一致。
2、設(shè)置有效期:給緩存設(shè)置有效期,到期后自動刪除。再次查詢時更新
優(yōu)勢:簡單、方便
缺點:時效性差,緩存過期之前可能不一致
場景:更新頻率較低,時效性要求低的業(yè)務(wù)
那我們有沒有什么更加好的解決方案呢?
阿里云的canal就為我們很好的解決了這一問題:
canal: 是Alibaba旗下的一款開源項目,純Java開發(fā).它是基于數(shù)據(jù)庫增量日志解析,提供 增量數(shù)據(jù)訂閱&消費 ,目前主要支持mysql。
canal工作原理
mysql的主從復(fù)制原理:
MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log , 其中記錄叫做二進(jìn)制日志事件 binary log events ,可以通過 show binlog events 進(jìn)行查看)
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志( relay log )
MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)
canal工作原理
canal模擬mysql salve的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議;
mysql master收到dump請求,開始推送binary log給slave(也就是canal);
canal解析binary log對象(原始byte流).
canal的安裝配置(以windows為例)
一、登進(jìn)Mysql后,使用show variables like'log_bin';查詢是否開啟binlog,如果開啟(ON),進(jìn)行下一步,如果沒開啟(OFF),在數(shù)據(jù)庫的my.ini配置文件添加配置
[mysqld]
# 開啟 binlog
log-bin=mysql-bin
# 選擇 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)
server_id=1
二、binlog開啟后,創(chuàng)建一個canal用戶并授權(quán),官網(wǎng)配置是@%,表示所有服務(wù)器,所以改為localhost就可以,在mysql中,運行如下代碼,設(shè)置完成之后重啟:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' identified by 'canal';
FLUSH PRIVILEGES;
三、安裝canal
- 下載地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.6-alpha-1
在conf文件夾里找到confcanal.properties
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
- 說明:這個文件是 canal 的基本通用配置,canal 端口號默認(rèn)就是 11111,修改 canal 的輸出 model,默認(rèn) tcp,改為輸出到 kafka
- 重點關(guān)注上面的:canal.serverMode = tcp 這個配置,默認(rèn)情況,如果是使用mysql,可以不做修改,如果需要將數(shù)據(jù)同步到kafka,或者rocketmq,可以分別修改即可,此處暫不做修改
- 解壓到適當(dāng)位置,解壓后在conf文件夾里找到exampleinstance.properties,
canal.instance.mysql.slaveId=20 #只要和mysql的master的不一樣即可
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
- canal.instance.mysql.slaveId=20 #只要和mysql的master的不一樣即可
- canal.instance.master.address=127.0.0.1:3306 ,監(jiān)聽的mysql的master節(jié)點信息
- 配置連接 MySQL 的用戶名和密碼,默認(rèn)就是我們前面授權(quán)的 canal
- 修改數(shù)據(jù)庫配置信息,canal.instance.dbUsername、canal.instance.dbPassword為數(shù)據(jù)庫賬戶密碼,均為canal,剛剛創(chuàng)建賬號密碼,
- 到bin目錄下啟動 startup.bat,出現(xiàn)如下界面表示啟動成功
四、spring boot中整合canal maven依賴
< dependency >
< groupId >com.alibaba.otter< /groupId >
< artifactId >canal.client< /artifactId >
< version >1.1.4< /version >
< /dependency >
java 示例:
public class CanalService {
public static void main(String[] args) throws Exception{
//1.獲取 canal 連接對象,我在本機上部署的,所以是127.0.0.1
CanalConnector canalConnector =
CanalConnectors.newSingleConnector(new
InetSocketAddress("127.0.0.1", 11111), "example", "", "");
System.out.println("canal啟動并開始監(jiān)聽數(shù)據(jù) ...... ");
while (true){
canalConnector.connect();
//訂閱表 test數(shù)據(jù)庫下的所有表
canalConnector.subscribe("test.*");
//獲取數(shù)據(jù)
Message message = canalConnector.get(100);
//解析message
List< CanalEntry.Entry > entries = message.getEntries();
if(entries.size() <=0){
System.out.println("未檢測到數(shù)據(jù)");
Thread.sleep(1000);
}
for(CanalEntry.Entry entry : entries){
//1、獲取表名
String tableName = entry.getHeader().getTableName();
//2、獲取類型
CanalEntry.EntryType entryType = entry.getEntryType();
//3、獲取序列化后的數(shù)據(jù)
ByteString storeValue = entry.getStoreValue();
//判斷是否rowdata類型數(shù)據(jù)
if(CanalEntry.EntryType.ROWDATA.equals(entryType)){
//對第三步中的數(shù)據(jù)進(jìn)行解析
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//獲取當(dāng)前事件的操作類型
CanalEntry.EventType eventType = rowChange.getEventType();
//獲取數(shù)據(jù)集
List< CanalEntry.RowData > rowDatasList = rowChange.getRowDatasList();
//便利數(shù)據(jù)
for(CanalEntry.RowData rowData : rowDatasList){
//數(shù)據(jù)變更之前的內(nèi)容
JSONObject beforeData = new JSONObject();
List< CanalEntry.Column > beforeColumnsList = rowData.getAfterColumnsList();
for(CanalEntry.Column column : beforeColumnsList){
beforeData.put(column.getName(),column.getValue());
}
//數(shù)據(jù)變更之后的內(nèi)容
List< CanalEntry.Column > afterColumnsList = rowData.getAfterColumnsList();
JSONObject afterData = new JSONObject();
for(CanalEntry.Column column : afterColumnsList){
afterData.put(column.getName(),column.getValue());
}
System.out.println("Table :" + tableName +
",eventType :" + eventType +
",beforeData :" + beforeData +
",afterData : " + afterData);
//操作緩存
}
}else {
System.out.println("當(dāng)前操作類型為:" + entryType);
}
}
}
}
}
我手動在book表中操作數(shù)據(jù),可以看到程序監(jiān)控輸出結(jié)果
五、 最后我們拿到數(shù)據(jù)之后可以放入消息隊列,這樣可以加入重試機制,還可以防止冪等問題,最后再寫入緩存。
- 消息隊列保證可靠性:寫到隊列中的消息,成功消費之前不會丟失(重啟項目也不擔(dān)心)。
- 消息隊列保證消息成功投遞:下游從隊列拉取消息,成功消費后才會刪除消息,否則還會繼續(xù)投遞消息給消費者(符合我們重試的場景)。
-
緩存
+關(guān)注
關(guān)注
1文章
240瀏覽量
26679 -
數(shù)據(jù)庫
+關(guān)注
關(guān)注
7文章
3799瀏覽量
64395 -
開源
+關(guān)注
關(guān)注
3文章
3349瀏覽量
42500 -
dump
+關(guān)注
關(guān)注
0文章
13瀏覽量
9514
發(fā)布評論請先 登錄
相關(guān)推薦
評論