028-86922220

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

KafkaTopicshell操作+基准测试+javaAPI-创新互联

Kafka Topic shell操作+基准测试+java API1 消息队列的基本介绍 1.1 消息队列产生背景

什么是消息队列呢?

消息: 数据 只不过这个数据具有一种流动状态
队列: 存储数据的容器 只不过这个容器具有FIFO(先进先出)特性

消息队列: 数据在队列中, 从队列的一端传递到另一端的过程, 数据在整个队列中产生一种流动状态
1.2 常见的消息队列的产品

常见的消息队列的产品:

1.3 消息队列的作用是什么1.4 消息队列的两种消费模型2 Kafka的基本介绍

​ Kafka是Apache旗下的一款开源免费的消息队列的中间件产品,最早是由领英公司开发的, 后期共享给Apache, 目前已经是Apache旗下的顶级开源的项目, 采用语言为Scala

​ 官方网站: http://www.kafka.apache.org

适用场景: 数据传递工作, 需要将数据从一端传递到另一端, 此时可以通过Kafka来实现, 不局限两端的程序

​ 在实时领域中, 主要是用于流式的数据处理工作

3 Kafka的架构
Kafka Cluster: kafka集群
broker: kafka的节点
producer: 生产者
consumer: 消费者
Topic: 主题/话题 理解就是一个大的逻辑容器(管道)
	shard: 分片. 一个Topic可以被分为N多个分片, 分片的数量与节点数据没有关系
	replicas: 副本, 可以对每一个分片构建多个副本, 副本数量最多和节点数量一致(包含本身) 保证数据不丢失
zookeeper: 存储管理集群的元数据信息
4 Kafka的安装操作

参考Kafka的集群安装文档 完成整个安装工作即可

如果安装后, 无法启动, 可能遇到的问题:

1) 配置文件中忘记修改broker id 
2) 忘记修改监听的地址, 或者修改了但是前面的注释没有打开

如何启动Kafka集群:

启动zookeeper集群: 每个节点都要执行
cd /export/server/zookeeper/bin
./zkServer.sh start

启动完成后 需要通过 jps -m 查看是否启动 , 并且还需要通过:
./zkServer.sh status 查看状态, 必须看到一个leader 两个follower才认为启动成功了

启动Kafka集群: 

单节点: 每个节点都需要执行
cd /export/server/kafka_2.12-2.4.1/bin
前台启动:
	./kafka-server-start.sh ../config/server.properties
后台启动:
	nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &
注意: 第一次启动, 建议先前台启动, 观察是否可以正常启动, 如果OK, ctrl +C 退出, 然后挂载到后台

如何停止:

单节点: 每个节点都需要执行
cd /export/server/kafka_2.12-2.4.1/bin
操作:
	jps 然后通过 kill -9
	或者:
	./kafka-server-stop.sh

配置一键化脚本: 仅用于启动Kafka 不会启动zookeeper, zookeeper还是需要单独启动, 或者配置zookeeper的一键化脚本

mkdir -p /export/onekey
cd /export/onekey/
上传即可
cd /export/onekey/
chmod 755 *.sh
5 Kafka的相关使用

​ Kafka是一个消息队列的中间件产品, 主要的作用: 将数据从程序一端传递到另一端的操作, 所以说学习Kafka主要学习如何使用Kafka生产数据, 以及如何使用Kafka消费数据

5.1 Kafka的shell命令使用
./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 3 --replication-factor 2
./kafka-topics.sh  --list  --zookeeper node1:2181,node2:2181,node3:2181
./kafka-topics.sh --describe --zookeeper  node1:2181,node2:2181,node3:2181 --topic test01
./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
注意: 
	默认情况下, 删除一个topic 仅仅是标记删除, 主要原因: Kafka担心直接删除, 会导致误删数据
	
	如果想执行删除的时候, 直接将topic完整的删除掉: 此时需要在server.properties配置中修改一下配置为True
		delete.topic.enable=true
	
	如果本身Topic中的数据量非常少, 或者没有任何的使用, 此时Topic会自动先执行逻辑删除, 然后在物理删除, 不管是否配置delete.topic.enable
Topic 仅允许增大分片, 不允许减少分片 同时也不支持修改副本的数量

增大分区:
./kafka-topics.sh  --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01  --partitions 5
./kafka-console-producer.sh  --broker-list node1:9092,node2:9092,node3:9092 --topic test01
>
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test01

默认从当前的时间开始消费数据, 如果想从头开始消费, 可以添加 --from-beginning  参数即可
5.2 Kafka的基准测试

​ Kafka的基准测试: 主要指的是将安装完成的Kafka集群, 进行测试操作, 测试其能否承载多大的并发量(读写的效率)

​ 注意: 在进行Kafka的基准测试的时候, 受Topic的分片和副本的数量影响会比较大, 一般在测试的时候, 会构建多个topic, 每一个topic设置不同的分片和副本的数量, 比如: 一个设置分片多一些, 副本多一些 一个设置分片多一些, 副本少些…

./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test02 --partitions 6 --replication-factor 1
cd /export/server/kafka/bin
./kafka-producer-perf-test.sh --topic test02 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1

属性说明:
	--num-records : 发送的总消息量
	--throughput : 指定吞吐量(限流)  -1 不限制
	--record-size: 每条数据的大小(字节)
    --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1 : 设置kafka的地址和消息发送模式
cd /export/server/kafka/bin
./kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test02 --fetch-size 1048576 --messages 5000000

属性说明:
	--fetch-size : 每次从Kafka端拉取数据量
	--message: 测试的总消息量
假设Kafka的节点数量是无限多的:
	topic分片数量越多, 理论上读写效率越高
	topic副本数量越多, 整体执行效率越差

一般可以将分片的数量设置为副本数量的三倍左右 可以测试出比较最佳的性能  副本调整为1
5.3 Kafka的Java API的操作
aliyunhttp://maven.aliyun.com/nexus/content/groups/public/true false neverorg.apache.kafkakafka-clients2.4.1org.apache.commonscommons-io1.3.2org.slf4jslf4j-log4j121.7.6log4jlog4j1.2.16 org.apache.maven.plugins maven-compiler-plugin 3.1  1.8 1.8 
5.3.1 演示如何将数据生产到Kafka
package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerTest {public static void main(String[] args) {// 第一步: 创建kafka的生产者核心对象: KafkaProducer  传入相关的配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producerproducer = new KafkaProducer<>(props);

        //2. 执行发送数据操作
        for (int i = 0; i< 10; i++) {ProducerRecordproducerRecord = new ProducerRecord<>(
                    "test01", "张三"+i
            );
            producer.send(producerRecord);
        }

        //3. 执行close 释放资源
        producer.close();

    }
}
5.3.2 演示如何从Kafka消费数据
package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest {public static void main(String[] args) {// 1- 创建Kafka的消费者的核心对象: KafkaConsumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", "test"); // 消费者组的ID
        props.put("enable.auto.commit", "true"); // 是否自动提交偏移量offset
        props.put("auto.commit.interval.ms", "1000"); // 自动提交的间隔时间
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key值的反序列化的类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的值反序列化的类

        KafkaConsumerconsumer = new KafkaConsumer<>(props);
        //2. 订阅topic: 表示消费者从那个topic来消费数据  可以指定多个
        consumer.subscribe(Arrays.asList("test01"));

        while (true) {// 3. 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候, 等待的超时时间, 如果过了等待的时间, 返回空对象(对象存在, 但是内部没有数据  相当于空容器)
            ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecordrecord : records) {long offset = record.offset();
                String key = record.key();
                String value = record.value();
                // 偏移量: 每一条数据 其实就是一个偏移量 , 每个分片单独统计消息到达了第几个偏移量 偏移量从 0 开始的
                System.out.println("消息的偏移量为:"+offset+"; key值为:"+key + "; value的值为:"+ value);
            }
        }

    }

}

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


分享标题:KafkaTopicshell操作+基准测试+javaAPI-创新互联
网页网址:http://www.tsicrk.com/article/deoicp.html

其他资讯

让你的专属顾问为你服务

2.8727s