消息队列 Kafka –未完

摘要

消息队列 Kafka 是一个分布式的、高吞吐量、高可扩展性消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等,是大数据生态中不可或缺的产品之一。Apache Kafka起源于LinkedIn,后来于2011年成为开源Apache项目,然后于2012年成为First-class Apache项目。Kafka是用Scala和Java编写的。 Apache Kafka是基于发布订阅的容错消息系统。 它是快速,可扩展和设计分布。

Kafka 从原理开始

消息队列


一、消息队列简介

首先,我们需要了解一下什么是消息队列
image_1d6drev25flv10101no21i481nhn9.png-126.8kB

消息队列解释

1.点对点模式 (一对一,消费者主动拉取数据,消息收到后消息清除)
点对点模型通常是一个基于拉取或者轮训的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端,这个模型的特点是发送到队列的消息被一个且只有一个接收者接瘦处理,即使有多个消息监听者也是如此。

2.发布/订阅模式 (一对多,数据生产后,推送给所有订阅者)
发布订阅模型则是一个机遇推送的消息传送模型,发布订阅类型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接受消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态

两种模式优缺点

  • 主动模式缺点
    客户端B需要有一个线程实时监控队列(Queue)里的数据,因为不确定什么时候有数据。相反订阅模式不需要客户端知道是否有数据,有数据写入时,会由队列进行发送

  • 订阅模式的缺点
    客户端订阅会节省进程,但是还会有一个问题,拉取速度是由客户端控制,客户端的带宽限制不相同,所以推送速度需要统一。例如客户端C是2M,客户端B是10M,就会产生状态不一致。有可能丢数据等问题

为什么要使用消息队列

1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2)冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列采用的“插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕
3)扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可
4)灵活性& 峰值处理能力  (有了集群峰值处理能力自然提升)
5)可恢复性  (数据是有冗余的)
6)顺序保证
7)缓冲
8)异步通信

二、Kafka 介绍

什么是Kafka

在流式计算中,Kafka一般用于来缓存数据,Storm通过消费Kafka的数据进行计算。
Apache Kafka是一个开源消息系统,由Scala写成,是由Apache软件基金会开发的一个开源消息系统项目

Kafka最初由Linkedin公司开发,并与2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

Kafka是一个分布式消息队列。Kafka对消息保存是根据Topic进行归类,发送消息称为Producer,消息接受者称为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(Server)称为broker

无论是kafka集群,还是consumer都依赖于zookeeper集群,主要保存一些meta信息,来保证系统可用性 [友情提示:新版本的kafka,consumer已经不存储在zookeeper集群里]

Kafka架构图
image_1d6dtqunhfujpg4hdl13en1jse13.png-51.7kB
QQ20190321-0.png-469.6kB

  • Producer 发送消息的称为: 生产者,向Kafka Broker发消息的客户端
  • Consumer 消息消息的称为: 消费者,向Kafka Broker取消息的客户端 (新版本的consumer信息已经不存储在zookeeper里,是保存在kafka。后面安装会有提示)
  • Topic 相当于硬盘,broker相当于服务器,需要使用topic(硬盘)存储数据
  • Consumer Group (CG);这是kafka用来实现一个topic消息的广播(发送给所有的consumer)和单播(发给任意一个consumer)的手段。一个Topic可以有多个CG。topic的消息会复制(不是真正意义上的复制)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分区而不需要多次发送消息到不同的Topic
  • Broker 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
  • Partition 为了实现扩展性,一个非常大的topic可以分布到多个brocker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序
  • offset kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。
  • Replication 相当于一个分区
  • leader 一个分区一个leader,无论生产还是消费都要找leader
  • follower 备份作用,当leader挂掉后,follower会自动升级为leader。follower和leader不会在一台服务器上,是出现在2台服务器上,以保证数据的冗余
    分区和副本都是我们创建topic自己指定,可以只有一个分区
  • zookeeper 这里使用zookeeper主要是用于consumer存储数据推送的时间,以及kafka集群信息(在新版本里面,2.11 consumer的数据已经不存在zookeeper里)

消费者概念
消费者消费数据也是找的leader,在kafka集群中,消费者和生产者都相当于客户端,只有leader会客户端进行相应。
如果有读请求发给follower,follower会进行读取,但是如果有写请求发送给follower,follwer会转发给leader。当leader处理完之后会将结果在反回给follower,由follower反回给客户端

kafka流程
kafka cluster集群 → Broker节点→ Topic (相当于硬盘) → partition (分区和副本)→ leader、follower处理消费者或者生产者的数据

这里有一个消费者概念需要注意

消费者有一个组的概念,同一个组的消费者不可以同时消费一个分区的数据,结合上图来看,consumerA可以消费Partition0的数据,consumerC也可以消费Partition0的数据,但是consumerB不可以同时消费Partition0的数据。

consumer是可以消费不同分区的数据Partition0/1,一个消费者也是可以消费多个topic的数据

consumer集群:我们可以使用多个线程去读取数据,将多个消费者放在一个消费者组(CG)里面,让一个组里面的不同消费者消费不同分区里的数据

三、Kafka集群搭建

环境说明

CentOS Linux release 7.5.1804 (Core)
10.4.82.125     #kafka
10.4.82.127     #zookeeper  kafka
10.4.82.128     #kafka

Kafka官方网站:http://kafka.apache.org
image_1d6ak5j191sh78ud1clkkcv1fbr9.png-172.6kB

这里需要说的一点,kafka_2.11这个版本为Scala的版本,2.1.1是kafka版本

  1. wget http://mirrors.shu.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz
  2. tar xf kafka_2.11-2.1.1.tgz -C /usr/local
  3. ln -s /usr/local/kafka_2.11-2.1.1 /usr/local/kafka
  4. #因为我们下载的二进制包,不需要编译,直接mv就可以使用

当我们下载完kafka还需要下载jdk和zookeeper,前面也说了kafka依赖于zookeeper,zookeeper和kafka同时也需要JDK的支持
JDK下载地址
JDK历史版本下载
image_1d6f81ceq15ibf6k12mv1dkdqpo25.png-308.3kB

Java环境安装配置

  1. wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" https://download.oracle.com/otn-pub/java/jdk/8u201-b09/42970487e3af4f5aa5bca3f542482c60/jdk-8u201-linux-x64.tar.gz
  2. 如果当前下载连接失效,只需要更换后面的下载地址即可
  3. [root@abcdocker ~]# tar xf jdk-8u121-linux-x64.tar.gz -C /usr/local/
  4. root@abcdocker ~]# ln -s /usr/local/jdk1.8.0_121/ /usr/local/jdk
  5. [root@abcdocker ~]# vim /etc/profile
  6. export JAVA_HOME=/usr/local/jdk
  7. export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
  8. export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar
  9. [root@abcdocker ~]# . /etc/profile
  10. [root@abcdocker ~]# java -version
  11. java version "1.8.0_121"
  12. Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
  13. Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

Zookeeper安装
安装文档:https://old.i4t.com/2195.html
这里zookeeper不详细说了

  1. # 1.下载zookeeper
  2. [root@abcdocker ~]# wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
  3. [root@abcdocker ~]# tar xf zookeeper-3.4.13.tar.gz -C /usr/local/
  4. [root@abcdocker ~]# ln -s /usr/local/zookeeper-3.4.13/ /usr/local/zookeeper
  5. # 2.配置zookeeper
  6. [root@abcdocker ~]# cd /usr/local/zookeeper/conf/
  7. [root@abcdocker conf]# cp zoo_sample.cfg zoo.cfg
  8. [root@abcdocker conf]# mkdir ../data
  9. [root@abcdocker conf]# mkdir /var/log/zookeeper
  10. [root@abcdocker conf]# vim zoo.cfg
  11. tickTime=2000
  12. initLimit=10
  13. syncLimit=5
  14. dataDir=/usr/local/zookeeper/data/
  15. dataLogDir=/var/log/zookeeper/
  16. clientPort=2181
  17. # 3.启动zookeeper
  18. [root@abcdocker ~]# /usr/local/zookeeper/bin/zkServer.sh start
  19. JMX enabled by default
  20. Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
  21. Starting zookeeper ... STARTED
  22. [root@abcdocker conf]# ps -ef|grep zook
  23. root 2875 1 9 12:31 pts/1 00:00:00 /usr/local/jdk/bin/java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.7.5.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.7.5.jar:/usr/local/zookeeper/bin/../lib/servlet-api-2.5-20081211.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-2.11.jar:/usr/local/zookeeper/bin/../lib/jetty-util-6.1.26.jar:/usr/local/zookeeper/bin/../lib/jetty-6.1.26.jar:/usr/local/zookeeper/bin/../lib/javacc.jar:/usr/local/zookeeper/bin/../lib/jackson-mapper-asl-1.9.11.jar:/usr/local/zookeeper/bin/../lib/jackson-core-asl-1.9.11.jar:/usr/local/zookeeper/bin/../lib/commons-cli-1.2.jar:/usr/local/zookeeper/bin/../zookeeper-3.5.0-alpha.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:........:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/application/jdk/lib:/application/jdk/jre/lib:/application/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/application/jdk/lib:/application/jdk/jre/lib:/application/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar -Xmx1000m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg

zookeeper端口为2181,后面有使用到的地方

zookeeper安装完毕,现在我们配置kafka

  1. [root@yzsjhl82-125 config]# cd /usr/local/kafka/config
  2. [root@yzsjhl82-125 config]# ll
  3. 总用量 68
  4. -rw-r--r-- 1 root root 906 2 9 02:30 connect-console-sink.properties
  5. -rw-r--r-- 1 root root 909 2 9 02:30 connect-console-source.properties
  6. -rw-r--r-- 1 root root 5321 2 9 02:30 connect-distributed.properties
  7. -rw-r--r-- 1 root root 883 2 9 02:30 connect-file-sink.properties
  8. -rw-r--r-- 1 root root 881 2 9 02:30 connect-file-source.properties
  9. -rw-r--r-- 1 root root 1111 2 9 02:30 connect-log4j.properties
  10. -rw-r--r-- 1 root root 2262 2 9 02:30 connect-standalone.properties
  11. -rw-r--r-- 1 root root 1221 2 9 02:30 consumer.properties
  12. -rw-r--r-- 1 root root 4727 2 9 02:30 log4j.properties
  13. -rw-r--r-- 1 root root 1925 2 9 02:30 producer.properties
  14. -rw-r--r-- 1 root root 6851 2 9 02:30 server.properties #需要修改的配置文件
  15. -rw-r--r-- 1 root root 1032 2 9 02:30 tools-log4j.properties
  16. -rw-r--r-- 1 root root 1169 2 9 02:30 trogdor.conf
  17. -rw-r--r-- 1 root root 1023 2 9 02:30 zookeeper.properties

server.properties文件修改

  1. [root@i4t config]# grep -Ev "^$|#" server.properties
  2. broker.id=3 # 这里是kafka节点名称,不可以重复,在集群内部要唯一
  3. listeners=PLAINTEXT://:9092 #broker的默认端口端口为9092
  4. num.network.threads=3
  5. num.io.threads=8
  6. socket.send.buffer.bytes=102400
  7. socket.receive.buffer.bytes=102400
  8. socket.request.max.bytes=104857600
  9. log.dirs=/data/logs/kafaka #存储和日志的路径
  10. num.partitions=1
  11. num.recovery.threads.per.data.dir=1
  12. offsets.topic.replication.factor=1
  13. transaction.state.log.replication.factor=1
  14. transaction.state.log.min.isr=1
  15. log.retention.hours=168 #kafka存储时间,单位为小时(默认七天)
  16. log.segment.bytes=1073741824 #存储文件大小(默认为1个G,超过1个G的数据会被删除)
  17. log.retention.check.interval.ms=300000
  18. zookeeper.connect=10.4.82.127:2181 #zookeeper连接地址,集群以逗号分隔
  19. zookeeper.connection.timeout.ms=6000
  20. group.initial.rebalance.delay.ms=0
  21. delete.topic.enable=true #不添加这个参数,topic只会被标记删除

说完conf目录,下面介绍一下bin目录下的脚本

  1. [root@i4t bin]# tree -L 1
  2. .
  3. ├── connect-distributed.sh
  4. ├── connect-standalone.sh
  5. ├── kafka-acls.sh
  6. ├── kafka-broker-api-versions.sh
  7. ├── kafka-configs.sh
  8. ├── kafka-console-consumer.sh #控制台消费者,主要用于测试
  9. ├── kafka-console-producer.sh #控制台生产者
  10. ├── kafka-consumer-groups.sh
  11. ├── kafka-consumer-perf-test.sh
  12. ├── kafka-delegation-tokens.sh
  13. ├── kafka-delete-records.sh
  14. ├── kafka-dump-log.sh
  15. ├── kafka-log-dirs.sh
  16. ├── kafka-mirror-maker.sh
  17. ├── kafka-preferred-replica-election.sh
  18. ├── kafka-producer-perf-test.sh
  19. ├── kafka-reassign-partitions.sh
  20. ├── kafka-replica-verification.sh
  21. ├── kafka-run-class.sh
  22. ├── kafka-server-start.sh #启动脚本
  23. ├── kafka-server-stop.sh #停止脚本
  24. ├── kafka-streams-application-reset.sh
  25. ├── kafka-topics.sh #topic管理脚本,只要是topic相关的,都要运行这个脚本
  26. ├── kafka-verifiable-consumer.sh
  27. ├── kafka-verifiable-producer.sh
  28. ├── trogdor.sh
  29. ├── windows
  30. ├── zookeeper-security-migration.sh
  31. ├── zookeeper-server-start.sh
  32. ├── zookeeper-server-stop.sh
  33. └── zookeeper-shell.sh

启动kafka
在启动kafka之前,要确保zookeeper是正常的,否则连接会出现问题

  1. [root@i4t ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &>/data/logs/kafaka/kafka.log &
  2. 启动的时候我们需要指定server.properties配置文件
  3. 默认日志输出到前台,我们程序使用后台启动日志输出到指定目录
  4. 另外一种启动方式
  5. [root@i4t ~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon config/server.properties

ID为3的server已经启动

  1. [root@i4t kafaka]# netstat -lntup|grep 9092
  2. tcp6 0 0 :::9092 :::* LISTEN 21529/java

image_1d6fib9111be21pds158263u8p39.png-954.2kB

kafka常见命令创建

创建topic

  1. [root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 2 --replication-factor 2 --topic one
  2. Created topic "one".
  3. #这里显示我们已经创建名称为one的topic,分区数和副本数为2
  4. --create 创建topics
  5. --zookeeper zookeeper地址
  6. --partitions 分区数
  7. --replication-factor 副本数
  8. --topic topic名称

这里的副本数只可以根据broker的数量创建

  1. [root@i4t kafka]# ./bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 2 --replication-factor 4 --topic two
  2. Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
  3. [2019-03-20 16:41:05,012] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
  4. (kafka.admin.TopicCommand$)
  5. #在kafka上,如果节点(broker)达不到设置的副本数,将无法进行创建。

查看当前服务器中所有的topic

  1. [root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181
  2. one

我们可以去我们创建的目录查看一下

  1. [root@i4t kafka]# ll /data/logs/kafka/
  2. 总用量 48
  3. -rw-r--r-- 1 root root 0 3 20 15:07 cleaner-offset-checkpoint
  4. -rw-r--r-- 1 root root 4 3 20 16:34 log-start-offset-checkpoint
  5. -rw-r--r-- 1 root root 54 3 20 15:07 meta.properties
  6. drwxr-xr-x 2 root root 137 3 20 16:30 one-0 ##这里是分区
  7. drwxr-xr-x 2 root root 137 3 20 16:30 one-1 #我们创建了2个分区,所以这里都是2个
  8. -rw-r--r-- 1 root root 29808 3 20 16:34 out.log
  9. -rw-r--r-- 1 root root 20 3 20 16:34 recovery-point-offset-checkpoint
  10. -rw-r--r-- 1 root root 20 3 20 16:35 replication-offset-checkpoint
  11. 另一个集群同样也是2个,起到一个冗余的作用

发送消息

  1. [root@i4t ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.4.82.127:9092 --topic one
  2. >i4t.com
  3. >
  4. # 参数解释
  5. kafka-console-producer.sh 生产者脚本
  6. --broker-list borker节点(生产者连接broker,消费者连接zookeeper
  7. --topic topic名称

现在我们已经创建好生产者,但是现在没有消费者,我们在>符号下面输入完成后窗口不关闭,在打开一台当做消费者,进行模拟测试

在我们集群中任意启动一台当做消费者都是可以的

消费信息

  1. 旧版本
  2. /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.4.82.127:2181 --topic one
  3. 新版本
  4. [root@i4t kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 10.4.82.127:9092 --from-beginning --topic one
  5. i4t.com
  6. #参数解释
  7. --bootstrap-server 如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中.新版本推荐存储在--bootstrap-server
  8. 10.4.82.127:9092 这里的端口是kafka地址
  9. --from-beginning 控制台consumer只获取最新的数据,如果需要以前所有的数据需要加上--from-beginning

在kafka新版本中,consumer已经将数据存储在kafka本地

  1. [root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181
  2. __consumer_offsets
  3. one
  4. 我们查看topic的时候,发现新创建了一个名称为__consumer_offsets,出现这个topic的原因是我们将consumer连接到kafka上。这个topic保存我们的访问时间

查看Topic的详情

  1. [root@yzsjhl82-127 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.4.82.127:2181 --describe --topic one
  2. Topic:one PartitionCount:2 ReplicationFactor:2 Configs:
  3. Topic: one Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
  4. Topic: one Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
  5. #参数解释
  6. Tocpic:One #Topic名称
  7. PartitionCount:2 #分区数
  8. ReplicationFactor:2 #副本数
  9. Configs: #其它配置
  10. Topic: one Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
  11. 名称 分区 这里的leader1就是borker1 副本(数据)是我们存储目录里的-位数 Isr选举,
  12. 生产者会把数据写入到leader里,follower和其他副本都是去leader上拉取数据。Isr是根据和leader数据相差最小排在最前面,第一个是0代表leader,第二或者第三个就是按照和leader差异大小排列

删除topic

  1. [root@i4t ~]# /usr/local/kafka/bin/kafka-topic.sh --zookeeper 10.4.82.127:2181 --delete --topic one
  2. Topic one is marked for deletion.
  3. Note: This will have no impact if delete.topic.enable is not set to true.
  4. --delete 删除topic参数
  5. --topic 后面接topic名称,不需要指定分区
  6. #删除的时候提示我们,没有进行物理删除,而是进行标记删除。我们只需要在server.properties添加delete.topic.enable=true重启kafka即可
  7. 例子:删除topic
  8. [root@i4t kafka]# ./bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181
  9. __consumer_offsets
  10. one
  11. [root@i4t kafka]# ./bin/kafka-topics.sh --delete --zookeeper 10.4.82.127:2181 --topic one
  12. Topic one is marked for deletion.
  13. Note: This will have no impact if delete.topic.enable is not set to true.
  14. [root@i4t kafka]# ./bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181
  15. __consumer_offsets
  16. #删除以后还是会提示我们没有开启delete.topic.enable=true,只要我们查看topic没有one即可
  17. 删除topic时,同时也会将/data/logs/kafka里面的one-(1-2)给删除,这里简单说一下one-(1-2)其实就是one下的2个分区

我们在新建一个

  1. [root@i4t kafka]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 1 --replication-factor 3 --topic one
  2. Created topic "one".
  3. 我们这次创建了一个分区,所以集群的服务器上在/data目录只会有一个分区(因为是3个副本),不会出现one-1或者是one-2
  4. [root@i4t kafka]# ll /data/logs/kafka/
  5. 总用量 96
  6. -rw-r--r-- 1 root root 4 3 25 14:49 cleaner-offset-checkpoint
  7. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-11
  8. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-14
  9. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-17
  10. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-2
  11. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-20
  12. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-23
  13. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-26
  14. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-29
  15. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-32
  16. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-35
  17. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-38
  18. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-41
  19. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-44
  20. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-47
  21. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-5
  22. drwxr-xr-x 2 root root 137 3 21 18:02 __consumer_offsets-8
  23. -rw-r--r-- 1 root root 4 3 25 14:54 log-start-offset-checkpoint
  24. -rw-r--r-- 1 root root 54 3 20 15:07 meta.properties
  25. drwxr-xr-x 2 root root 137 3 25 14:53 one-0
  26. -rw-r--r-- 1 root root 73881 3 21 17:59 out.log
  27. -rw-r--r-- 1 root root 394 3 25 14:54 recovery-point-offset-checkpoint
  28. -rw-r--r-- 1 root root 394 3 25 14:55 replication-offset-checkpoint

通过zookeeper查看kafka集群

  1. /usr/local/zookeeper/bin/zkCli.sh
  2. [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids #查看broker的id
  3. [0]
  4. [zk: localhost:2181(CONNECTED) 1] ls /brokers/topics #查看消息
  5. [k8s-fb-staging-log4j-tomcat-renren-jinkong-kylin-standard-server

A4CB5471-1A4F-4EE0-B151-1A60A6FD9006.png-97kB


新闻联播老司机

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: