日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Java實現Kafka生產者和消費者的示例

瀏覽:207日期:2022-08-17 10:04:38
Kafka簡介

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平臺。

Java實現Kafka生產者和消費者的示例

方式一:kafka-clients

引入依賴

在pom.xml文件中,引入kafka-clients依賴:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version></dependency>

生產者

創建一個KafkaProducer的生產者實例:

@Configurationpublic class Config { public final static String bootstrapServers = '127.0.0.1:9092'; @Bean(destroyMethod = 'close') public KafkaProducer<String, String> kafkaProducer() { Properties props = new Properties(); //設置Kafka服務器地址 props.put('bootstrap.servers', bootstrapServers); //設置數據key的序列化處理類 props.put('key.serializer', StringSerializer.class.getName()); //設置數據value的序列化處理類 props.put('value.serializer', StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props); return producer; }}

在Controller中進行使用:

@RestController@Slf4jpublic class Controller { @Autowired private KafkaProducer<String, String> kafkaProducer; @RequestMapping('/kafkaClientsSend') public String send() { String uuid = UUID.randomUUID().toString(); RecordMetadata recordMetadata = null; try { //將消息發送到Kafka服務器的名稱為“one-more-topic”的Topic中 recordMetadata = kafkaProducer.send(new ProducerRecord<>('one-more-topic', uuid)).get(); log.info('recordMetadata: {}', recordMetadata); log.info('uuid: {}', uuid); } catch (Exception e) { log.error('send fail, uuid: {}', uuid, e); } return uuid; }}

消費者

創建一個KafkaConsumer的消費者實例:

@Configurationpublic class Config { public final static String groupId = 'kafka-clients-group'; public final static String bootstrapServers = '127.0.0.1:9092'; @Bean(destroyMethod = 'close') public KafkaConsumer<String, String> kafkaConsumer() { Properties props = new Properties(); //設置Kafka服務器地址 props.put('bootstrap.servers', bootstrapServers); //設置消費組 props.put('group.id', groupId); //設置數據key的反序列化處理類 props.put('key.deserializer', StringDeserializer.class.getName()); //設置數據value的反序列化處理類 props.put('value.deserializer', StringDeserializer.class.getName()); props.put('enable.auto.commit', 'true'); props.put('auto.commit.interval.ms', '1000'); props.put('session.timeout.ms', '30000'); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); //訂閱名稱為“one-more-topic”的Topic的消息 kafkaConsumer.subscribe(Arrays.asList('one-more-topic')); return kafkaConsumer; }}

在Controller中進行使用:

@RestController@Slf4jpublic class Controller { @Autowired private KafkaConsumer<String, String> kafkaConsumer; @RequestMapping('/receive') public List<String> receive() { 從Kafka服務器中的名稱為“one-more-topic”的Topic中消費消息 ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1)); List<String> messages = new ArrayList<>(records.count()); for (ConsumerRecord<String, String> record : records.records('one-more-topic')) { String message = record.value(); log.info('message: {}', message); messages.add(message); } return messages; }}方式二:spring-kafka

使用kafka-clients需要我們自己創建生產者或者消費者的bean,如果我們的項目基于SpringBoot構建,那么使用spring-kafka就方便多了。

引入依賴

在pom.xml文件中,引入spring-kafka依賴:

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.12.RELEASE</version></dependency>

生產者

在application.yml文件中增加配置:

spring: kafka: #Kafka服務器地址 bootstrap-servers: 127.0.0.1:9092 producer: #設置數據value的序列化處理類 value-serializer: org.apache.kafka.common.serialization.StringSerializer

在Controller中注入KafkaTemplate就可以直接使用了,代碼如下:

@RestController@Slf4jpublic class Controller { @Autowired private KafkaTemplate<String, String> template; @RequestMapping('/springKafkaSend') public String send() { String uuid = UUID.randomUUID().toString(); //將消息發送到Kafka服務器的名稱為“one-more-topic”的Topic中 this.template.send('one-more-topic', uuid); log.info('uuid: {}', uuid); return uuid; }}

消費者

在application.yml文件中增加配置:

spring: kafka: #Kafka服務器地址 bootstrap-servers: 127.0.0.1:9092 consumer: #設置數據value的反序列化處理類 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

創建一個可以被Spring框架掃描到的類,并且在方法上加上@KafkaListener注解,就可以消費消息了,代碼如下:

@Component@Slf4jpublic class Receiver { @KafkaListener(topics = 'one-more-topic', groupId = 'spring-kafka-group') public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { String message = (String) kafkaMessage.get(); log.info('message: {}', message); } }}

到此這篇關于Java實現Kafka生產者和消費者的示例的文章就介紹到這了,更多相關Java Kafka生產者和消費者 內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Java
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
欧美精品福利| 欧美日韩四区| 日韩在线黄色| 伊人久久亚洲影院| 久久国产福利| 伊人久久亚洲| 天堂久久av| 日本午夜精品一区二区三区电影 | 超碰超碰人人人人精品| 久久精品国产福利| 国产精品99久久精品| 国产精品久久久久蜜臀| 伊人久久高清| 久久精品国产68国产精品亚洲| 欧美理论视频| 9久re热视频在线精品| 免费看黄色91| 欧美片网站免费| 久久免费精品| 亚洲精品一级二级| 一区在线免费观看| 综合色一区二区| 国产精品多人| 中文一区一区三区高中清不卡免费| 欧美特黄一级大片| 亚洲欧洲国产精品一区| 国产精品一区二区精品视频观看| 国产一区二区三区四区大秀| 中文在线а√天堂| 黄色日韩在线| 欧美亚洲二区| 伊伊综合在线| 麻豆91精品| 国产欧美亚洲精品a| 亚洲黄色中文字幕| 亚洲中午字幕| 欧美国产另类| 宅男在线一区| 91精品尤物| 精品亚洲美女网站| 亚洲精品在线a| 精品国产亚洲一区二区在线观看| 成人在线视频中文字幕| 欧美午夜精品一区二区三区电影| 亚洲深夜福利在线观看| 麻豆一区二区在线| 欧美/亚洲一区| 国产日韩亚洲| 久久一区二区三区喷水| 日韩高清一区二区| av综合电影网站| 中文字幕av一区二区三区四区| 国产精品日本一区二区不卡视频| 久久青草久久| 91福利精品在线观看| 麻豆精品蜜桃| 欧美午夜三级| 欧美在线亚洲| 国产精品成人一区二区网站软件| 蜜桃成人av| 国产精品久久久久久久久久齐齐| 国产一区清纯| 欧美极品一区二区三区| 99视频精品| 激情久久一区二区| 亚洲欧洲美洲国产香蕉| 蜜桃成人精品| 国产无遮挡裸体免费久久| 精品在线99| 欧美激情五月| 综合激情五月婷婷| 99精品在线观看| 久久av偷拍| 三级亚洲高清视频| 国产一区二区三区亚洲| 亚洲精品乱码久久久久久蜜桃麻豆| av资源中文在线| 国产视频网站一区二区三区| 亚洲一区区二区| 精品欧美一区二区三区在线观看| 国产日韩欧美一区| 伊人精品久久| 激情久久婷婷| 国产一区二区久久久久| 91在线成人| 久久性天堂网| 99视频精品全部免费在线视频| 欧美激情日韩| 亚洲免费资源| 欧美日韩精品免费观看视频完整| 日本一区二区高清不卡| 国产精品永久| 日本中文字幕不卡| 午夜一级久久| 不卡在线一区二区| 日韩伦理在线一区| 麻豆免费精品视频| 久久激情五月激情| 在线视频亚洲欧美中文| 国产精品av久久久久久麻豆网| 精品国产成人| 欧美成a人片免费观看久久五月天| 综合欧美精品| 亚洲欧美日韩综合国产aⅴ| 久久久久午夜电影| 最近高清中文在线字幕在线观看1| 久久国产精品美女| 国产乱论精品| 青草综合视频| 日韩av中文字幕一区二区三区| 久久亚洲国产精品一区二区| 宅男在线一区| 欧美精品一区二区久久| 亚洲午夜在线| 激情综合亚洲| 久久人人精品| 欧美日一区二区| 日本久久成人网| 免费观看亚洲| av资源中文在线天堂| 成人午夜在线| av资源中文在线| 秋霞影院一区二区三区| 久久精品国产大片免费观看| 亚洲a一区二区三区| 亚洲大片在线| 夜夜嗨网站十八久久| 国产农村妇女精品一区二区| 午夜亚洲福利在线老司机| 麻豆9191精品国产| 蜜桃久久av一区| 亚洲精品黄色| 青青草国产成人99久久| 欧美日韩va| 麻豆精品视频在线观看视频| 黄色精品视频| 韩国三级一区| 国产高清一区二区| 男人操女人的视频在线观看欧美| 在线精品一区二区| 日韩av一区二| 精品国产中文字幕第一页| 国产精品高颜值在线观看| 色婷婷久久久| 亚洲一区激情| 欧美日韩一区二区三区在线电影| 欧美激情一区| 色乱码一区二区三区网站| 天堂√8在线中文| 激情视频一区二区三区| 美国三级日本三级久久99| 欧美日韩伊人| 国产激情在线播放| 欧美日韩国产欧| 日韩精品一区二区三区中文在线| 国产乱子精品一区二区在线观看| 成人国产精品一区二区网站| 日韩高清中文字幕一区二区| 亚洲欧美日韩一区在线观看| 国产亚洲一区| 成人羞羞视频播放网站| 久久亚洲风情| 国产精品密蕾丝视频下载| 日韩88av| 黄色亚洲免费| 国产欧美日韩精品一区二区免费| 91中文字幕精品永久在线| 99国产精品视频免费观看一公开| 日本免费一区二区视频| 成人在线视频区| 国产亚洲福利| 久久99精品久久久野外观看| 日韩不卡在线| 五月激激激综合网色播| 久久精品日韩欧美| 99视频精品| 久久99久久人婷婷精品综合| 99久久夜色精品国产亚洲狼 | 久久激情婷婷| 免费视频最近日韩| 精品国产亚洲日本| 黄色在线一区| 麻豆精品视频在线观看| 一区在线观看| 国产精品videossex久久发布 | 在线天堂中文资源最新版| 乱人伦精品视频在线观看| 精品一区电影| 三级欧美在线一区| 日韩.com| 日韩精品福利一区二区三区| 日韩欧美字幕| 国产日韩一区二区三免费高清| 免费观看久久av| 久久免费影院| 日韩在线黄色| 欧美日韩国产亚洲一区| 精品免费视频| 亚洲欧美在线综合| 久久精品青草|