日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

Spring Boot 集成 Kafkad的實現(xiàn)示例

瀏覽:26日期:2023-07-18 09:13:13

Spring Boot 作為主流微服務(wù)框架,擁有成熟的社區(qū)生態(tài)。市場應(yīng)用廣泛,為了方便大家,整理了一個基于spring boot的常用中間件快速集成入門系列手冊,涉及RPC、緩存、消息隊列、分庫分表、注冊中心、分布式配置等常用開源組件,大概有幾十篇文章,陸續(xù)會開放出來,感興趣同學(xué)請?zhí)崆瓣P(guān)注&收藏

消息通信有兩種基本模型,即發(fā)布-訂閱(Pub-Sub)模型和點對點(Point to Point)模型,發(fā)布-訂閱支持生產(chǎn)者消費者之間的一對多關(guān)系,而點對點模型中有且僅有一個消費者。

前言

Kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數(shù)據(jù)提供一個統(tǒng)一、高吞吐、低延遲的平臺。其持久化層本質(zhì)上是一個“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊列”。

Kafka高效地處理實時流式數(shù)據(jù),可以實現(xiàn)與Storm、HBase和Spark的集成。作為聚類部署到多臺服務(wù)器上,Kafka處理它所有的發(fā)布和訂閱消息系統(tǒng)使用了四個API,即生產(chǎn)者API、消費者API、Stream API和Connector API。它能夠傳遞大規(guī)模流式消息,自帶容錯功能,已經(jīng)取代了一些傳統(tǒng)消息系統(tǒng),如JMS、AMQP等。

為什么使用kafka? 削峰填谷。緩沖上下游瞬時突發(fā)流量,保護 “脆弱” 的下游系統(tǒng)不被壓垮,避免引發(fā)全鏈路服務(wù) “雪崩”。 系統(tǒng)解耦。發(fā)送方和接收方的松耦合,一定程度簡化了開發(fā)成本,減少了系統(tǒng)間不必要的直接依賴。 異步通信:消息隊列允許用戶把消息放入隊列但不立即處理它。 可恢復(fù)性:即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。業(yè)務(wù)場景 一些同步業(yè)務(wù)流程的非核心邏輯,對時間要求不是特別高,可以解耦異步來執(zhí)行 系統(tǒng)日志收集,采集并同步到kafka,一般采用ELK組合玩法 一些大數(shù)據(jù)平臺,用于各個系統(tǒng)間數(shù)據(jù)傳遞 基本架構(gòu)

Kafka 運行在一個由一臺或多臺服務(wù)器組成的集群上,并且分區(qū)可以跨集群節(jié)點分布

Spring Boot 集成 Kafkad的實現(xiàn)示例

1、Producer 生產(chǎn)消息,發(fā)送到Broker中

2、Leader狀態(tài)的Broker接收消息,寫入到相應(yīng)topic中。在一個分區(qū)內(nèi),這些消息被索引并連同時間戳存儲在一起

3、Leader狀態(tài)的Broker接收完畢以后,傳給Follow狀態(tài)的Broker作為副本備份

4、 Consumer 消費者的進程可以從分區(qū)訂閱,并消費消息

常用術(shù)語 Broker。負責(zé)接收和處理客戶端發(fā)送過來的請求,以及對消息進行持久化。雖然多個 Broker 進程能夠運行在同一臺機器上,但更常見的做法是將不同的 Broker 分散運行在不同的機器上 主題:Topic。主題是承載消息的邏輯容器,在實際使用中多用來區(qū)分具體的業(yè)務(wù)。 分區(qū):Partition。一個有序不變的消息序列。每個主題下可以有多個分區(qū)。 消息:這里的消息就是指 Kafka 處理的主要對象。 消息位移:Offset。表示分區(qū)中每條消息的位置信息,是一個單調(diào)遞增且不變的值。 副本:Replica。Kafka 中同一條消息能夠被拷貝到多個地方以提供數(shù)據(jù)冗余,這些地方就是所謂的副本。副本還分為領(lǐng)導(dǎo)者副本和追隨者副本,各自有不同的角色劃分。每個分區(qū)可配置多個副本實現(xiàn)高可用。一個分區(qū)的N個副本一定在N個不同的Broker上。 Leader:每個分區(qū)多個副本的“主”副本,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費者消費數(shù)據(jù)的對象,都是 Leader。 Follower:每個分區(qū)多個副本的“從”副本,實時從 Leader 中同步數(shù)據(jù),保持和 Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時,某個 Follower 還會成為新的 Leader。 生產(chǎn)者:Producer。向主題發(fā)布新消息的應(yīng)用程序。 消費者:Consumer。從主題訂閱新消息的應(yīng)用程序。 消費者位移:Consumer Offset。表示消費者消費進度,每個消費者都有自己的消費者位移。offset保存在broker端的內(nèi)部topic中,不是在clients中保存 消費者組:Consumer Group。多個消費者實例共同組成的一個組,同時消費多個分區(qū)以實現(xiàn)高吞吐。 重平衡:Rebalance。消費者組內(nèi)某個消費者實例掛掉后,其他消費者實例自動重新分配訂閱主題分區(qū)的過程。Rebalance 是 Kafka 消費者端實現(xiàn)高可用的重要手段。代碼演示外部依賴:

在 pom.xml 中添加 Kafka 依賴:

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId></dependency>

由于spring-boot-starter-parent 指定的版本號是2.1.5.RELEASE,spring boot 會對外部框架的版本號統(tǒng)一管理,spring-kafka 引入的版本是 2.2.6.RELEASE

配置文件:

在配置文件 application.yaml 中配置 Kafka 的相關(guān)參數(shù),具體內(nèi)容如下:

Spring: kafka: bootstrap-servers: localhost:9092 producer: retries: 3 # 生產(chǎn)者發(fā)送失敗時,重試次數(shù) batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生產(chǎn)者消息key和消息value的序列化處理類 value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: tomge-consumer-group # 默認消費者group id auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 100 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

對應(yīng)的配置類 org.springframework.boot.autoconfigure.kafka.KafkaProperties,來初始化kafka相關(guān)的bean實例對象,并注冊到spring容器中。

發(fā)送消息:

Spring Boot 作為一款支持快速開發(fā)的集成性框架,同樣提供了一批以 -Template 命名的模板工具類用于實現(xiàn)消息通信。對于 Kafka 而言,這個工具類就是KafkaTemplate。

KafkaTemplate 提供了一系列 send 方法用來發(fā)送消息,典型的 send 方法定義如下代碼所示:

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { 。。。。 省略}

生產(chǎn)端提供了一個restful接口,模擬發(fā)送一條創(chuàng)建新用戶消息。

@GetMapping('/add_user')public Object add() { try {Long id = Long.valueOf(new Random().nextInt(1000));User user = User.builder().id(id).userName('TomGE').age(29).address('上海').build();ListenableFuture<SendResult> listenableFuture = kafkaTemplate.send(addUserTopic, JSON.toJSONString(user));// 提供回調(diào)方法,可以監(jiān)控消息的成功或失敗的后續(xù)處理listenableFuture.addCallback(new ListenableFutureCallback<SendResult>() { @Override public void onFailure(Throwable throwable) {System.out.println('發(fā)送消息失敗,' + throwable.getMessage()); } @Override public void onSuccess(SendResult sendResult) {// 消息發(fā)送到的topicString topic = sendResult.getRecordMetadata().topic();// 消息發(fā)送到的分區(qū)int partition = sendResult.getRecordMetadata().partition();// 消息在分區(qū)內(nèi)的offsetlong offset = sendResult.getRecordMetadata().offset();System.out.println(String.format('發(fā)送消息成功,topc:%s, partition: %s, offset:%s ', topic, partition, offset)); }});return '消息發(fā)送成功'; } catch (Exception e) {e.printStackTrace();return '消息發(fā)送失敗'; }}

實際上開發(fā)使用的Kafka默認允許自動創(chuàng)建Topic,創(chuàng)建Topic時默認的分區(qū)數(shù)量是1,可以通過server.properties文件中的num.partitions=1修改默認分區(qū)數(shù)量。在生產(chǎn)環(huán)境中通常會關(guān)閉自動創(chuàng)建功能,Topic需要由運維人員先創(chuàng)建好。

消費消息:

在 Kafka 中消息通過服務(wù)器推送給各個消費者,而 Kafka 的消費者在消費消息時,需要提供一個監(jiān)聽器(Listener)對某個 Topic 實現(xiàn)監(jiān)聽,從而獲取消息,這也是 Kafka 消費消息的唯一方式。

定義一個消費類,在處理具體消息業(yè)務(wù)邏輯的方法上添加 @KafkaListener 注解,并配置要消費的topic,代碼如下所示:

@Componentpublic class UserConsumer { @KafkaListener(topics = 'add_user') public void receiveMesage(String content) {System.out.println('消費消息:' + content); }}

是不是很簡單,添加kafka依賴、使用KafkaTemplate、@KafkaListener注解就完成消息的生產(chǎn)和消費,其實是SpringBoot在背后默默的做了很多工作,如果感興趣可以研究下spring-boot-autoconfigure ,里面提供了常用開源框架的客戶端實例封裝。

演示工程代碼

https://github.com/aalansehaiyang/spring-boot-bulking

模塊:spring-boot-bulking-kafka

以上就是Spring Boot 集成 Kafkad的示例的詳細內(nèi)容,更多關(guān)于Spring Boot 集成 Kafka的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標簽: Spring
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
亚洲精品在线二区| 深夜日韩欧美| 国产精选久久| 亚洲久草在线| 亚洲综合婷婷| 综合激情在线| 亚洲91在线| 日韩1区2区日韩1区2区| 日本不卡视频在线观看| 日韩高清电影一区| 国产毛片精品| 国际精品欧美精品| 日韩免费久久| 欧美日中文字幕| 99国产精品视频免费观看一公开| 亚洲欧美日韩一区在线观看| 久热综合在线亚洲精品| 免费不卡在线观看| 亚洲精品一级| 国产视频网站一区二区三区| 国产精品美女在线观看直播| 欧美国产极品| 综合日韩av| 91成人精品| 亚洲精品国产精品粉嫩| 欧美日韩一区二区三区在线电影| 欧美激情麻豆| 日韩欧美1区| 国产一区二区高清| 日韩综合一区二区三区| 人人爱人人干婷婷丁香亚洲| 免费亚洲一区| 欧美性感美女一区二区| 欧美性感美女一区二区| 亚洲少妇自拍| 久久精品xxxxx| 国产精品黑丝在线播放| 精品1区2区3区4区| 日韩1区2区3区| 精品一区二区三区视频在线播放| 美女av在线免费看| 久久亚洲电影| 免费视频一区二区三区在线观看| 日韩毛片在线| 中文不卡在线| 激情综合五月| 久久av一区| 久久在线91| 99精品在线观看| 日韩福利视频一区| 日韩亚洲一区在线| 99在线精品免费视频九九视| 日本三级亚洲精品| 久久久久国产精品一区三寸| 中文字幕成人| 日韩精品诱惑一区?区三区| 丝袜美腿成人在线| 精品免费在线| 亚洲一区二区三区无吗| 国产一区二区三区四区| 美女日韩在线中文字幕| 国产中文欧美日韩在线| 美女精品一区| 久久精品国产精品亚洲毛片| 免费不卡中文字幕在线| 久久精品99久久久| 欧美日韩四区| 老司机免费视频一区二区三区| 激情91久久| 国产a亚洲精品| 日本一区二区三区视频在线看| 日韩网站中文字幕| 欧美日韩1区2区3区| 激情欧美一区| 欧美激情亚洲| 免费人成精品欧美精品| 国产精品原创| 91精品国产自产在线丝袜啪| 国产综合色区在线观看| 青草综合视频| 99国产成+人+综合+亚洲欧美| 韩国女主播一区二区三区| 无码日韩精品一区二区免费| 久久久人人人| 精品在线网站观看| 日本亚洲三级在线| 五月天久久久| 中文字幕在线官网| 国产精品hd| 日韩一区免费| 黄色日韩在线| 久久三级福利| 精品一区二区三区免费看| 三级亚洲高清视频| 久久国产小视频| 国产精品精品| 美女久久久久久| 日韩av中文字幕一区二区| 亚洲少妇自拍| 伊人精品一区| 欧美久久天堂| 精品美女在线视频| 日本91福利区| 日韩欧美精品一区二区综合视频| 99视频精品| 99在线精品视频在线观看| 久久亚洲成人| 日韩精品看片| 精品国产aⅴ| 国产精品22p| 日本国产一区| 午夜天堂精品久久久久| 热久久久久久久| 蜜桃av一区| 久久亚洲风情| 免费在线观看成人| 日韩精品一级二级| 爽爽淫人综合网网站| 婷婷久久一区| 偷拍欧美精品| 99香蕉国产精品偷在线观看| 一级欧洲+日本+国产| 欧美午夜不卡影院在线观看完整版免费| 伊人久久在线| 丝袜美腿诱惑一区二区三区| 国产精品高颜值在线观看| 国产一区二区三区精品在线观看| 欧美交a欧美精品喷水| 久久99偷拍| 久久精品资源| 国产黄大片在线观看| 亚洲欧洲高清| 福利一区二区三区视频在线观看| 精品国产乱码久久久久久1区2匹| 精品久久在线| 欧美天堂视频| 久久精品国产99久久| 黄色国产精品| 午夜在线精品偷拍| 日韩精品一二三区| 日韩av中文字幕一区二区三区| 欧美伊人久久| 另类欧美日韩国产在线| 成人台湾亚洲精品一区二区| 日韩国产激情| 欧美久久精品一级c片| 夜久久久久久| 日韩免费精品| 美女av一区| 亚洲成人精品| 中文国产一区| 日韩激情一二三区| 欧美1区2区3| 97精品国产一区二区三区| 正在播放日韩精品| 欧美日韩国产传媒| 亚洲一区欧美| 国产精品一区二区三区四区在线观看| 美腿丝袜亚洲一区| 久久天堂av| 免费看欧美美女黄的网站| 91精品一区| 日韩中文欧美| 人人精品人人爱| 精品欠久久久中文字幕加勒比| 91精品亚洲| 涩涩涩久久久成人精品| 国内揄拍国内精品久久| 91精品在线观看国产| 在线一区二区三区视频| 国产九九精品| 久久久久.com| 日韩avvvv在线播放| av资源中文在线天堂| 国产视频久久| 国产精品巨作av| 99视频精品全部免费在线视频| 亚洲深深色噜噜狠狠爱网站 | 欧美日韩一区二区综合| 蜜臀久久久99精品久久久久久| 国产精品夜夜夜| 日韩大片在线| 日本一区二区三区视频在线看| 精品中文字幕一区二区三区四区| 欧美日韩国产高清电影| 欧美色综合网| 99成人在线视频| 日韩av一区二区三区四区| 播放一区二区| 欧美一区激情| 欧美日韩精品一区二区视频| 日本天堂一区| 免费av一区二区三区四区| 久久国产视频网| 午夜久久影院| 国产成人在线中文字幕| 亚洲成人av观看| 国产福利一区二区三区在线播放| 亚洲精品一二三区区别| 欧美国产不卡|