查看kafka消費組的消息數(shù) 如何使用消息隊列解決分布式事務?
如何使用消息隊列解決分布式事務?消息事務是指一系列的生產(chǎn)、消費操作可以要么都完成,要么都失敗,類似數(shù)據(jù)庫的事務。在此用Kafka 為例做進一步解說吧!一、基本概念為了支持事務,Kafka 0.11.0
如何使用消息隊列解決分布式事務?
消息事務是指一系列的生產(chǎn)、消費操作可以要么都完成,要么都失敗,類似數(shù)據(jù)庫的事務。在此用Kafka 為例做進一步解說吧!
一、基本概念
為了支持事務,Kafka 0.11.0版本引入以下概念:
1.事務協(xié)調(diào)者:類似于消費組負載均衡的協(xié)調(diào)者,每一個實現(xiàn)事務的生產(chǎn)端都被分配到一個事務協(xié)調(diào)者(Transaction Coordinator)。
2.引入一個內(nèi)部Kafka Topic作為事務Log:類似于消費管理Offset的Topic,事務Topic本身也是持久化的,日志信息記錄事務狀態(tài)信息,由事務協(xié)調(diào)者寫入。
3.引入控制消息(Control Messages):這些消息是客戶端產(chǎn)生的并寫入到主題的特殊消息,但對于使用者來說不可見。它們是用來讓broker告知消費者之前拉取的消息是否被原子性提交。
4.引入TransactionId:不同生產(chǎn)實例使用同一個TransactionId表示是同一個事務,可以跨Session的數(shù)據(jù)冪等發(fā)送。當具有相同Transaction ID的新的Producer實例被創(chuàng)建且工作時,舊的且擁有相同Transaction ID的Producer將不再工作,避免事務僵死。
ID:每個新的Producer在初始化的時候會被分配一個唯一的PID,這個PID對用戶是不可見的。主要是為提供冪等性時引入的。
Numbler。(對于每個PID,該Producer發(fā)送數(shù)據(jù)的每個ltTopic, Partitiongt都對應一個從0開始單調(diào)遞增的Sequence Number。
7.每個生產(chǎn)者增加一個epoch:用于標識同一個事務Id在一次事務中的epoch,每次初始化事務時會遞增,從而讓服務端可以知道生產(chǎn)者請求是否舊的請求。
8.冪等性:保證發(fā)送單個分區(qū)的消息只會發(fā)送一次,不會出現(xiàn)重復消息。增加一個冪等性的開關(guān),可以獨立與事務使用,即可以只開啟冪等但不開啟事務
二、事務流程
1、查找事務協(xié)調(diào)者
生產(chǎn)者會首先發(fā)起一個查找事務協(xié)調(diào)者的請求(FindCoordinatorRequest)。協(xié)調(diào)者會負責分配一個PID給生產(chǎn)者。類似于消費組的協(xié)調(diào)者。
2、獲取produce ID
在知道事務協(xié)調(diào)者后,生產(chǎn)者需要往協(xié)調(diào)者發(fā)送初始化pid請求(initPidRequest)。這個請求分兩種情況:
●不帶transactionID
這種情況下直接生成一個新的produce ID即可,返回給客戶端
●帶transactionID
這種情況下,kafka根據(jù)transactionalId獲取對應的PID,這個對應關(guān)系是保存在事務日志中(上圖2a)。這樣可以確保相同的TransactionId返回相同的PID,用于恢復或者終止之前未完成的事務。
3、啟動事務
生產(chǎn)者通過調(diào)用beginTransaction接口啟動事務,此時只是內(nèi)部的狀態(tài)記錄為事務開始,但是事務協(xié)調(diào)者認為事務開始只有當生產(chǎn)者開始發(fā)送第一條消息才開始。
4、消費和生產(chǎn)配合過程
這一步是消費和生成互相配合完成事務的過程,其中涉及多個請求:
●增加分區(qū)到事務請求
當生產(chǎn)者有新分區(qū)要寫入數(shù)據(jù),則會發(fā)送AddPartitionToTxnRequest到事務協(xié)調(diào)者。協(xié)調(diào)者會處理請求,主要做的事情是更新事務元數(shù)據(jù)信息,并把信息寫入到事務日志中(事務Topic)。
●生產(chǎn)請求
生產(chǎn)者通過調(diào)用send接口發(fā)送數(shù)據(jù)到分區(qū),這些請求新增pid,epoch和sequence number字段。
●增加消費offset到事務
生產(chǎn)者通過新增的snedOffsets ToTransaction接口,會發(fā)送某個分區(qū)的Offset信息到事務協(xié)調(diào)者。協(xié)調(diào)者會把分區(qū)信息增加到事務中。
●事務提交offset請求
當生產(chǎn)者調(diào)用事務提交offset接口后,會發(fā)送一個TxnOffsetCommitRequest請求到消費組協(xié)調(diào)者,消費組協(xié)調(diào)者會把offset存儲在__consumer-offsets Topic中。協(xié)調(diào)者會根據(jù)請求的PID和epoch驗證生產(chǎn)者是否允許發(fā)起這個請求。 消費offset只有當事務提交后才對外可見。
5、提交或回滾事務
用戶通過調(diào)用commitTransaction或abortTranssaction方法提交或回滾事務。
●EndTxnRequest
當生產(chǎn)者完成事務后,客戶端需要顯式調(diào)用結(jié)束事務或者回滾事務。前者會使得消息對消費者可見,后者會對生產(chǎn)數(shù)據(jù)標記為Abort狀態(tài),使得消息對消費者不可見。無論是提交或者回滾,都是發(fā)送一個EndTnxRequest請求到事務協(xié)調(diào)者,寫入PREPARE_COMMIT或者PREPARE_ABORT信息到事務記錄日志中(5.1a)。
●WriteTxnMarkerRequest
這個請求是事務協(xié)調(diào)者向事務中每個TopicPartition的Leader發(fā)送的。每個Broker收到請求后會寫入COMMIT(PID)或者ABORT(PID)控制信息到數(shù)據(jù)日志中(5.2a)。
這個信息用于告知消費者當前消息是哪個事務,消息是否應該接受或者丟棄。而對于未提交消息,消費者會緩存該事務的消息直到提交或者回滾。
這里要注意,如果事務也涉及到__consumer_offsets,即該事務中有消費數(shù)據(jù)的操作且將該消費的Offset存于__consumer_offsets中,Transaction Coordinator也需要向該內(nèi)部Topic的各Partition的Leader發(fā)送WriteTxnMarkerRequest從而寫入COMMIT(PID)或COMMIT(PID)控制信息(5.2a 左邊)。
●寫入最終提交或回滾信息
當提交和回滾信息寫入數(shù)據(jù)日子后,事務協(xié)調(diào)者會往事務日志中寫入最終的提交或者終止信息以表示事務已經(jīng)完成(圖5.3),此時大部分于事務有關(guān)系的消息都可以被刪除(通過標記后面在日志壓縮時會被移除),我們只需要保留事務ID以及其時間戳即可。
apache kafka是什么開源的系統(tǒng)?
Apache Kafka是一個開源消息系統(tǒng),由Scala寫成。是由Apache軟件基金會開發(fā)的一個開源消息系統(tǒng)項目。
Kafka最初是由LinkedIn開發(fā),并于2011年初開源。2012年10月從Apache Incubator畢業(yè)。該項目的目標是為處理實時數(shù)據(jù)提供一個統(tǒng)一、高通量、低等待的平臺。
Kafka是一個分布式消息隊列:生產(chǎn)者、消費者的功能。它提供了類似于JMS的特性,但是在設(shè)計實現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實現(xiàn)。
Kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。
無論是kafka集群,還是producer和consumer都依賴于zookeeper集群保存一些meta信息,來保證系統(tǒng)可用性