未加星标

Kafka集群配置

字体大小 | |
[系统(linux) 所属分类 系统(linux) | 发布者 店小二05 | 时间 20160901 | 作者 红领巾 ] 0人收藏点击收藏
前言

最近在利用Spark streaming和Kafka构建一个实时的数据分析系统,对图书阅读数据进行分析,做实时推荐。Spark Streaming 模块是对于 Spark Core 的一个扩展,目的是为了以高吞吐量,并且容错的方式处理持续性的数据流。目前 Spark Streaming 支持的外部数据源有 Flume、 Kafka、Twitter、ZeroMQ、TCP Socket 等。Apache Kafka是一个分布式的消息发布-订阅系统,Kafka可以作为流计算系统的数据源,本例中Spark Streaming将从Kafka中消费数据。

系统环境
软件版本

1
2
3
Spark: 1.4.1
Kafka: 0.8.1.1
zookeeper: 3.4.6
集群节点

一共有四台主机,主机名分别为nn0001, dn0001, dn0002, dn0003。


1
2
3
4
192.168.186.12 nn0001
192.168.186.13 dn0001
192.168.186.14 dn0002
192.168.186.15 dn0003

zookeeper安装
kafka使用zookeeper来管理,存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)。
Zookeeper的配置在机器1上完成后分发到其他三台机器即可。

1
2
3
4
5
6
[[email protected] ~]$ wget http://archive.apache.org/dist/zookeeper/stable/zookeeper-3.4.6.tar.gz
[[email protected] ~]$ tar -zxvf zookeeper-3.4.6.tar.gz
[[email protected] ~]$cd zookeeper-3.4.6/conf
[[email protected] conf]$ pwd
/home/bigdata/bigprosoft/zookeeper-3.4.6/conf
[[email protected] conf]$ cp zoo_sample.cfg zoo.cfg

修改配置文件


1
2
3
4
5
6
7
8
9
10
[[email protected] conf]$ vi zoo.cfg
tickTime=2000
dataDir=/home/bigdata/bigprosoft/zookeeper/data
clientPort=2181
initLimit=10
syncLimit=5
server.1=nn0001:2888:3888
server.2=dn0001:2888:3888
server.3=dn0002:2888:3888
server.4=dn0003:2888:3888

在dataDir目录下创建myid文件,nn0001机器的内容为1,dn0001机器的内容为2,更多依此类推。


1
2
3
[[email protected] data]$ echo 1 > myid
[[email protected] data]$ cat myid
1

启动测试


1
2
3
4
5
6
7
8
9
10
[[email protected] bin]$ ./zkServer.sh start
[[email protected] bin]$ jps
10805 QuorumPeerMain #已经启动成功了
15494 Master
11816 NameNode
20958 Jps
17539 Worker
12084 ResourceManager
12945 RunJar
12944 RunJar

停止


1
[[email protected] bin]$ ./zkServer.sh stop

其它机器相同操作,scp过去即可。

kafka安装
Kafka的broker、producer、consumer、topic等概念以及原理可以查阅官方文档
本次实验采用的多节点多broker集群模式,为每一台机器分配一个broker id。

1
2
3
4
5
6
7
8
9
10
[[email protected] ~]$ wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
[[email protected] ~]$ tar zxf kafka_2.10-0.8.1.1.tgz
[[email protected] ~]$ cd kafka_2.10-0.8.1.1
[[email protected] kafka_2.10-0.8.1.1]$ cd conf
[[email protected] conf]$ vi server.properties
broker.id=1 #其它机器的id依次递增即可
port=9092
host.name=192.168.186.12
advertised.host.name=192.168.186.12
zookeeper.connect=192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181

修改完成后分发到另外三台机器上。

启动测试


1
2
3
4
5
6
7
8
9
10
11
[[email protected] bin]$ nohup ./kafka-server-start.sh ../config/server.properties &
[[email protected] conf]$ jps
10805 QuorumPeerMain
21282 Jps
15494 Master
21209 Kafka
11816 NameNode
17539 Worker
12084 ResourceManager
12945 RunJar
12944 RunJar

依次启动机器

kafka使用测试

创建topic


1
[[email protected] bin]$ ./kafka-topics.sh --create --zookeeper nn0001:2181 --replication-factor 3 --partitions 1 --topic test

查看topic


1
2
3
4
5
6
[[email protected] bin]$ ./kafka-topics.sh --describe --zookeeper nn0001:2181
Topic:mytest PartitionCount:2 ReplicationFactor:2 Configs:
Topic: mytest Partition: 0 Leader: 2 Replicas: 3,2 Isr: 2
Topic: mytest Partition: 1 Leader: -1 Replicas: 4,3 Isr:
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3,4 Isr: 2

producer测试


1
2
3
[[email protected] bin]$ ./kafka-console-producer.sh --broker-list 192.168.186.12:9092 --topic test
gsdggfgfgfd
gdfgdfgdf

conumer测试


1
2
3
4
5
6
7
8
9
10
[[email protected] bin]$ ./kafka-console-consumer.sh --zookeeper 192.168.186.12:2181 --from-beginning --topic test
abfsfsdfsdfs
ffsdfs
gsdggfgfgfd
gdfgdfgdf
^C[2015-08-28 17:48:40,991] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
Consumed 7 messages
`

测试高可用


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[[email protected] bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --from-beginning --topic test
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3,4 Isr: 2,4
#可以看到leader是2,是dn0001机器,把此机器上的kafka进程杀掉,再查看topic的leader
[[email protected] bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 4 Replicas: 2,3,4 Isr: 4
#此时leader变成了4,对应的机器是dn0003.
[[email protected] bin]$ ./kafka-console-consumer.sh --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --from-beginning --topic test
abfsfsdfsdfs
ffsdfs
gsdggfgfgfd
gdfgdfgdf
q
^C[2015-08-31 10:14:50,964] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
Consumed 7 messages
#消费者消费信息测试

ok,搭建过程就完成,下面用python/java/scala进行开发实例即可。

排错
问题1描述

1
2
3
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解决方法


1
2
3
[[email protected] ~]$ wget http://www.slf4j.org/dist/slf4j-1.7.12.tar.gz
[[email protected] ~]$ cd slf4j-1.7.12
[[email protected] ~]$ cp slf4j-nop-1.7.12.jar ~/bigprosoft/kafka/libs/

问题2描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[[email protected] bin]$ ./kafka-console-producer.sh --broker-list nn0001:9092 --topic test
fsfsdfsdf
……
[2015-08-28 17:24:18,417] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2015-08-28 17:24:18,419] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
……

解决方法,把server.properties中主机名改为IP地址即可。


1
2
3
host.name=10.171.59.221
advertised.host.name=10.171.59.221
zookeeper.connect=192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.codesec.net/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.codesec.net/Linux/2014-01/94682.htm

Apache kafka原理与特性(0.8V) http://www.codesec.net/Linux/2014-09/107388.htm

Kafka部署与代码实例 http://www.codesec.net/Linux/2014-09/107387.htm

Kafka介绍和集群环境搭建 http://www.codesec.net/Linux/2014-09/107382.htm

Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里

本文地址:http://www.codesec.net/Linux/2016-09/135116.htm


Kafka集群配置

本文系统(linux)相关术语:linux系统 鸟哥的linux私房菜 linux命令大全 linux操作系统

分页:12
转载请注明
本文标题:Kafka集群配置
本站链接:http://www.codesec.net/view/481882.html
分享请点击:


1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
技术大类 技术大类 | 系统(linux) | 评论(0) | 阅读(17)