国产成人毛片视频|星空传媒久草视频|欧美激情草久视频|久久久久女女|久操超碰在线播放|亚洲强奸一区二区|五月天丁香社区在线|色婷婷成人丁香网|午夜欧美6666|纯肉无码91视频

kafka刪除積壓數(shù)據(jù) 從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會(huì)自動(dòng)刪除嗎?

從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會(huì)自動(dòng)刪除嗎?基于receiver的實(shí)現(xiàn)將使用kakfa的高級消費(fèi)API。與所有其他接收器一樣,接收到的數(shù)據(jù)將保存到執(zhí)行器,然后sparkstreaming將啟動(dòng)作業(yè)來處理

從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會(huì)自動(dòng)刪除嗎?

基于receiver的實(shí)現(xiàn)將使用kakfa的高級消費(fèi)API。與所有其他接收器一樣,接收到的數(shù)據(jù)將保存到執(zhí)行器,然后sparkstreaming將啟動(dòng)作業(yè)來處理數(shù)據(jù)。

在默認(rèn)配置中,如果出現(xiàn)故障,此方法將丟失數(shù)據(jù)。為了確保零數(shù)據(jù)丟失,我們需要啟用wal(writeaheadlogs)。它將接收到的數(shù)據(jù)同步保存到分布式文件系統(tǒng),如HDFS。因此,在發(fā)生錯(cuò)誤時(shí)可以恢復(fù)數(shù)據(jù)。

使用兩個(gè)步驟:1。添加依賴項(xiàng):Spark streaming Kafka 2.10-1.3.0

2導(dǎo)入器g.apache.spark. 卡夫卡.ux

卡夫卡有兩種刪除數(shù)據(jù)的方法

根據(jù)時(shí)間,刪除一段時(shí)間后過期的消息

根據(jù)消息大小,消息數(shù)超過一定大小后刪除最早的數(shù)據(jù)

卡夫卡刪除數(shù)據(jù)的最小單位:segment

卡夫卡刪除數(shù)據(jù)的主要邏輯:卡夫卡源代碼

def cleanuplogs(){debug(”beging log cleanup。。。)var總計(jì)=0 val開始時(shí)間=時(shí)間.毫秒For(log

Kafka在一段時(shí)間內(nèi)(配置文件設(shè)置)調(diào)用cleanuplogs一次,刪除所有需要?jiǎng)h除的日志數(shù)據(jù)。

Cleanupexpiredsegments負(fù)責(zé)清除超時(shí)數(shù)據(jù)

private def Cleanupexpiredsegments(log:log):int={val startms=時(shí)間.毫秒log.deleteOldSegments文件(開始時(shí)間->上次修改時(shí)間>log.config.retentions保留)}

cleanupsegmenttomaintainsize負(fù)責(zé)清理大于大小的數(shù)據(jù)私有def cleanupsegmentstomaintainsize(log:log):int={if(log.config.retentionSize文件=0){差異-=段.尺寸真}否則{假}log.deleteOldSegments文件(應(yīng)該刪除)}