028-86922220

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

kafka精通1-创新互联

学习目标: 学习内容: 为什么使用消息队列(MQ)消息队列的流派

目前消息队列的中间件选型有很多种:

这些消息队列中间件有什么区别?

  1. 有broker
    重topic:kafka、rocketMQ
    整个topic,依据topic来进行消息的中转,在重topic的消息队列里必然需要topic的存在
    轻topic:RabbitMQ
    topic只是一种中转模式
  2. 无broker
    在生产者和消费者之间没有使用broker,例如zeroMQ,直接使用socket来进行通信
kafka介绍

kafka是一个分布式、支持分区(partition)、多副本(replica),基于zookeeper协调的分布式消息系统,大特点是可以实时的处理大量数据

kafka使用场景kafka的基本知识
  1. kafka的安装

    1.部署一台zookeeper服务器
    	2.安装jdk
    	3.下载kafka的安装包:https:kafka.apache.org/download
    	4.上传到kafka服务器上并解压:/usr/local/kafka
    	5.	进入conf目录内,修改server.properties
    
    	```powershell
    	#broker.id属性在kafka集群中必须要唯一
    	broker.id=0
    	#kafka部署的机器ip和提供服务的端口
    	listeners=PLAINTEXT://10.234.252.122:9092
    	#kafka消息存储文件
    	log.dirs=/usr/local/kafka
    	#kafka连接zookeeper的地址
    	zookeeper.connect=10.234.252.122:2181
    	```	
    
    	```powershell
    	#进入到bin目录内,执行以下命令来启动kafka服务器(带着配置文件)
    	./kafka-server-start.sh -daemon ../config/server.properties
    
    	#校验kafka是否启动成功,进入到zk内查看是否有kafka的节点
    	ls /brokers/ids				#查询出有broker的id则存在
    	```
  2. 创建topic

执行以下命令创建名为“test”的topic,这个topic只有一个partition,并且备份因子也设置为1:
#./kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看当前kafka内有那些topic
./kafka-topic.sh --zookeeper localhost:2182 --list
  1. 发送消息
    kafka自带一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中,在默认情况下,每一行会被当成一个独立的消息,使用kafka的发送消息的客户端,指定发送到kafka服务器地址和topic
./kafka-console-producer.sh --broker-list 10.234.252.122:9092 --topic test
>hello
>world
>1111
>22222222
  1. 消费消息
    对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息,使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息
方式1:从最后一条消息的偏移量(offset)+1开始消费
#./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test

方式2:从开始消费
#./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test  --from-beginning
>hello
>world
>1111
>22222222
  1. 关于消费消息的细节
/usr/local/kafka/kafka-logs/主题-分区/000000.log

在这里插入图片描述

  1. 单播消息
    如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息;换言之,同一个消费组中只能有一个消费者收到一个topic中的消息
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test  --consumer-property group.id=testgroup
  1. 多播消息
    不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息;实际上也是多个消费组中的多个消费者收到同一个topic的消息
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test  --consumer-property group.id=testgroup1

./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test  --consumer-property group.id=testgroup2
  1. 查看消费组及信息
查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 10.234.252.122:9092 --list

查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 10.234.252.122:9092 --describe --group testgroup

在这里插入图片描述

在这里插入图片描述
注意:

主题、分区概念
  1. 主题topic
    主题-topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类,不同的topic会被订阅该topic的消费者消费。
    但是有一个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的,为了解决这个文件过大的问题,kafka提出来分区的概念

  2. partition分区
    通过partition将一个topic中的消息分区来存储,这样的好处:

分区的作用:

分布式存储
		可以并行写

在这里插入图片描述

**为一个主题创建多个分区**
./kafka-topics.sh --create --zookeeper localhost:2181  --partitions 2 --replication-factor 1 --topic test1
  1. kafka中消息日志文件中保存的内容

    00000.log:这个文件中保存的就是消息
    
    __consumer_offsets-49:kafka内部自己创建了 __consumer_offsets主题,包含了50个分区,这个主题用来存放消费者消费某个主题的偏移量,因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主的上报给kafka中的默认主题:__consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区
    		提交到那个分区:通过hash函数:hash(consumergroupid)%_consumer_offsets主题的分区数
    		提交到该主题中的内容是:key是consumergroupid+topic+分区号,value就是当前offsets的值
    		
    文件中保存的消息,默认保存7天,7天后消息会被删除
副本的概念

在创建主题时,除了指明主题的分区数以外,还指明了副本数
副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他是follower
在这里插入图片描述

kafka集群消息的发送与消费
./kafka-console-producer.sh --broker-list 10.234.252.122:9092,10.234.252.209:9092,10.234.253.22:9092 --topic my-replicated-topic

./kafka-console-consumer.sh --bootstrap-server 10.234.252.122:9092,10.234.252.209:9092,10.234.253.22:9092 --from-beginning --topic my-replicated-topic

在这里插入图片描述

关于分区消费组消费者的细节

在这里插入图片描述

生产者的同步发送消息

在这里插入图片描述
如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数为3次

生产者的异步发消息

在这里插入图片描述
异步发送,生产者发完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法

生产者ACK的配置

在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞,那么集群什么时候返回ACK呢?此时ACK有三个配置:

其他一些细节:

在这里插入图片描述

消费者offset的自动提交和手动提交

在这里插入图片描述
消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量

消费者poll消息的过程

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


网页题目:kafka精通1-创新互联
文章起源:http://www.tsicrk.com/article/doocps.html

其他资讯

让你的专属顾问为你服务

2.4565s