日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Kafka Java Producer代碼實例詳解

瀏覽:162日期:2022-08-31 15:45:44

根據業務需要可以使用Kafka提供的Java Producer API進行產生數據,并將產生的數據發送到Kafka對應Topic的對應分區中,入口類為:Producer

Kafka的Producer API主要提供下列三個方法:

public void send(KeyedMessage<K,V> message) 發送單條數據到Kafka集群 public void send(List<KeyedMessage<K,V>> messages) 發送多條數據(數據集)到Kafka集群 public void close() 關閉Kafka連接資源

一、JavaKafkaProducerPartitioner:自定義的數據分區器,功能是:決定輸入的key/value鍵值對的message發送到Topic的那個分區中,返回分區id,范圍:[0,分區數量); 這里的實現比較簡單,根據key中的數字決定分區的值。具體代碼如下:

import kafka.producer.Partitioner;import kafka.utils.VerifiableProperties;/** * Created by gerry on 12/21. */public class JavaKafkaProducerPartitioner implements Partitioner { /** * 無參構造函數 */ public JavaKafkaProducerPartitioner() { this(new VerifiableProperties()); } /** * 構造函數,必須給定 * * @param properties 上下文 */ public JavaKafkaProducerPartitioner(VerifiableProperties properties) { // nothings } @Override public int partition(Object key, int numPartitions) { int num = Integer.valueOf(((String) key).replaceAll('key_', '').trim()); return num % numPartitions; }}

二、 JavaKafkaProducer:通過Kafka提供的API進行數據產生操作的測試類;具體代碼如下:

import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;import org.apache.log4j.Logger;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.ThreadLocalRandom;/** * Created by gerry on 12/21. */public class JavaKafkaProducer { private Logger logger = Logger.getLogger(JavaKafkaProducer.class); public static final String TOPIC_NAME = 'test'; public static final char[] charts = 'qazwsxedcrfvtgbyhnujmikolp1234567890'.toCharArray(); public static final int chartsLength = charts.length; public static void main(String[] args) { String brokerList = '192.168.187.149:9092'; brokerList = '192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095'; brokerList = '192.168.187.146:9092'; Properties props = new Properties(); props.put('metadata.broker.list', brokerList); /** * 0表示不等待結果返回<br/> * 1表示等待至少有一個服務器返回數據接收標識<br/> * -1表示必須接收到所有的服務器返回標識,及同步寫入<br/> * */ props.put('request.required.acks', '0'); /** * 內部發送數據是異步還是同步 * sync:同步, 默認 * async:異步 */ props.put('producer.type', 'async'); /** * 設置序列化的類 * 可選:kafka.serializer.StringEncoder * 默認:kafka.serializer.DefaultEncoder */ props.put('serializer.class', 'kafka.serializer.StringEncoder'); /** * 設置分區類 * 根據key進行數據分區 * 默認是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進行分區 * 可選:kafka.serializer.ByteArrayPartitioner ==> 轉換為字節數組后進行hash分區 */ props.put('partitioner.class', 'JavaKafkaProducerPartitioner'); // 重試次數 props.put('message.send.max.retries', '3'); // 異步提交的時候(async),并發提交的記錄數 props.put('batch.num.messages', '200'); // 設置緩沖區大小,默認10KB props.put('send.buffer.bytes', '102400'); // 2. 構建Kafka Producer Configuration上下文 ProducerConfig config = new ProducerConfig(props); // 3. 構建Producer對象 final Producer<String, String> producer = new Producer<String, String>(config); // 4. 發送數據到服務器,并發線程發送 final AtomicBoolean flag = new AtomicBoolean(true); int numThreads = 50; ExecutorService pool = Executors.newFixedThreadPool(numThreads); for (int i = 0; i < 5; i++) { pool.submit(new Thread(new Runnable() {@Overridepublic void run() { while (flag.get()) { // 發送數據 KeyedMessage message = generateKeyedMessage(); producer.send(message); System.out.println('發送數據:' + message); // 休眠一下 try { int least = 10; int bound = 100; Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound)); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ' shutdown....');} }, 'Thread-' + i)); } // 5. 等待執行完成 long sleepMillis = 600000; try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } flag.set(false); // 6. 關閉資源 pool.shutdown(); try { pool.awaitTermination(6, TimeUnit.SECONDS); } catch (InterruptedException e) { } finally { producer.close(); // 最后之后調用 } } /** * 產生一個消息 * * @return */ private static KeyedMessage<String, String> generateKeyedMessage() { String key = 'key_' + ThreadLocalRandom.current().nextInt(10, 99); StringBuilder sb = new StringBuilder(); int num = ThreadLocalRandom.current().nextInt(1, 5); for (int i = 0; i < num; i++) { sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(' '); } String message = sb.toString().trim(); return new KeyedMessage(TOPIC_NAME, key, message); } /** * 產生一個給定長度的字符串 * * @param numItems * @return */ private static String generateStringMessage(int numItems) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < numItems; i++) { sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]); } return sb.toString(); }}

三、Pom.xml依賴配置如下

<properties> <kafka.version>0.8.2.1</kafka.version></properties><dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>${kafka.version}</version> </dependency></dependencies>

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Java
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
日本 国产 欧美色综合| 国产精品蜜芽在线观看| 捆绑调教日本一区二区三区| 日本欧美一区| 亚洲字幕久久| 免费在线看一区| 水蜜桃久久夜色精品一区的特点| 欧美亚洲在线日韩| 欧美一区二区三区激情视频| 黑人精品一区| 日韩精品1区| 国产免费久久| 国产精品永久| 国产乱码精品一区二区三区四区| 日本a级不卡| 国产精品igao视频网网址不卡日韩 | 国产suv精品一区| 国产欧美另类| 国产亚洲欧美日韩精品一区二区三区| 日韩高清电影一区| 亚洲另类黄色| 国产三级一区| 精品久久精品| 日本美女一区| 在线看片不卡| 亚洲另类黄色| 国产欧美日韩在线一区二区 | 国产精品普通话对白| 日韩亚洲在线| 在线免费观看亚洲| 中文视频一区| 国产日韩欧美一区在线| 国产精品欧美大片| 国产毛片久久久| 国产欧美日韩在线一区二区| 精品亚洲成人| 欧美日韩一二| 亚洲久久视频| 国产精品成人自拍| 亚洲精品在线影院| 亚洲专区在线| 欧美一区久久| 97精品一区| 欧美精品羞羞答答| 日本a级不卡| 亚洲欧洲美洲av| 中文字幕中文字幕精品| 国产精品一区二区免费福利视频| 日韩精品乱码av一区二区| 国产午夜精品一区在线观看| 成人亚洲欧美| 色8久久久久| 青青青免费在线视频| 男人的天堂亚洲一区| 国产欧美在线观看免费| 国产 日韩 欧美一区| 蜜臀av亚洲一区中文字幕| 麻豆精品国产91久久久久久| 激情丁香综合| 国产探花在线精品| 在线一区视频观看| 亚洲18在线| 国产精品欧美三级在线观看| 精品视频99| 极品日韩av| 国产精品免费大片| 亚洲电影在线一区二区三区| 青青草视频一区| 激情久久久久久| 国产精品mm| 免费在线观看日韩欧美| av资源新版天堂在线| 日韩一区二区三区精品视频第3页 日韩一区二区三区免费视频 | 国产一区二区三区天码| 91超碰国产精品| 国产欧美午夜| 黄色在线一区| 高清av一区| 日本精品另类| 99热精品在线观看| 黑人精品一区| 国产精品视频一区二区三区综合| 亚洲女同中文字幕| 欧美激情网址| 亚洲人妖在线| 欧美日韩国产在线观看网站| 激情综合五月| 青青青国产精品| 欧美中文一区二区| 久久一区精品| 91精品丝袜国产高跟在线| 午夜国产精品视频| 日产精品一区| 高清一区二区| 麻豆国产欧美日韩综合精品二区| 在线观看视频免费一区二区三区| 91精品一区国产高清在线gif| 久久av导航| 日韩av中文字幕一区二区 | 国产va免费精品观看精品视频| 91精品日本| 日韩一区二区三区四区五区| 亚洲电影有码| 欧美精品高清| 中文字幕一区久| av免费不卡国产观看| 老司机免费视频一区二区| 日韩av中文字幕一区二区| 免费日韩视频| 午夜欧美精品久久久久久久| 日韩欧美1区| 久久97视频| 国产精品白丝久久av网站| 亚洲ab电影| 亚洲精品乱码日韩| 日韩国产欧美在线视频| 亚洲精品系列| 久久国产高清| 免费国产亚洲视频| 免费成人性网站| 免费视频一区二区| 日韩中文字幕av电影| 蜜桃av一区二区三区电影| 美国三级日本三级久久99| 伊人久久大香伊蕉在人线观看热v| 香蕉视频成人在线观看| 亚洲在线久久| 7777精品| 精品一区二区三区视频在线播放| 精品国产亚洲一区二区三区大结局| 久久影院资源站| 国产一区精品福利| 精品久久久亚洲| 日韩欧美二区| 久久亚洲国产| 91亚洲成人| 国产成人精品999在线观看| 国精品产品一区| 日韩网站中文字幕| 红桃视频国产精品| 亚洲免费观看高清完整版在线观| 日韩1区2区3区| 国产日韩三级| 国产精品毛片一区二区在线看| 青青青免费在线视频| 亚洲手机视频| 亚洲不卡视频| 国产日韩欧美一区在线| 欧美成人精品一级| 高清在线一区| 视频福利一区| 日韩精品一卡二卡三卡四卡无卡 | 国产麻豆久久| 石原莉奈在线亚洲二区| 国产亚洲字幕| 韩国久久久久久| 红桃视频欧美| 免费人成黄页网站在线一区二区| 日韩不卡在线观看日韩不卡视频| 国产99在线| 久久免费国产| 亚洲综合欧美| 免费在线欧美黄色| 激情综合亚洲| 国产欧美日韩在线一区二区 | 国产成人免费视频网站视频社区| 中文字幕免费一区二区| 久久精品国产www456c0m| 精品三级久久久| 欧美日韩亚洲一区二区三区在线| 黄色日韩在线| 欧美丝袜一区| 9999国产精品| 国产精品伦一区二区| 三级亚洲高清视频| 精品丝袜在线| 在线看片国产福利你懂的| 国产欧美大片| 青青草伊人久久| 伊人www22综合色| 影音国产精品| 欧美日韩水蜜桃| 日本美女一区| 中文在线а√天堂| 国产一区二区三区日韩精品| 国产午夜一区| 日韩国产精品久久久| 最新日韩欧美| 欧美理论视频| 欧美精品一区二区久久| 岛国av在线网站| 日本欧美国产| 欧美国产偷国产精品三区| 国产一区二区三区四区五区 | 亚洲一区黄色| 午夜日韩在线| 亚洲精品网址| 国产视频一区欧美| 久久av一区二区三区| 丝袜美腿一区二区三区|