kafka
特性
- 高并发:数千个客户端同时读写
- 可扩展:kafka集群支持热扩展
- 容错:允许集群中节点失败
- 持久可靠:消息持久化到本地磁盘
- 高吞吐,低延迟:每秒处理几十万消息,消息主题可以多分区
使用场景
- 日志收集
- 消息系统:解耦生产者和消费者
- 用户活动跟踪
- 运营指标:记录运营监控数据
基本概念
- Broker:消息中间件处理节点,一个kafka就是一个broker,一个或者多个broker组成一个集群
- Topic:一个topic可以有多个分区
- Partition:分区可以分布在集群的不同节点上,消息以追加的方式写入一个或者多个分区
- LogSegment:每个分区又被划分为多个日志分段 LogSegment 组成,日志段是 Kafka 日志对象分片的最小单位。LogSegment 算是一个逻辑概念,对应一个具体的日志文件(”.log” 的数据文件)和两个索引文件(”.index” 和 “.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成
- Offset:消息有序的到达partition,offset标记每个消息的偏移量
- producer:默认消息发布到主题的所有分区上,也可以自定义分区路由
- consumer:消费者会把最后读取的消息的offset放到zookeeper中,读取状态不会消失
- group:每个consumer属于一个group,可以多个group消费一个topic,
下载,解压
配置
config/server.properties
#当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=1
#当前kafka对外提供服务的端口默认是9092
port=19092
#这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
host.name=192.168.1.224
#这个是borker进行网络处理的线程数
num.network.threads=3
#这个是borker进行I/O处理的线程数
num.io.threads=8
#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
log.dirs=/usr/local/kafka/kafka_2.11-0.9.0.1/kafka_log
#发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600
#默认的分区数,一个topic默认1个分区数
num.partitions=1
#默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
#消息保存的最大值5M
message.max.byte=5242880
#kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
default.replication.factor=2
#取消息的最大直接数
replica.fetch.max.bytes=5242880
#这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824
#每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.retention.check.interval.ms=300000
#是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
#设置zookeeper的连接端口
zookeeper.connect=192.168.1.224:2181,192.168.1.225:2181,192.168.1.226:1218
启动
bin/kafka-server-start.sh config/server.properties
停止
bin/kafka-server-top.sh
删除主题
bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181
多线程向kafka中写入100w条个数,1000000-1999999,写到10个分区,个位数相同的写到同一个分区,10个线程并行消费,顺序输出数据。
kafka provider配置
/**
* topic
*/
private static final String KAFKA_TOPIC_NAME = "order_test";
/**
* 主题分区数量
*/
private static final Integer NUM_PARTITIONS = 10;
/**
* 地址
*/
private static final String BOOTStRAP_SERVERS_CONFIG = "localhost:9092";
/**
* 新建主题
* @return
*/
@Bean
public NewTopic newTopic(){
return new NewTopic(KAFKA_TOPIC_NAME, NUM_PARTITIONS, (short) 1);
}
@Bean
public KafkaAdmin kafkaAdmin(){
Map<String, Object> config = new HashMap<>(2);
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTStRAP_SERVERS_CONFIG);
return new KafkaAdmin(config);
}
/**
* Producer Template 配置
*/
@Bean(name="kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* Producer 工厂配置
*/
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* Producer 参数配置
*/
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>(20);
// 指定多个kafka集群多个地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTStRAP_SERVERS_CONFIG);
// 重试次数,0为不启用重试机制
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// acks=0 把消息发送到kafka就认为发送成功
// acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
// acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
props.put(ProducerConfig.ACKS_CONFIG,"1");
// 生产者空间不足时,send()被阻塞的时间,默认60s
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
// 控制批处理大小,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
// 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
// 键的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
// 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
return props;
}
Kafka consumer 配置
/**
* 地址
*/
private static final String bootstrapServers = "localhost:9092";
/**
* 消费端配置
* @return
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// 地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// group id
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
// 一次poll的record最大数量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 开启多线程
factory.setBatchListener(true);
// 设置10个线程消费
factory.setConcurrency(10);
return factory;
}
provider
/**
* 线程池默认线程数量
*/
public static final Integer CORE_POOL_SIZE = 15;
/**
* 线程池最大线程数量
*/
public static final Integer MAX_POOL_SIZE = 15;
/**
* 大于核心线程数量的线程闲置到摧毁的时间 1s
*/
public static final Integer KEEP_ALIVE_TIME = 1;
public static final Integer QUEUE_SIZE = 1000000;
/**
* 向kafka发送数据的起始数据
*/
public static final Integer START_NUM = 1000000;
/**
* 向kafka发送数据的截止数据
*/
public static final Integer END_NUM = 2000000;
/**
* topic
*/
public static final String KAFKA_TOPIC_NAME = "order_test";
@Autowired
private KafkaTemplate kafkaTemplate;
@Async
public void provider(){
// 初始化线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_SIZE),
new ThreadPoolExecutor.AbortPolicy()
);
// 发送数据
for (int i=START_NUM; i<END_NUM; i++){
int finalI = i;
threadPoolExecutor.execute(() -> {
kafkaTemplate.send(KAFKA_TOPIC_NAME, finalI %10, String.valueOf(finalI), String.valueOf(finalI));
});
}
threadPoolExecutor.shutdown();
}
consumer
/**
* topic
*/
private static final String BATCH_TOPIC = "order_test";
/**
* kafka监听, 根据配置的监听器10个线程并发监听
* @param data
* @param partitions
* @param offsets
*/
@KafkaListener(topics = BATCH_TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void receivePartitions(List<String> data,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
// 遍历每次poll到的record
for (int i = 0; i < data.size(); i++) {
Long threadId = Thread.currentThread().getId();
System.out.println("线程:" + threadId + " ,分区:" + partitions.get(i) + " ,value:" + data.get(i));
}
}