Learning Kafka: Kafka Setup

System Environment

I'm using a Debian 8 environment provisioned with Vagrant.

$ uname -a
Linux debian-jessie 3.16.0-4-amd64 #1 SMP Debian 3.16.7-ckt20-1+deb8u4 (2016-02-29) x86_64 GNU/Linux
$ free -m
             total       used       free     shared    buffers     cached
Mem:           494        155        338          1         10         88
-/+ buffers/cache:          57        437
Swap:          461          4        457
$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda1       9.2G  2.7G  6.1G  30% /
udev             10M     0   10M   0% /dev
tmpfs            99M  4.4M   95M   5% /run
tmpfs           248M     0  248M   0% /dev/shm
tmpfs           5.0M     0  5.0M   0% /run/lock
tmpfs           248M     0  248M   0% /sys/fs/cgroup
tmpfs            50M     0   50M   0% /run/user/1000

JDK and Kafka Environment Configuration

# Java environment
export JAVA_HOME="$HOME/.utils/jdk"
export PATH="$JAVA_HOME/bin:$PATH"
export CLASSPATH="$JAVA_HOME/lib:$CLASSPATH"

# Kafka environment
export KAFKA_HOME="$HOME/.utils/kafka"
export PATH="$KAFKA_HOME/bin:$PATH"
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" # related to the error described below

As you can see, I put the JDK and Kafka under ~/.utils.
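The two `export PATH` lines prepend the JDK and Kafka bin directories, so their commands take precedence over anything else on the PATH. A quick self-contained sketch of the prepend pattern (same paths as above; no JDK or Kafka needs to be installed to run it):

```shell
# Reproduce the PATH-prepend pattern from the .bashrc snippet above.
JAVA_HOME="$HOME/.utils/jdk"
PATH="$JAVA_HOME/bin:$PATH"
KAFKA_HOME="$HOME/.utils/kafka"
PATH="$KAFKA_HOME/bin:$PATH"
# The directory prepended last ends up first, so it wins command lookup.
echo "$PATH" | cut -d: -f1,2
```

Because Kafka's bin is prepended after the JDK's, it sits first on the PATH.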

First, let's take a look at Kafka's directory layout.

$ ls
bin config libs LICENSE logs NOTICE site-docs

Inside bin:

$ ls
connect-distributed.sh kafka-consumer-groups.sh kafka-reassign-partitions.sh kafka-simple-consumer-shell.sh zookeeper-server-start.sh
connect-standalone.sh kafka-consumer-offset-checker.sh kafka-replay-log-producer.sh kafka-topics.sh zookeeper-server-stop.sh
kafka-acls.sh kafka-consumer-perf-test.sh kafka-replica-verification.sh kafka-verifiable-consumer.sh zookeeper-shell.sh
kafka-configs.sh kafka-mirror-maker.sh kafka-run-class.sh kafka-verifiable-producer.sh
kafka-console-consumer.sh kafka-preferred-replica-election.sh kafka-server-start.sh windows
kafka-console-producer.sh kafka-producer-perf-test.sh kafka-server-stop.sh zookeeper-security-migration.sh

Inside config:

$ cd config
$ ls
connect-console-sink.properties connect-file-sink.properties connect-standalone.properties producer.properties tools-log4j.properties
connect-console-source.properties connect-file-source.properties consumer.properties server.properties zookeeper.properties
connect-distributed.properties connect-log4j.properties log4j.properties test-log4j.properties

Startup

Starting ZooKeeper

Here we use the ZooKeeper instance bundled with Kafka.
From the bin listing above, zookeeper-server-start.sh is the script that starts ZooKeeper, and it takes its configuration from zookeeper.properties in config.
Opening zookeeper.properties, we can see:

# the directory where the snapshot is stored.
dataDir=/data/kafka/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

We store the snapshot data in /data/kafka/zookeeper (make sure this directory exists and is writable before starting).

Start command:
zookeeper-server-start.sh ~/.utils/kafka/config/zookeeper.properties

Starting Kafka

For Kafka itself we likewise use server.properties from config.
Only a few settings need our attention:

broker.id=0
port=9092
log.dirs=/data/kafka-logs   # not application logs; this is where message data is stored
zookeeper.connect=localhost:2181

If you run multiple brokers (e.g. for replication), the first three settings (broker.id, port, log.dirs) must be different for each broker.
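For example, a second broker on the same host could use a config like this (the file name server-1.properties and the specific values are hypothetical, just to illustrate that those three settings must differ per broker):

```properties
# config/server-1.properties -- hypothetical second broker on the same host
broker.id=1
port=9093
log.dirs=/data/kafka-logs-1
# all brokers register with the same ZooKeeper
zookeeper.connect=localhost:2181
```

It would be started the same way, just pointing at the second config file: kafka-server-start.sh ~/.utils/kafka/config/server-1.properties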

Start command:
kafka-server-start.sh ~/.utils/kafka/config/server.properties
Here I ran into an error: There is insufficient memory for the Java Runtime Environment to continue

Solution

The VM has only about 494 MB of RAM (see the free -m output above), less than the heap kafka-server-start.sh requests by default, so the JVM cannot start. The fix is the KAFKA_HEAP_OPTS export in the .bashrc above, which caps the heap at 256 MB.

You can check which Java applications are running with jps -l:

$ jps -l
6049 sun.tools.jps.Jps
5714 kafka.Kafka
5561 org.apache.zookeeper.server.quorum.QuorumPeerMain

We can see that both ZooKeeper (QuorumPeerMain) and Kafka are running successfully.

Creating a Topic

kafka-topics.sh --create --zookeeper localhost:2181 --topic topic_name --partitions 3 --replication-factor 1
--partitions specifies the number of partitions
--replication-factor specifies the number of replicas (it cannot exceed the number of brokers)
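As a side note on what --partitions means for producers: by default, Kafka assigns a keyed message to a partition by hashing the key modulo the partition count (the real default partitioner uses murmur2; the sketch below substitutes cksum purely to illustrate the hash-modulo idea):

```shell
# Toy illustration of key -> partition mapping (NOT Kafka's actual murmur2 partitioner).
key="user42"       # hypothetical message key
partitions=3       # matches --partitions 3 above
hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
echo "key '$key' -> partition $((hash % partitions))"
```

Because the mapping is deterministic, messages with the same key always land in the same partition, which is what preserves per-key ordering.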

Use the following command to list the topics that have been created:

$ kafka-topics.sh --list --zookeeper localhost:2181
test1

With that, Kafka is up and running.