简介
kafka是分布式的、基于发布/订阅模式(消费者主动拉取消息)的、消息队列(Message Queue)。
kafka由scala编写。
消息队列的特点
解耦
生产者消息 –> 消息队列 –> 消费者自取
可恢复性
即使一个处理消息进程挂掉,【加入队列中的消息】仍然可以在系统恢复后被处理。
缓冲
解决生产消息和消费消息处理速度不一致的情况。
异步通信
提供异步处理机制,可以任意向队列放入消息,当需要的时候再处理。
动态灵活
任意增减机器,避免资源浪费,也可以临时应对突发压力。
消息队列的类别
一对一点对点模式
【消息使用后消失】消费者主动拉取数据,获取数据后,清除消息。也就是一个消息,被消费者消费后就消失。
- 生产者生产消息发送到队列
- 消费者从队列拉取消息
- 消息被消费了,队列不再存储而是被删除。
一对多发布/订阅模式
【消息使用后不会消失】
- 生产者生产消息发送到topic中
- 消费者订阅(消费)消息
- 发布到topic的消息会被所有订阅者消费
发布订阅模式的类型:
- 消费者主动拉取数据(如kafka):消费者要不断去监听(轮询)队列信息。消费者资源浪费。
- 生产者主动推送数据:消费者能力处理不足
kafka简介
特性
高吞吐
生产者
消费者
- 消费者组,提高消费能力
- 同一个消费者组里面的,不能同时(只能派一个)消费某一个分区里的数据
kafka集群管理消息
- broker:服务器,可以容纳多个topic。
- topic:一类消息队列,生产者和消费者面向的都是一个topic。
- partition分区:一个topic可以分为多个partition,使一个topic可以分到多个服务器。每个partition是一个有序队列。
- replica:副本,为了保证集群中某个节点故障时,节点上的partition数据不丢。topic的每个分区都有若干副本。一个leader都有若干follower。
- follower备,leader主(leader坏掉之后,某一个follower启用成为新的leader)
zookeeper注册消息
- 帮助kafka集群存储信息
- 帮助消费者存储信息
- kafka依赖zookeeper
安装
zookeeper
kafka依赖zookeeper。前提是zookeeper启动。
1 | zk.sh start |
下载
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
寻找下载路径的过程:
配置
1 | 解压 |
server.properties
以下为节选的重点内容
1 | vim server.properties |
配置环境变量
可以方便全局使用命令
1 | vim /etc/profile |
启动
参数说明
cd opt/modules/kafka_2.13-2.6.0/bin
kafka-server-start.sh 启动服务
kafka-server-stop.sh 停止服务
kafka-console-consumer.sh 仅控制台打印,用于测试环境
kafka-console-producer.sh
kafka-consumer-perf-test.sh 消费者压力测试
kafka-producer-perf-test.sh 生产者压力测试,测试整个集群的负载能力。
- kafka-topics.sh 主题topic的增删改查
启动命令
依次每个机器,单节点启动
1 | 直接启动,非常驻进程 |
查看/tmp/kafka-logs/server.log日志。
批量管理
vim kafkaManage.sh
chmod 777 kafkaManage.sh
kafkaManage.sh stop
xcall.sh jps【注意xcall是Hadoop管理虚拟机脚本,批量查看各个虚拟机的jps】
1 | !/bin/bash |
命令行
topics管理
1 | 查看主题列表 |
此时查看各个机器的日志
cd logs
可以看到各个机器的副本加起来,总共有两套
- first-0
- first-1