kafka

简介

kafka是分布式的、基于发布/订阅模式(消费者主动拉取消息)的、消息队列(Message Queue)。

kafka由scala编写。

消息队列的特点

解耦

生产者消息 –> 消息队列 –> 消费者自取

可恢复性

即使一个处理消息进程挂掉,【加入队列中的消息】仍然可以在系统恢复后被处理。

缓冲

解决生产消息和消费消息处理速度不一致的情况。

异步通信

提供异步处理机制,可以任意向队列放入消息,当需要的时候再处理。

动态灵活

任意增减机器,避免资源浪费,也可以临时应对突发压力。

消息队列的类别

一对一点对点模式

【消息使用后消失】消费者主动拉取数据,获取数据后,清除消息。也就是一个消息,被消费者消费后就消失。

  1. 生产者生产消息发送到队列
  2. 消费者从队列拉取消息
  3. 消息被消费了,队列不再存储而是被删除。

image-20200816222629229

一对多发布/订阅模式

【消息使用后不会消失】

  1. 生产者生产消息发送到topic中
  2. 消费者订阅(消费)消息
  3. 发布到topic的消息会被所有订阅者消费

image-20200816222613621

发布订阅模式的类型:

  1. 消费者主动拉取数据(如kafka):消费者要不断去监听(轮询)队列信息。消费者资源浪费。
  2. 生产者主动推送数据:消费者能力处理不足

kafka简介

特性

高吞吐

生产者

消费者

  1. 消费者组,提高消费能力
  2. 同一个消费者组里面的,不能同时(只能派一个)消费某一个分区里的数据

kafka集群管理消息

  1. broker:服务器,可以容纳多个topic。
  2. topic:一类消息队列,生产者和消费者面向的都是一个topic。
  3. partition分区:一个topic可以分为多个partition,使一个topic可以分到多个服务器。每个partition是一个有序队列。
  4. replica:副本,为了保证集群中某个节点故障时,节点上的partition数据不丢。topic的每个分区都有若干副本。一个leader都有若干follower。
  5. follower备,leader主(leader坏掉之后,某一个follower启用成为新的leader)

zookeeper注册消息

  1. 帮助kafka集群存储信息
  2. 帮助消费者存储信息
  3. kafka依赖zookeeper

image-20200816224938047

安装

zookeeper

kafka依赖zookeeper。前提是zookeeper启动。

1
zk.sh start

下载

wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz

寻找下载路径的过程:

  1. http://kafka.apache.org/downloads

image-20200816225928155

  1. https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz

image-20200816231813471

配置

1
2
3
4
5
6
7
# 解压
tar -zxvf kafka_2.13-2.6.0.tgz

cd opt/modules/kafka_2.13-2.6.0/config

# 重点关注config目录中的server.properties
# 一般使用自己安装的单独的zookeeper,不会关注kafka自带的zookeeper.properties
server.properties

以下为节选的重点内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
vim server.properties

# --------------------------------------------------------
# 一个broker就是一台服务器,broker.id全局唯一、整数。The id of the broker. This must be set to a unique integer for each broker.
# 给每个机器,设置不同的broker.id
broker.id=0

#允许删除topic
delete.topic.enable=true


# 是kafka暂存【数据】和日志的目录(数据和日志都存在这里)。A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# 上述kafka暂存数据(7天)168小时。The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168


# 配置连接zookeeper集群地址,可以逗号分隔多个地址
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".逗号分隔多个服务器。
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
配置环境变量

可以方便全局使用命令

1
2
3
4
vim /etc/profile

export KAFKA_HOME=/opt/module/kafka
export PATH = $PATH:$KAFKA_HOME/bin

启动

参数说明

cd opt/modules/kafka_2.13-2.6.0/bin

  1. kafka-server-start.sh 启动服务

  2. kafka-server-stop.sh 停止服务

  1. kafka-console-consumer.sh 仅控制台打印,用于测试环境

  2. kafka-console-producer.sh

  1. kafka-consumer-perf-test.sh 消费者压力测试

  2. kafka-producer-perf-test.sh 生产者压力测试,测试整个集群的负载能力。

  1. kafka-topics.sh 主题topic的增删改查
启动命令
依次每个机器,单节点启动
1
2
3
4
5
6
7
8
9
#直接启动,非常驻进程
./kafka-server-start.sh ../config/server.properties

#守护进程,常驻启动
./kafka-server-start.sh -daemon ../config/server.properties

#查看进程
jps
结果中会有kafka

查看/tmp/kafka-logs/server.log日志。

批量管理

vim kafkaManage.sh

chmod 777 kafkaManage.sh

kafkaManage.sh stop

xcall.sh jps【注意xcall是Hadoop管理虚拟机脚本,批量查看各个虚拟机的jps】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/bin/bash
case $1 in
"start"){
for i in 机器1,机器2,机器3
do
echo "------$i--------"
ssh $i "/opt/modules/kafka_2.13-2.6.0/bin/kafka-server-start.sh --daemon /opt/modules/kafka_2.13-2.6.0/config/server.properties"

done
};;

"stop"){
for i in 机器1,机器2,机器3
do
echo "------$i--------"
ssh $i "/opt/modules/kafka_2.13-2.6.0/bin/kafka-server-stop.sh /opt/modules/kafka_2.13-2.6.0/config/server.properties"

done
};;

#"status"){
# for i in 机器1,机器2,机器3
# do
# echo "------$i--------"
# ssh $i "opt/modules/zookeeper-3.4.10/bin/zkServer.sh status"

# done
#};;

esac

命令行

topics管理

1
2
3
4
5
6
7
8
9
10
11
12
#查看主题列表
kafka-topics.sh --list --zookeeper 机器1:2181

#创建主题
kafka-topics.sh --create --zookeeper 机器1:2181 --topic first【这个是自定义的主题名]】--partition 2【2个分区】 --replication-factor 2【2个副本】
###kafka-topics.sh --create --zookeeper 机器1:2181 --topic first【这个是自定义的主题名]】--partition 2【3个分区】 --replication-factor 2【1个副本】

#删除主题
kafka-topics.sh --delete --zookeeper 机器1:2181 --topic first

#获取主题详情信息
kafka-topics.sh --describe --zookeeper 机器1:2181 --topic first

此时查看各个机器的日志

cd logs

可以看到各个机器的副本加起来,总共有两套

  1. first-0
  2. first-1
-------------Keep It Simple Stupid-------------
0%