54 CHEN

Linkedin高吞吐量分布式消息系统kafka使用手记

linkedin kafka kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。

设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。

kakfa的consumer使用拉的方式工作。

安装kafka 下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

> tar xzf kafka-.tgz
> cd kafka- > ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。

使用kafka

  1. import java.util.Arrays;

  2. import java.util.List;

  3. import java.util.Properties;

  4. import kafka.javaapi.producer.SyncProducer;

  5. import kafka.javaapi.message.ByteBufferMessageSet;

  6. import kafka.message.Message;

  7. import kafka.producer.SyncProducerConfig;

  8. Properties props = new Properties();

  9. props.put(“zk.connect”, “127.0.0.1:2181”);

  10. props.put(“serializer.class”, “kafka.serializer.StringEncoder”);

  11. ProducerConfig config = new ProducerConfig(props);

  12. Producer<String, String> producer = new Producer<String, String>(config);

  13. Send a single message

  14. // The message is sent to a randomly selected partition registered in ZK

  15. ProducerData<String, String> data = new ProducerData<String, String>(“test-topic”, “test-message”);

  16. producer.send(data);

  17. producer.close();

这样就是一个标准的producer。

consumer的代码

  1. // specify some consumer properties

  2. Properties props = new Properties();

  3. props.put(“zk.connect”, “localhost:2181”);

  4. props.put(“zk.connectiontimeout.ms”, “1000000”);

  5. props.put(“groupid”, “test_group”);

  6. // Create the connection to the cluster

  7. ConsumerConfig consumerConfig = new ConsumerConfig(props);

  8. ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

  9. // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume

  10. Map<String, List<KafkaMessageStream» topicMessageStreams =

  11. consumerConnector.createMessageStreams(ImmutableMap.of(“test”, 4));

  12. List<KafkaMessageStream> streams = topicMessageStreams.get(“test”);

  13. // create list of 4 threads to consume from each of the partitions

  14. ExecutorService executor = Executors.newFixedThreadPool(4);

  15. // consume the messages in the threads

  16. for(final KafkaMessageStream stream: streams) {

  17. executor.submit(new Runnable() {

  18. public void run() {

  19. for(Message message: stream) {

  20. // process message

  21. }

  22. }

  23. });

  24. }

原创文章如转载,请注明:转载自五四陈科学院[http://www.54chen.com]

Posted by 54chen 架构研究