基本概念
Topic:Kafka将消息种子(Feed)分门别类, 每一类的消息称之为话题(Topic).
Producer:发布消息的对象称之为话题生产者(Kafka topic producer)
Consumer:订阅消息并处理发布的消息的种子的对象称之为话题消费者(consumers)
Broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
Partition:每个Topic下会有一个或多个Partition,创建Topic时可指定Parition数量。每个Partition对应于一个文件夹,该文件夹下存储该{Partition的数据和索引文件。
Kafka就是讲生产者所生产的消息进行分类,而分类的条目就是Topic,最后由消费者接受消息的一个消息中间件。而消费者不是直接从生产者那里拿数据,而是从Broker从拉取数据,所谓Broker其实是一个服务器实例,负责接受从生产者那里来的数据,负责消息维护作用。
对于每个Topic,Kafka都会这一个分区的log,如图所示:
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的(可以理解为Mysql中一张表的主键id)。Kafka集群保持所有的消息,直到它们过期,无论消息是否被消费了。实际上消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更老的一个偏移量,重新读取消息。可以看到这种设计对消费者来说操作自如, 一个消费者的操作不会影响其它消费者对此log的处理。
再说说分区。Kafka中采用分区的设计有几个目的:
- 是可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以不受限的处理更多的数据。
- 分区可以作为并行处理的单元。
命令行执行
我们可以先使用命令行才测试Kafka的简单操作。
先手动启动一个生产者者
1 2
| >kafka-console-producer.sh --broker-list 192.168.33.10:9092 --topic test This is message
|
在手动启动一个消费者
1 2
| >kafka-console-consumer.sh --zookeeper 192.168.33.10:2181 --topic test --from-beginning This is message //这就是收到的消息
|
这里要说明一下,我是使用Vagrant来做Kafka的服务器,连接时候会出现连接超时的情况,这时候需要调整kafka的配置中的advertised.host.name
,将其设置为我们为Vagrant的private_network
首先我这个kafka是在本机启动的,然后Vagrant是通过端口查询到的,然后kafka和zookeeper的端口是通过Vagrant映射来的,kafka是通过zookeeper来维护集群信息的,通过zookeeper找到Broker,然后尝试连接Broker。这个问题就是由于kafka发送的ip错误造成的,而advertised.host.name
设置是强行设置kafka的反馈ip。
Producer讲解
进行配置,主要有序列化的配置
1 2 3 4 5 6 7 8 9 10
| public ProducerConfig config(){ Properties props = new Properties(); props.put("metadata.broker.list", "192.168.33.10:9092,192.168.33.10:9091"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "io.github.binglau.BingPartitioner"); props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props); return config; }
|
定义Producer对象
Producer<String, String> producer = new Producer<String, String>(config);
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void singleModeProduce(ProducerConfig config) throws InterruptedException{ Producer<String, String> producer = new Producer<String, String>(config); for(int i = 0; i < 10000; i++){ producer.send(getMsg(i)); System.out.println(i); Thread.sleep(1000); } }
public void batchModeProduce(ProducerConfig config){ List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>(100); Producer<String, String> producer = new Producer<String, String>(config); for(int i = 0; i < 1000; i++){ messages.add(getMsg(i)); if (i % 100 == 0){ producer.send(messages); messages.clear(); } } producer.send(messages); }
|
HighLevelConsumer讲解
有时,我们消费Kafka的消息,并不关心偏移量,我们仅仅关心数据能被消费就行。High Level Consumer(高级消费者)提供了消费信息的方法而屏蔽了大量的底层细节。
设计一个高级消费者
了解使用高层次消费者的第一件事是,它可以(而且应该!)是一个多线程的应用。线程围绕在你的主题分区的数量,有一些非常具体的规则:
- 如果你提供比在topic分区多的线程数量,一些线程将永远不会看到消息。
- 如果你提供的分区比你拥有的线程多,线程将从多个分区接收数据。
- 如果你每个线程上有多个分区,对于你以何种顺序收到消息是没有保证的。举个例子,你可能从分区10上获取5条消息和分区11上的6条消息,然后你可能一直从10上获取消息,即使11上也拥有数据。
- 添加更多的进程线程将使kafka重新平衡,可能改变一个分区到线程的分配。
配置
1 2 3 4 5 6 7 8 9
| private ConsumerConfig config(){ Properties props = new Properties(); props.put("group.id", groupId); props.put("consumer.id", consumerId); props.put("zookeeper.connect", Config.ZK_CONNECT); props.put("zookeeper.session.timeout.ms", "60000"); props.put("zookeeper.sync.time.ms", "2000"); return new ConsumerConfig(props); }
|
获取消息流
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);
创建线程处理消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| private class BingStreamThread implements Runnable{ private KafkaStream<byte[], byte[]> stream;
public BingStreamThread(KafkaStream<byte[], byte[]> stream){ this.stream = stream; } public void run() { ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();
// 逐条处理消息 while (streamIterator.hasNext()) { MessageAndMetadata<byte[], byte[]> message = streamIterator.next(); String topic = message.topic(); int partition = message.partition(); long offset = message.offset(); String key = new String(message.key()); String msg = new String(message.message()); // 在这里处理消息,这里仅简单的输出 // 如果消息消费失败,可以将已上信息打印到日志中,活着发送到报警短信和邮件中,以便后续处理 System.out.println("consumerId:" + consumerId + ", thread : " + Thread.currentThread().getName() + ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : " + key + " , mess : " + msg); } } }
|
消费消息
1 2 3 4 5 6 7
| // 为每个stream启动一个线程消费消息 for (KafkaStream<byte[], byte[]> stream : streams.get(Config.TOPIC)) { BingStreamThread bingStreamThread = new BingStreamThread(stream); executor.submit(bingStreamThread); }
|
SimpleConsumer讲解
为什么使用SimpleConsumer?
使用“SimpleConsumer”的主要原因是你想比使用“消费者分组”更好的控制分区消费。
比如你想:
1. 多次读取消息
2. 在一个处理过程中只消费Partition其中的一部分消息
3. 添加事务管理机制以保证消息被处理且仅被处理一次
使用SimpleConsumer有哪些弊端呢
这个SimpleConsumer确实需要很大的工作量:
1. 必须在程序中跟踪offset值.
2. 必须找出指定Topic(主题) Partition(分区)中的lead broker.
3. 必须处理broker的变动.
使用SimpleConsumer的步骤
找出Leader Broker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| // 从所有活跃的broker中找出哪个是指定Topic(主题) Partition(分区)中的leader broker
BrokerEndPoint leaderBroker = findLeader(Config.KAFKA_ADDRESS, Config.TOPIC, partition);
/** * 找到指定分区的leader broker * @return 指定分区的leader broker */ public BrokerEndPoint findLeader(String brokerHosts, String topic, int partition) { BrokerEndPoint leader = findPartitionMetadata(brokerHosts, topic, partition).leader(); System.out.println(String.format("Leader tor topic %s, partition %d is %s:%d", topic, partition, leader.host(), leader.port())); return leader; }
/** * 找到指定分区的元数据 * @return 指定分区元数据 */ private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) { PartitionMetadata returnMetadata = null; for (String brokerHost : brokerHosts.split(",")) { SimpleConsumer consumer = null; try { String[] splits = brokerHost.split(":"); consumer = new SimpleConsumer( splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup" ); List<String> topics = Collections.singletonList(topic); TopicMetadataRequest request = new TopicMetadataRequest(topics); TopicMetadataResponse response = consumer.send(request); List<TopicMetadata> topicMetadatas = response.topicsMetadata(); for (TopicMetadata topicMetadata : topicMetadatas) { for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) { if (partitionMetadata.partitionId() == partition) { returnMetadata = partitionMetadata; break; } } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + brokerHost + "] to find Leader for [" + topic + ", " + partition + "] Reason: " + e); } finally { if (consumer != null) { consumer.close(); } } } return returnMetadata; }
|
构造消费者
1 2 3 4 5 6
| // 构造消费 SimpleConsumer simpleConsumer = new SimpleConsumer( leaderBroker.host(), leaderBroker.port(), 20000, 10000, "bingSimpleConsumer"); long startOffset = 1; int fetchSize = 1000;
|
构造请求与处理请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| while (true) { long offset = startOffset; // 构造请求 FetchRequest request = new FetchRequestBuilder() .addFetch(Config.TOPIC, 0, startOffset, fetchSize).build();
// 发送请求获取数据 FetchResponse response = simpleConsumer.fetch(request);
ByteBufferMessageSet messageSet = response.messageSet(Config.TOPIC, partition); for (MessageAndOffset messageAndOffset : messageSet) { Message msg = messageAndOffset.message(); ByteBuffer payload = msg.payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); String message = new String(bytes); offset = messageAndOffset.offset(); System.out.println("partition : " + 3 + ", offset : " + offset + " msg : " + message); } // 继续消费下一批 startOffset = offset + 1; }
|
参考资料
Kafka教程
代码示例
代码示例