自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka Java客戶(hù)端代碼示例

開(kāi)發(fā) 后端 Kafka
kafka是linkedin用于日志處理的分布式消息隊(duì)列,linkedin的日志數(shù)據(jù)容量大,但對(duì)可靠性要求不高,其日志數(shù)據(jù)主要包括用戶(hù)行為(登錄、瀏覽、點(diǎn)擊、分享、喜歡)以及系統(tǒng)運(yùn)行日志(CPU、內(nèi)存、磁盤(pán)、網(wǎng)絡(luò)、系統(tǒng)及進(jìn)程狀態(tài))……

介紹      http://kafka.apache.org

kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)

kafka是linkedin用于日志處理的分布式消息隊(duì)列,linkedin的日志數(shù)據(jù)容量大,但對(duì)可靠性要求不高,其日志數(shù)據(jù)主要包括用戶(hù)行為(登錄、瀏覽、點(diǎn)擊、分享、喜歡)以及系統(tǒng)運(yùn)行日志(CPU、內(nèi)存、磁盤(pán)、網(wǎng)絡(luò)、系統(tǒng)及進(jìn)程狀態(tài))

 當(dāng)前很多的消息隊(duì)列服務(wù)提供可靠交付保證,并默認(rèn)是即時(shí)消費(fèi)(不適合離線(xiàn))。

高可靠交付對(duì)linkedin的日志不是必須的,故可通過(guò)降低可靠性來(lái)提高性能,同時(shí)通過(guò)構(gòu)建分布式的集群,允許消息在系統(tǒng)中累積,使得kafka同時(shí)支持離線(xiàn)和在線(xiàn)日志處理

測(cè)試環(huán)境

kafka_2.10-0.8.1.1 3個(gè)節(jié)點(diǎn)做的集群

zookeeper-3.4.5 一個(gè)實(shí)例節(jié)點(diǎn)

代碼示例

消息生產(chǎn)者代碼示例

  1. import java.util.Collections;  
  2. import java.util.Date;  
  3. import java.util.Properties;  
  4. import java.util.Random;  
  5.    
  6. import kafka.javaapi.producer.Producer;  
  7. import kafka.producer.KeyedMessage;  
  8. import kafka.producer.ProducerConfig;  
  9.    
  10. /**  
  11.  * 詳細(xì)可以參考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example  
  12.  * @author Fung  
  13.  *  
  14.  */ 
  15. public class ProducerDemo {  
  16.     public static void main(String[] args) {  
  17.         Random rnd = new Random();  
  18.         int events=100;  
  19.    
  20.         // 設(shè)置配置屬性  
  21.         Properties props = new Properties();  
  22.         props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");  
  23.         props.put("serializer.class""kafka.serializer.StringEncoder");  
  24.         // key.serializer.class默認(rèn)為serializer.class  
  25.         props.put("key.serializer.class""kafka.serializer.StringEncoder");  
  26.         // 可選配置,如果不配置,則使用默認(rèn)的partitioner  
  27.         props.put("partitioner.class""com.catt.kafka.demo.PartitionerDemo");  
  28.         // 觸發(fā)acknowledgement機(jī)制,否則是fire and forget,可能會(huì)引起數(shù)據(jù)丟失  
  29.         // 值為0,1,-1,可以參考  
  30.         // http://kafka.apache.org/08/configuration.html  
  31.         props.put("request.required.acks""1");  
  32.         ProducerConfig config = new ProducerConfig(props);  
  33.    
  34.         // 創(chuàng)建producer  
  35.         Producer<String, String> producer = new Producer<String, String>(config);  
  36.         // 產(chǎn)生并發(fā)送消息  
  37.         long start=System.currentTimeMillis();  
  38.         for (long i = 0; i < events; i++) {  
  39.             long runtime = new Date().getTime();  
  40.             String ip = "192.168.2." + i;//rnd.nextInt(255);  
  41.             String msg = runtime + ",www.example.com," + ip;  
  42.             //如果topic不存在,則會(huì)自動(dòng)創(chuàng)建,默認(rèn)replication-factor為1,partitions為0  
  43.             KeyedMessage<String, String> data = new KeyedMessage<String, String>(  
  44.                     "page_visits", ip, msg);  
  45.             producer.send(data);  
  46.         }  
  47.         System.out.println("耗時(shí):" + (System.currentTimeMillis() - start));  
  48.         // 關(guān)閉producer  
  49.         producer.close();  
  50.     }  

消息消費(fèi)者代碼示例

  1. import java.util.HashMap;  
  2. import java.util.List;  
  3. import java.util.Map;  
  4. import java.util.Properties;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7.    
  8. import kafka.consumer.Consumer;  
  9. import kafka.consumer.ConsumerConfig;  
  10. import kafka.consumer.KafkaStream;  
  11. import kafka.javaapi.consumer.ConsumerConnector;  
  12.    
  13. /**  
  14.  * 詳細(xì)可以參考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example  
  15.  *   
  16.  * @author Fung  
  17.  *  
  18.  */ 
  19. public class ConsumerDemo {  
  20.     private final ConsumerConnector consumer;  
  21.     private final String topic;  
  22.     private ExecutorService executor;  
  23.    
  24.     public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {  
  25.         consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));  
  26.         this.topic = a_topic;  
  27.     }  
  28.    
  29.     public void shutdown() {  
  30.         if (consumer != null)  
  31.             consumer.shutdown();  
  32.         if (executor != null)  
  33.             executor.shutdown();  
  34.     }  
  35.    
  36.     public void run(int numThreads) {  
  37.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  38.         topicCountMap.put(topic, new Integer(numThreads));  
  39.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer  
  40.                 .createMessageStreams(topicCountMap);  
  41.         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);  
  42.    
  43.         // now launch all the threads  
  44.         executor = Executors.newFixedThreadPool(numThreads);  
  45.    
  46.         // now create an object to consume the messages  
  47.         //  
  48.         int threadNumber = 0;  
  49.         for (final KafkaStream stream : streams) {  
  50.             executor.submit(new ConsumerMsgTask(stream, threadNumber));  
  51.             threadNumber++;  
  52.         }  
  53.     }  
  54.    
  55.     private static ConsumerConfig createConsumerConfig(String a_zookeeper,  
  56.             String a_groupId) {  
  57.         Properties props = new Properties();  
  58.         props.put("zookeeper.connect", a_zookeeper);  
  59.         props.put("group.id", a_groupId);  
  60.         props.put("zookeeper.session.timeout.ms""400");  
  61.         props.put("zookeeper.sync.time.ms""200");  
  62.         props.put("auto.commit.interval.ms""1000");  
  63.    
  64.         return new ConsumerConfig(props);  
  65.     }  
  66.    
  67.     public static void main(String[] arg) {  
  68.         String[] args = { "172.168.63.221:2188""group-1""page_visits""12" };  
  69.         String zooKeeper = args[0];  
  70.         String groupId = args[1];  
  71.         String topic = args[2];  
  72.         int threads = Integer.parseInt(args[3]);  
  73.    
  74.         ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);  
  75.         demo.run(threads);  
  76.    
  77.         try {  
  78.             Thread.sleep(10000);  
  79.         } catch (InterruptedException ie) {  
  80.    
  81.         }  
  82.         demo.shutdown();  
  83.     }  

消息處理類(lèi)

  1. import kafka.consumer.ConsumerIterator;  
  2. import kafka.consumer.KafkaStream;  
  3.    
  4. public class ConsumerMsgTask implements Runnable {  
  5.     private KafkaStream m_stream;  
  6.     private int m_threadNumber;  
  7.    
  8.     public ConsumerMsgTask(KafkaStream stream, int threadNumber) {  
  9.         m_threadNumber = threadNumber;  
  10.         m_stream = stream;  
  11.     }  
  12.    
  13.     public void run() {  
  14.         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();  
  15.         while (it.hasNext())  
  16.             System.out.println("Thread " + m_threadNumber + ": " 
  17.                     + new String(it.next().message()));  
  18.         System.out.println("Shutting down Thread: " + m_threadNumber);  
  19.     }  

Partitioner類(lèi)示例

  1. import kafka.producer.Partitioner;  
  2. import kafka.utils.VerifiableProperties;  
  3.    
  4. public class PartitionerDemo implements Partitioner {  
  5.     public PartitionerDemo(VerifiableProperties props) {  
  6.    
  7.     }  
  8.    
  9.     @Override 
  10.     public int partition(Object obj, int numPartitions) {  
  11.         int partition = 0;  
  12.         if (obj instanceof String) {  
  13.             String key=(String)obj;  
  14.             int offset = key.lastIndexOf('.');  
  15.             if (offset > 0) {  
  16.                 partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;  
  17.             }  
  18.         }else{  
  19.             partition = obj.toString().length() % numPartitions;  
  20.         }  
  21.            
  22.         return partition;  
  23.     }  
  24.    

參考

https://cwiki.apache.org/confluence/display/KAFKA/Index

https://kafka.apache.org/

原文鏈接:http://my.oschina.net/cloudcoder/blog/299215

責(zé)任編輯:林師授 來(lái)源: cloud-coder的博客
相關(guān)推薦

2010-03-18 16:49:43

Java Socket

2010-03-18 17:30:46

Java Socket

2021-05-07 15:28:03

Kafka客戶(hù)端Sarama

2010-03-18 17:47:07

Java 多客戶(hù)端通信

2017-01-11 10:38:17

MySQL客戶(hù)端代碼

2010-04-21 12:57:33

RAC負(fù)載均衡配置

2011-08-17 10:10:59

2021-09-22 15:46:29

虛擬桌面瘦客戶(hù)端胖客戶(hù)端

2022-09-23 08:02:42

Kafka消息緩存

2022-08-01 08:04:58

MySQL客戶(hù)端字符

2010-05-31 10:11:32

瘦客戶(hù)端

2011-10-26 13:17:05

2011-03-02 14:36:24

Filezilla客戶(hù)端

2010-12-21 11:03:15

獲取客戶(hù)端證書(shū)

2011-03-24 13:00:31

配置nagios客戶(hù)端

2011-03-21 14:53:36

Nagios監(jiān)控Linux

2009-03-04 10:27:50

客戶(hù)端組件桌面虛擬化Xendesktop

2011-04-06 14:24:20

Nagios監(jiān)控Linux

2013-05-09 09:33:59

2010-02-24 16:17:09

WCF獲取客戶(hù)端IP
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)