日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Java kafka如何實現自定義分區類和攔截器

瀏覽:25日期:2022-08-31 13:14:07

生產者發送到對應的分區有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對應的java api, 有多種參數)

(2)未指定patition但指定key,通過對key的value進行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

但是kafka提供了,自定義分區算法的功能,由業務手動實現分布:

1、實現一個自定義分區類,CustomPartitioner實現Partitioner

import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner { /** * * @param topic 當前的發送的topic * @param key 當前的key值 * @param keyBytes 當前的key的字節數組 * @param value 當前的value值 * @param valueBytes 當前的value的字節數組 * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //這邊根據返回值就是分區號, 這邊就是固定發送到三號分區 return 3; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}

2、producer配置文件指定,具體的分區類

// 具體的分區類props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner');

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;import java.util.UUID;public class MessageInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { System.out.println('這是MessageInterceptor的configure方法'); } /** * 這個是消息發送之前進行處理 * * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創建一個新的record,把uuid入消息體的最前部 System.out.println('為消息添加uuid'); return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),UUID.randomUUID().toString().replace('-', '') + ',' + record.value()); } /** * 這個是生產者回調函數調用之前處理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println('MessageInterceptor攔截器的onAcknowledgement方法'); } @Override public void close() { System.out.println('MessageInterceptor close 方法'); }}

2、定義計數攔截器

import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { System.out.println('這是CounterInterceptor的configure方法'); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println('CounterInterceptor計數過濾器不對消息做任何操作'); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統計成功和失敗的次數 System.out.println('CounterInterceptor過濾器執行統計失敗和成功數量'); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結果 System.out.println('Successful sent: ' + successCounter); System.out.println('Failed sent: ' + errorCounter); }}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class Producer1 { public static void main(String[] args) throws Exception { Properties props = new Properties(); // Kafka服務端的主機名和端口號 props.put('bootstrap.servers', 'localhost:9092'); // 等待所有副本節點的應答 props.put('acks', 'all'); // 消息發送最大嘗試次數 props.put('retries', 0); // 一批消息處理大小 props.put('batch.size', 16384); // 請求延時,可能生產數據太快了 props.put('linger.ms', 1); // 發送緩存區內存大小,數據是先放到生產者的緩沖區 props.put('buffer.memory', 33554432); // key序列化 props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // value序列化 props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // 具體的分區類 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner'); //定義攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add('kafka.MessageInterceptor'); interceptors.add('kafka.CounterInterceptor'); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new ProducerRecord<String, String>('test_0515', i + '', 'xxx-' + i), new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println('這是producer回調函數');} }); } /*System.out.println('現在執行關閉producer'); producer.close();*/ producer.close(); }}

總結,我們可以知道攔截器鏈各個方法的執行順序,假如有A、B攔截器,在一個攔截器鏈中:

(1)執行A的configure方法,執行B的configure方法

(2)執行A的onSend方法,B的onSend方法

(3)生產者發送完畢后,執行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執行producer自身的callback回調函數。

(5)執行A的close方法,B的close方法。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Java
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
日本亚州欧洲精品不卡| 久久人人99| 欧美日韩激情在线一区二区三区| 国产亚洲字幕| 欧美日韩xxxx| 国产精品v一区二区三区| 国产美女精品视频免费播放软件| 欧美一区影院| 国产精品v一区二区三区| 精品一区二区三区的国产在线观看| 国产欧美另类| 伊人久久视频| 久久青草久久| 激情欧美日韩一区| 久久亚洲美女| 日韩动漫一区| 国产在线观看www| 中文在线а√天堂| 好吊日精品视频| 日韩一区二区三免费高清在线观看| 欧美激情麻豆| 伊人久久大香线蕉av超碰演员| 日韩欧乱色一区二区三区在线| 国产剧情一区| 欧美精品一二| 国产精品chinese| 久久精品免费一区二区三区 | 亚洲麻豆一区| 国产aⅴ精品一区二区三区久久| 精品免费av在线| 日韩精品1区2区3区| av日韩中文| 日本一区二区三区中文字幕| 国产suv精品一区二区四区视频| 不卡av一区二区| 日韩精品一卡二卡三卡四卡无卡| 精品三级久久久| 日韩在线成人| 红桃视频国产精品| 国产精品欧美日韩一区| 在线亚洲自拍| 欧美freesex黑人又粗又大| 亚洲不卡视频| 激情欧美国产欧美| 国产一区国产二区国产三区 | 国产精品久久国产愉拍| 激情视频一区二区三区| 精品三级久久久| 日韩欧美高清一区二区三区| 日韩国产成人精品| 蜜臀国产一区二区三区在线播放| 日韩中文影院| 国产一区二区三区四区大秀 | 老司机精品视频在线播放| 99综合视频| 国产精品88久久久久久| 深夜视频一区二区| 最新中文字幕在线播放| 国产精品嫩草影院在线看| 日韩欧美中文字幕电影| 一区二区精品| 亚洲伊人影院| 蜜桃久久久久久久| 一区二区亚洲视频| 欧美久久香蕉| 日本国产欧美| 日本亚州欧洲精品不卡| 亚洲18在线| 亚洲va久久久噜噜噜久久| 日韩中文一区二区| 久久国内精品视频| 另类小说一区二区三区| 久久这里只有| 91精品蜜臀一区二区三区在线| 99精品美女| 亚洲少妇自拍| 亚洲精品护士| 国产精品22p| 亚洲天堂av影院| 亚洲国产日韩欧美在线| 狠狠久久婷婷| 亚洲麻豆一区| 精品视频在线一区二区在线| 四虎4545www国产精品 | 蜜桃伊人久久| 国产精品亚洲欧美一级在线| 国产精品xx| 伊人久久婷婷| 欧美一区二区三区久久精品| 都市激情国产精品| 视频一区二区国产| 麻豆久久久久久| 夜夜精品视频| 国产成人精品亚洲线观看| 91久久国产| 国产精品午夜一区二区三区| 亚洲综合电影| 久久国产视频网| 99成人在线视频| 国产精品尤物| 另类av一区二区| 成人在线丰满少妇av| 日韩三区四区| 日韩精品第一区| 亚洲一卡久久| 久久久久久黄| 麻豆久久久久久| 亚洲精品乱码久久久久久蜜桃麻豆| 国产videos久久| 国产欧美视频在线| 一区二区电影| 亚洲特色特黄| 理论片午夜视频在线观看| 日韩精品欧美成人高清一区二区| 欧美日韩精品一区二区视频| 精品一区二区三区中文字幕 | 日韩欧美四区| 红桃视频国产一区| 成人啊v在线| 日韩av免费大片| 欧美黑人做爰爽爽爽| 蜜桃视频在线观看一区| 午夜精品婷婷| 天堂久久一区| 日本va欧美va精品| 丝袜a∨在线一区二区三区不卡| 国产乱码午夜在线视频| 日韩精品免费观看视频| 久热精品在线| 亚洲精品看片| 日本视频在线一区| 日韩高清不卡一区二区| 日韩av字幕| 久久影院一区二区三区| 欧美国产极品| 成人国产精品久久| аⅴ资源天堂资源库在线| 97精品一区| 久久久久99| 99视频精品| 日本不卡一二三区黄网| 久久精品 人人爱| 国产精品免费不| 精品精品国产三级a∨在线| 91亚洲一区| 亚洲欧洲另类| 日韩av中文字幕一区二区| 国产精品一区二区免费福利视频| 国产欧美69| 亚洲精品88| 红桃视频国产精品| 日韩精品视频网| 日产精品一区二区| 不卡中文字幕| 久久激情五月激情| 国产精品久久久久av电视剧| 免费观看不卡av| 婷婷精品久久久久久久久久不卡| 国产精品久久久久久妇女| 日韩成人精品一区二区| 六月婷婷一区| av中文字幕在线观看第一页| 一区在线观看| 国产一区调教| 在线国产精品一区| 精品中文字幕一区二区三区| 国产亚洲亚洲| 高清久久精品| 偷拍亚洲精品| 日韩av免费| 久久99久久久精品欧美| 亚洲欧美高清| 99国产精品免费视频观看| 日韩欧美精品一区二区综合视频| 久久久人人人| 国产精品v日韩精品v欧美精品网站 | 日韩黄色在线观看| 亚洲精品国产嫩草在线观看 | 精品国产美女a久久9999| 婷婷精品在线观看| 久久亚洲专区| 色欧美自拍视频| 日本不卡高清| 免费在线看一区| 婷婷激情图片久久| 日韩深夜视频| 国产一区二区三区不卡av| 日本va欧美va欧美va精品| 久久成人亚洲| 国产国产精品| 欧美一级精品| 成人羞羞在线观看网站| 精品三区视频| 国内在线观看一区二区三区| 国产极品嫩模在线观看91精品| 日韩精品社区| 日本亚洲视频在线| 久久av一区| 美女尤物久久精品| 免费日韩视频|