Message Queues

Java / 2020-02-14

RabbitMQ

Installing RabbitMQ with Docker

Search for the image

  • docker search rabbitmq:3-management

Pull the image

  • docker pull rabbitmq:3-management

Start the container. RabbitMQ exposes two ports; 15672 is the web management UI.

  • docker run -t -p 5672:5672 -p 15672:15672 -d --name rabbitmq rabbitmq:3-management

Once the container is running, open localhost:15672 in a browser.

Default username: guest

Default password: guest

(Screenshot: RabbitMQ management UI overview)

We will mainly use the Exchanges and Queues tabs.

RabbitMQ basics

  • How RabbitMQ works, in brief:
  • Exchanges are where messages sent from our program arrive first
  • The exchange then routes each message to queues according to its matching rules
  • Messages wait in a queue until a program takes them out
  • RabbitMQ can have many exchanges and many queues; each exchange can deliver messages to many queues, and each queue can receive messages from multiple exchanges

There are three commonly used routing rules (exchange types):

  • Direct: delivers to the queues whose binding routing key matches exactly

  • Fanout: delivers to every bound queue

  • Topic: delivers to the queues whose binding pattern matches the routing key, e.g. #.xx ( # matches zero or more words, * matches exactly one word ); see the sketch after this list

  • The routing key is the key attached to the message that the exchange matches against the bindings
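A minimal sketch of how these patterns behave, using the spring-amqp binding classes that also appear later in this post (the exchange and queue names here are made up for illustration):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;

public class TopicRoutingSketch {

    public static void main(String[] args) {
        TopicExchange exchange = new TopicExchange("exchange.topic");

        // "#" matches zero or more words, "*" matches exactly one word
        Binding newsBinding = BindingBuilder.bind(new Queue("queue.news")).to(exchange).with("#.news");
        Binding empsBinding = BindingBuilder.bind(new Queue("queue.emps")).to(exchange).with("*.emps");

        // With these bindings:
        //   routing key "haha.news"      -> queue.news   (# can span several words, or none)
        //   routing key "haha.hehe.news" -> queue.news
        //   routing key "hr.emps"        -> queue.emps   (* is exactly one word)
        //   routing key "hr.dept.emps"   -> matches neither binding
        System.out.println(newsBinding.getRoutingKey() + " / " + empsBinding.getRoutingKey());
    }
}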

Creating an exchange

On the Exchanges tab you can see that several exchanges have already been created by default.
Below them there is a form for adding a new exchange.

(Screenshot: Add a new exchange form)

Give it a name, pick an exchange type, and confirm.

Creating a queue

  • Queues can be created from the form at the bottom of the Queues tab

(Screenshot: Add a new queue form)

Give it a name and confirm.

Setting which queues an exchange delivers to

Open one of the exchanges:

(Screenshot: exchange detail page with the Add binding form)

  • At the bottom there is an "Add binding from this exchange" form
  • Fill in the name of the queue to bind and the routing key; the routing key is what the matching rule is applied to. For a fanout exchange it can be left empty, because a fanout exchange delivers to every bound queue (see the sketch below).
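A short sketch of the same bindings done in code rather than in the UI (illustrative names; the AmqpAdmin section later shows how such bindings are actually declared on the broker):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;

public class BindingSketch {

    public static void main(String[] args) {
        Queue queue = new Queue("queue.news");

        // fanout: no routing key needed, every bound queue receives every message
        Binding fanoutBinding = BindingBuilder.bind(queue).to(new FanoutExchange("exchange.fanout"));

        // direct: the routing key must match exactly
        Binding directBinding = BindingBuilder.bind(queue).to(new DirectExchange("exchange.direct")).with("queue.news");

        System.out.println(fanoutBinding.getRoutingKey()); // "" for the fanout binding
        System.out.println(directBinding.getRoutingKey()); // "queue.news"
    }
}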

Sending messages from Java code

Maven dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  • In a Spring Boot project:
  • Configure RabbitMQ in the application config (note that the rabbitmq block lives under spring:)

spring:
  rabbitmq:
    addresses: localhost
    username: guest
    password: guest
    port: 5672

  • Create a RabbitMQ configuration class that switches the message format to JSON

@Configuration
public class MyAMQConfig {

    // change the message serialization to JSON
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}


We mainly use two classes to work with RabbitMQ:

  • RabbitTemplate: sends and receives messages
  • AmqpAdmin: creates and deletes exchanges and queues

Create a Book class for testing:
public class Book {

    private String name;

    private String author;

    public Book(String name, String author){
        this.name = name;
        this.author = author;
    }

    public Book(){

    }

    @Override
    public String toString() {
        return "Book{" +
                "name='" + name + '\'' +
                ", author='" + author + '\'' +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }
}


Autowire the classes we will use:

	@Autowired
	RabbitTemplate rabbitTemplate;


	@Autowired
	AmqpAdmin amqpAdmin;



Convert and send a message:

void contextLoads() {
    // arguments: exchange, routing key, message
    // exchanges come in three types: direct, fanout, topic

    // convert and send
    //rabbitTemplate.convertAndSend("exchange.direct","queue.news",map);

    Book book = new Book("西游记","吴承恩");
    rabbitTemplate.convertAndSend("exchange.fanout","",book);
}

The three arguments are the exchange, the routing key, and the message to send.
Then open the Queues tab in the browser, click the queue that received the message, and use Get Message(s) at the bottom.

(Screenshot: Get Message(s) result for the queue)

  • If the message converter has not been set to JSON, the payload shown here will be garbled

Receive and convert a message

public void test2(){

    // receive and convert
    Object o = rabbitTemplate.receiveAndConvert("queue.news");
    System.out.println(o.toString());
}

The argument specifies which queue to receive from.
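Because the converter is JSON, receiveAndConvert can also return a typed object instead of a plain Object. A small sketch (the method name test3 is made up; ParameterizedTypeReference comes from org.springframework.core):

public void test3(){

    // returns null if the queue is currently empty
    Book book = rabbitTemplate.receiveAndConvert("queue.news",
            new ParameterizedTypeReference<Book>() {});
    System.out.println(book);
}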

Creating and deleting exchanges and queues with AmqpAdmin


public void create(){
    /**
     * Methods starting with "declare" create things.
     *
     * Methods starting with "delete" remove them.
     */

    amqpAdmin.declareExchange(new DirectExchange("newDirect"));

    amqpAdmin.declareQueue(new Queue("newQueue",true));

    // arguments: destination name, destination type, exchange name, routing key, extra arguments (may be null)
    amqpAdmin.declareBinding(new Binding("newQueue",Binding.DestinationType.QUEUE,"newDirect","haha.news",null));
}
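The delete methods mirror the declare methods; a quick sketch (the method name remove is made up) that tears down what create() declared:

public void remove(){

    // both return false if the queue / exchange did not exist
    amqpAdmin.deleteQueue("newQueue");
    amqpAdmin.deleteExchange("newDirect");
}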


Receiving messages with annotations

  • First enable RabbitMQ annotation support on the application class
  • Add @EnableRabbit
@EnableRabbit
@SpringBootApplication
public class RabbitmqApplication {

	public static void main(String[] args) {
		SpringApplication.run(RabbitmqApplication.class, args);
	}

}


Create a service class and use the @RabbitListener annotation to listen for messages on a queue

@Service
public class BookService {

    // the message class (Book) must have a no-arg constructor, otherwise deserialization fails
    @RabbitListener(queues = "queue.news")
    public void getBook(Book book){

        System.out.println("收到消息"+book.toString());
    }
}


(Screenshot: console output of the received message)

  • The annotation can take several queues, so a single method can listen to multiple queues at once (see the sketch below)
  • The message class must have a no-arg constructor (I have been bitten by this before)
  • Messages are sent to an exchange, but listeners are attached to a specific queue
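A minimal sketch of one listener bound to several queues (queue.emps is a made-up second queue; both queues must exist on the broker):

@Service
public class MultiQueueService {

    // one listener method receiving from two queues at the same time
    @RabbitListener(queues = {"queue.news", "queue.emps"})
    public void receive(Book book){

        System.out.println("Received: " + book);
    }
}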

Kafka

Installation

Install docker-compose:

sudo curl -L https://get.daocloud.io/docker/compose/releases/download/1.25.1/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose

Make it executable:

sudo chmod +x /usr/local/bin/docker-compose

ZooKeeper cluster

zookeeper.yml:

version: '3.4'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
    - 2001:2181
    volumes:
    - "/root/zookeeper/zoo1/data:/data"
    - "/root/zookeeper/zoo1/datalog:/datalog"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      net1:
        ipv4_address: 192.168.1.1

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
    - 2002:2181
    volumes:
    - "/root/zookeeper/zoo2/data:/data"
    - "/root/zookeeper/zoo2/datalog:/datalog"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
    networks:
      net1:
        ipv4_address: 192.168.1.2

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
    - 2003:2181
    volumes:
    - "/root/zookeeper/zoo3/data:/data"
    - "/root/zookeeper/zoo3/datalog:/datalog"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
    networks:
      net1:
        ipv4_address: 192.168.1.3

networks:
  net1:
    external:
      name: net1
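
This compose file (and the Kafka one below) attaches the containers to an external network named net1 in the 192.168.1.0/24 range, so that network has to exist before running either file; something along the lines of:

docker network create --subnet 192.168.1.0/24 net1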

Run:

docker-compose -f zookeeper.yml up -d

Kafka cluster

kafka.yml:

version: '2'

services:
  kafka1:
    image: wurstmeister/kafka
    hostname: kafka1
    container_name: kafka1
    ports:
    - 2011:2011
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_ADVERTISED_PORT: 2011
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    volumes:
    - /root/kafka/kafka1/logs:/kafka
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      net1:
        ipv4_address: 192.168.1.11

  kafka2:
    image: wurstmeister/kafka
    hostname: kafka2
    container_name: kafka2
    ports:
    - 2012:2012
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_ADVERTISED_PORT: 2012
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    volumes:
    - /root/kafka/kafka2/logs:/kafka
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      net1:
        ipv4_address: 192.168.1.12

  kafka3:
    image: wurstmeister/kafka
    hostname: kafka3
    container_name: kafka3
    ports:
    - 2013:2013
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_ADVERTISED_PORT: 2013
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    volumes:
    - /root/kafka/kafka3/logs:/kafka
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      net1:
        ipv4_address: 192.168.1.13

networks:
  net1:
    external:
      name: net1

Run:

docker-compose -f kafka.yml up -d