日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

Java操作Kafka執(zhí)行不成功

瀏覽:153日期:2023-12-29 14:49:40

問題描述

使用kafka-clients操作kafka始終不成功,原因不清楚,下面貼出相關(guān)代碼及配置,請懂得指點一下,謝謝!

環(huán)境及依賴

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version></dependency>

JDK版本為1.8、Kafka版本為2.12-0.10.2.0,服務(wù)器使用CentOS-7構(gòu)建。

測試代碼

TestBase.java

public class TestBase { protected Logger log = LoggerFactory.getLogger(this.getClass()); protected String kafka_server = '192.168.60.160:9092' ; protected String topic = 'zlikun_topic';}

ProducerTest.java

public class ProducerTest extends TestBase { protected Properties props = new Properties(); @Before public void init() {props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);props.put(ProducerConfig.ACKS_CONFIG, 'all');props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ; } @Test public void test() throws InterruptedException {KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 發(fā)送消息for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) {System.out.printf('offset = %d ,partition = %d n', recordMetadata.offset() ,recordMetadata.partition()); } else {log.error('send error !' ,e); }} });}TimeUnit.SECONDS.sleep(3);producer.close(); }}

ConsumerTest.java

public class ConsumerTest extends TestBase { private Properties props = new Properties(); @Before public void init() {props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);props.put(ConsumerConfig.GROUP_ID_CONFIG ,'zlikun') ;props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 'true');props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, '1000');props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } @Test public void test() {Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));//consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) {System.out.printf('offset = %d, key = %s, value = %s%n', record.offset(), record.key(), record.value()); }} }}問題

# 測試topic為手動創(chuàng)建$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

控制臺輸出信息

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time

問題解答

回答1:

測試了下, 正常 https://github.com/MOBX/kafka...

建議檢查下kafka集群連接是否正常,你報的是TimeoutException;如果不行, kafka-clients降到0.8.2.0試試

回答2:

我把日志調(diào)成DEBUG級別,觀察日志發(fā)現(xiàn)是不能正確解析主機名造成的。

2017-04-11 13:49:46.046 [main] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 0 at m160:9092:java.io.IOException: Can’t resolve address: m160:9092 at org.apache.kafka.common.network.Selector.connect(Selector.java:182) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629) at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:57) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at com.zlikun.mq.ConsumerTest.test(ConsumerTest.java:34) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:107) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:649) at org.apache.kafka.common.network.Selector.connect(Selector.java:179) ... 36 more

網(wǎng)上找到一篇博文http://blog.sina.com.cn/s/blo...也支持了這一點,同樣我是在hosts文件中配置了主機名,測試就正常了。不過感覺這樣做似乎不太合理,實際應(yīng)用中這樣用,太影響運維了吧,不知道有沒有其它更好的解決辦法。

[2017/04/11 16:16]剛從網(wǎng)上找到一篇文章http://www.tuicool.com/articl...,解決了這個問題!

標簽: java
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
91九色综合| 国产va免费精品观看精品视频| 久久99蜜桃| 欧美日韩一区自拍| 日韩成人午夜精品| 97se亚洲| 久久成人福利| 麻豆成人91精品二区三区| 欧美日韩中出| 国产黄色一区| 高清日韩中文字幕| av中文字幕在线观看第一页| 久久久久久久欧美精品| 国产亚洲精品v| 国产视频一区三区| 日韩精品一卡二卡三卡四卡无卡| 最新日韩欧美| 蜜臀久久久久久久| 日韩免费精品| 国产欧美日韩视频在线| 久久精品国产福利| 成人福利av| 99国产精品久久久久久久| 欧美中文一区二区| 夜夜精品视频| 视频一区日韩精品| 久久不见久久见中文字幕免费| 国产中文在线播放| 日韩午夜av| 四虎在线精品| 精品欧美日韩精品| 激情丁香综合| 无码日韩精品一区二区免费| 久久狠狠久久| 亚洲伦乱视频| 只有精品亚洲| 精品中文字幕一区二区三区四区| 99精品网站| 亚洲一区欧美| 国精品产品一区| 中文欧美日韩| 久久这里只有精品一区二区| 欧美成人亚洲| 国产欧美日韩一区二区三区在线| 欧美aa在线观看| 亚洲日本欧美| 三上亚洲一区二区| 免费成人在线视频观看| 久久wwww| 午夜欧美视频| 欧美91在线| 在线视频亚洲| 久久伊人亚洲| 在线精品视频在线观看高清| 日韩精品一区二区三区中文在线| 精品国产精品国产偷麻豆| 在线日韩中文| 国产美女亚洲精品7777| 欧美日韩一区二区综合 | 国产不卡精品| 久久亚洲二区| 国产资源在线观看入口av| 天堂成人国产精品一区| 久久精品国产久精国产| 日韩一区精品视频| 成人片免费看| 亚洲免费资源| 91精品一区二区三区综合| 91福利精品在线观看| 免费视频亚洲| 精品午夜久久| 欧美久久亚洲| 亚洲欧美高清| 亚洲精品国产嫩草在线观看 | 日韩中文字幕| 欧美日韩色图| 老司机精品视频在线播放| 日韩在线播放一区二区| 欧美精选视频一区二区| 久久精品999| 亚洲自拍另类| 99精品在线| 日本一区二区免费高清| 欧美一区二区三区免费看| 蜜桃成人av| 中文av在线全新| 欧美激情一区| 91亚洲无吗| 蜜桃视频一区二区三区在线观看| 久久精品动漫| 天堂av在线| 欧美成人一二区| 日本va欧美va瓶| 视频一区视频二区中文| 久久精品欧美一区| 色婷婷色综合| 久久中文在线| 国产精品亚洲欧美一级在线| 亚洲精品在线a| 久久亚洲美女| 日韩视频一区二区三区在线播放免费观看 | 日本免费一区二区视频| 亚洲欧美视频| 91久久中文| 在线国产一区二区| 播放一区二区| 日韩成人高清| 精品欧美日韩精品| 久久不见久久见免费视频7| 久久国产视频网| 婷婷成人av| 亚洲精品美女91| 美国三级日本三级久久99| 亚洲91久久| 久久国产日韩| 欧美不卡在线| 精品91久久久久| 国产视频一区三区| 爽好久久久欧美精品| 美女网站久久| 亚洲日本在线观看视频| 亚洲精品国产精品粉嫩| 久久亚洲不卡| 免费一级片91| 亚洲深夜福利在线观看| 亚洲一区二区三区四区电影 | 国产精品分类| 欧美日本三区| 免费视频一区二区三区在线观看| 国产精品分类| 97人人精品| 欧美日韩尤物久久| 欧美日韩一二| 性色一区二区| 亚洲影视一区二区三区| 亚洲2区在线| 国产精品久久久久9999高清| 精品视频在线你懂得| 久久久久久网| 成人免费电影网址| 国产亚洲高清视频| 日韩久久99| 久久精品资源| 999国产精品999久久久久久| 9色精品在线| 日韩不卡在线观看日韩不卡视频 | 精品国产一区二区三区噜噜噜| 久久精品亚洲一区二区| 88xx成人免费观看视频库| 欧美+亚洲+精品+三区| 午夜在线播放视频欧美| 日韩国产欧美三级| 精品国产黄a∨片高清在线| 日韩亚洲一区在线| 国产国产精品| 亚洲欧美网站在线观看| 免费在线亚洲| 欧美91精品| 91精品啪在线观看国产爱臀| 国产成人精品一区二区免费看京 | 久色成人在线| 欧美日韩午夜| 色一区二区三区| 国产精品美女| 国产精品一区毛片| 伊人久久大香线蕉av不卡| 亚洲我射av| 动漫av一区| 亚洲制服少妇| 国产精品mv在线观看| 私拍精品福利视频在线一区| 日韩中文字幕亚洲一区二区va在线| 国产精品一页| 婷婷亚洲五月| 国产伦精品一区二区三区千人斩| 日韩一区自拍| 日韩一区二区三区四区五区| 鲁大师精品99久久久| 国产一区亚洲| 青青伊人久久| 亚洲韩日在线| 国产调教精品| 久久国产精品成人免费观看的软件| 综合激情网...| 精品99在线| 蜜桃av一区| 久久亚洲精品中文字幕| 一本色道久久精品| 精品国产一区二区三区av片| 模特精品在线| 国产精品a级| 最新日韩欧美| 日韩av专区| 日韩精品一页| 激情六月综合| 韩国一区二区三区视频| 蜜臀久久久久久久| 成人羞羞视频播放网站| 国产日韩欧美高清免费| 亚洲专区一区|