liujijiang

kafka

2021.03.03

kafka

官网

特性

  • 高并发:数千个客户端同时读写
  • 可扩展:kafka集群支持热扩展
  • 容错:允许集群中节点失败
  • 持久可靠:消息持久化到本地磁盘
  • 高吞吐,低延迟:每秒处理几十万消息,消息主题可以多分区

使用场景

  • 日志收集
  • 消息系统:解耦生产者和消费者
  • 用户活动跟踪
  • 运营指标:记录运营监控数据

基本概念

  • Broker:消息中间件处理节点,一个kafka就是一个broker,一个或者多个broker组成一个集群
  • Topic:一个topic可以有多个分区
  • Partition:分区可以分布在集群的不同节点上,消息以追加的方式写入一个或者多个分区
  • LogSegment:每个分区又被划分为多个日志分段 LogSegment 组成,日志段是 Kafka 日志对象分片的最小单位。LogSegment 算是一个逻辑概念,对应一个具体的日志文件(”.log” 的数据文件)和两个索引文件(”.index” 和 “.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成
  • Offset:消息有序的到达partition,offset标记每个消息的偏移量
  • producer:默认消息发布到主题的所有分区上,也可以自定义分区路由
  • consumer:消费者会把最后读取的消息的offset放到zookeeper中,读取状态不会消失
  • group:每个consumer属于一个group,可以多个group消费一个topic,

下载,解压

配置

config/server.properties

#当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=1
#当前kafka对外提供服务的端口默认是9092
port=19092 
#这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
host.name=192.168.1.224
#这个是borker进行网络处理的线程数
num.network.threads=3 
#这个是borker进行I/O处理的线程数
num.io.threads=8 
#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
log.dirs=/usr/local/kafka/kafka_2.11-0.9.0.1/kafka_log
#发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400 
#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400 
#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600 
#默认的分区数,一个topic默认1个分区数
num.partitions=1 
#默认消息的最大持久化时间,168小时,7天
log.retention.hours=168 
#消息保存的最大值5M
message.max.byte=5242880 
#kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
default.replication.factor=2 
#取消息的最大直接数
replica.fetch.max.bytes=5242880 
#这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824 
#每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.retention.check.interval.ms=300000 
#是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false 
#设置zookeeper的连接端口
zookeeper.connect=192.168.1.224:2181,192.168.1.225:2181,192.168.1.226:1218 

启动

bin/kafka-server-start.sh config/server.properties

停止

bin/kafka-server-top.sh

删除主题

bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181

多线程向kafka中写入100w条个数,1000000-1999999,写到10个分区,个位数相同的写到同一个分区,10个线程并行消费,顺序输出数据。

kafka provider配置

/**
 * topic
 */
private static final String KAFKA_TOPIC_NAME = "order_test";

/**
 * 主题分区数量
 */
private static final Integer NUM_PARTITIONS = 10;

/**
 * 地址
 */
private static final String BOOTStRAP_SERVERS_CONFIG = "localhost:9092";

/**
 * 新建主题
 * @return
 */
@Bean
public NewTopic newTopic(){
    return new NewTopic(KAFKA_TOPIC_NAME, NUM_PARTITIONS, (short) 1);
}

@Bean
public KafkaAdmin kafkaAdmin(){
    Map<String, Object> config = new HashMap<>(2);
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTStRAP_SERVERS_CONFIG);
    return new KafkaAdmin(config);
}

/**
 * Producer Template 配置
 */
@Bean(name="kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

/**
 * Producer 工厂配置
 */
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

/**
 * Producer 参数配置
 */
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>(20);
    // 指定多个kafka集群多个地址
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTStRAP_SERVERS_CONFIG);
    // 重试次数,0为不启用重试机制
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    // acks=0 把消息发送到kafka就认为发送成功
    // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
    // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
    props.put(ProducerConfig.ACKS_CONFIG,"1");
    // 生产者空间不足时,send()被阻塞的时间,默认60s
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
    // 控制批处理大小,单位为字节
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
    // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
    // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
    // 键的序列化方式
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 值的序列化方式
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
    // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
    return props;
}

Kafka consumer 配置

/**
 * 地址
 */
private static final String bootstrapServers = "localhost:9092";

/**
 * 消费端配置
 * @return
 */
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // 地址
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // 序列化器
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // group id
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    // 一次poll的record最大数量
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // 开启多线程
    factory.setBatchListener(true);
    // 设置10个线程消费
    factory.setConcurrency(10);
    return factory;
}

provider

/**
 * 线程池默认线程数量
 */
public static final Integer CORE_POOL_SIZE = 15;

/**
 * 线程池最大线程数量
 */
public static final Integer MAX_POOL_SIZE = 15;

/**
 * 大于核心线程数量的线程闲置到摧毁的时间 1s
 */
public static final Integer KEEP_ALIVE_TIME = 1;
public static final Integer QUEUE_SIZE = 1000000;

/**
 * 向kafka发送数据的起始数据
 */
public static final Integer START_NUM = 1000000;

/**
 * 向kafka发送数据的截止数据
 */
public static final Integer END_NUM = 2000000;

/**
 * topic
 */
public static final String KAFKA_TOPIC_NAME = "order_test";

@Autowired
private KafkaTemplate kafkaTemplate;

@Async
public void provider(){
    // 初始化线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_SIZE),
            new ThreadPoolExecutor.AbortPolicy()
    );

    // 发送数据
    for (int i=START_NUM; i<END_NUM; i++){
        int finalI = i;
        threadPoolExecutor.execute(() -> {
            kafkaTemplate.send(KAFKA_TOPIC_NAME, finalI %10, String.valueOf(finalI), String.valueOf(finalI));
        });
    }

    threadPoolExecutor.shutdown();
}

consumer

/**
 * topic
 */
private static final String BATCH_TOPIC = "order_test";

/**
 * kafka监听, 根据配置的监听器10个线程并发监听
 * @param data
 * @param partitions
 * @param offsets
 */
@KafkaListener(topics = BATCH_TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void receivePartitions(List<String> data,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                              @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    // 遍历每次poll到的record
    for (int i = 0; i < data.size(); i++) {
        Long threadId = Thread.currentThread().getId();
        System.out.println("线程:" + threadId + " ,分区:" + partitions.get(i) + " ,value:" + data.get(i));
    }

}