RabbitMQ
docker环境下rabbitmq的安装
查找镜像
- docker search rabbitmq:3-management
拉取镜像
- docker pull rabbitmq:3-management
生成容器,rabbitmq暴露两个端口,其中15672是我们的后台管理界面端口
- docker run -t -p 5672:5672 -p 15672:15672 -d --name rabbitmq rabbitmq:3-management
成功运行后浏览器中输入: localhost:15672
默认用户名为:guest,
默认密码为:guest,
其中我们主要关注exchanges和queues两个选项
rabbitmq简介
- rabbitmq的简单工作方式:
- exchanges称作交换机,我们从程序中发送消息先到达exchanges中,
- 然后交换机根据匹配规则把消息发送到队列中
- 队列中的消息等待程序把他取出来
- rabbitmq可以有很多个交换机,可以有很多个队列,每个交换机可以给很多个消息队列发送消息,每个消息队列可以从多个交换机中取消息
匹配规则常用的有三种:
-
Direct: 发送给路由件相同的队列
-
Fanout: 发送给全部的消息队列
-
Topic: 发送给路由件匹配的队列。路由件模糊匹配规则 #.xx ( # 匹配任意可以为0, * 匹配一个单词 )
-
路由件:routing key
创建exchange
在exchanges里面我们可以看到系统已经给我们默认建立了几个交换机
我们在下面可以建立交换机
起一个name,选择一个匹配规则,然后确定
创建queue
- 在queues中的下面可以创建队列
起一个名字,然后确定
设置交换机可以发送消息的队列
我们点开一个exchange
- 在下面有 Add binding from this exchange
- 填上绑定的队列的名字以及路由件,路由件就是匹配规则,如果交换机是fanout就不用填了,因为他只要是绑定的队列就会发送消息
java代码发送消息
maven jar:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- springboot项目中:
- application配置中配置rabbitmq
rabbitmq:
addresses: localhost
username: guest
password: guest
port: 5672
- 建一个rabbitmq配置类,把发送信息的格式改为jason格式
@Configuration
public class MyAMQConfig {
//消息序列化规则改为json
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
我们主要是通过两个类使用rabbitmq
- RabbitmqTemplate: 控制发送接收消息
- AmqpAdmin:控制创建删除exchange和queue
创建一个book类用于测试
public class Book {
private String name;
private String auther;
public Book(String name,String auther){
this.name = name;
this.auther = auther;
}
public Book(){
}
@Override
public String toString() {
return "Book{" +
"name='" + name + '\'' +
", auther='" + auther + '\'' +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAuther() {
return auther;
}
public void setAuther(String auther) {
this.auther = auther;
}
}
需要自动注入使用的类:
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AmqpAdmin amqpAdmin;
简化发送消息:
void contextLoads() {
// 参数:交换机,路由件,信息
//交换机有三种形式,direct,fanout,topic
//简化并发送
//rabbitTemplate.convertAndSend("exchange.direct","queue.news",map);
Book book = new Book("西游记","吴承恩");
rabbitTemplate.convertAndSend("exchange.fanout","",book);
}
三个参数:交换机,路由件,发送的信息
然后打开浏览器中的queues,点击接受了消息的队列,点击下面的get message
- 如果没有配置数据传输形式为jason,这里就会乱码
简化并接受信息
public void test2(){
//接收并简化
Object o = rabbitTemplate.receiveAndConvert("queue.news");
System.out.println(o.toString());
}
指定从哪个队列中接受信息
使用AmqpAdmin创建删除交换机和队列
public void create(){
/**
* declare (创建) 以他开头的都是创建的
*
* delete (删除)
*
*/
amqpAdmin.declareExchange(new DirectExchange("newDirect"));
amqpAdmin.declareQueue(new Queue("newQueue",true));
//参数:目标队列名字,绑定目标类型,交换机名字,路由件,参数(可以为null)
amqpAdmin.declareBinding(new Binding("newQueue",Binding.DestinationType.QUEUE,"newDirect","haha.news",null ));
}
使用注解接受消息
- 首先需要在application中开启rabbitmq注解识别
- 加入@EnableRabbitmq
@EnableRabbit
@SpringBootApplication
public class RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
}
创建一个服务层并使用@RabbitmqListener注解监听消息队列中的消息
@Service
public class BookService {
//消息体 book 一定要写无参构造函数 否则出错
@RabbitListener(queues = "queue.news")
public void getBook(Book book){
System.out.println("收到消息"+book.toString());
}
}
- 注解的参数中可以放多个队列,可以同时监听多个队列
- 消息的类中一定要有无参构造函数, ---之前在这里踩过坑---
- 发送消息是向交换机中发送,监听的是具体的队列
Kafka
安装
docekr-compose安装:
docker-compose安装:
sudo curl -L https://get.daocloud.io/docker/compose/releases/download/1.25.1/docker-compose-
uname -s
-uname -m
-o /usr/local/bin/docker-compose
添加执行权限:
sudo chmod +x /usr/local/bin/docker-compose
zookeeper集群
zookeeper.yml:
version: '3.4'
services:
zoo1:
image: zookeeper
restart: always
hostname: zoo1
container_name: zoo1
ports:
- 2001:2181
volumes:
- "/root/zookeeper/zoo1/data:/data"
- "/root/zookeeper/zoo1/datalog:/datalog"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
networks:
net1:
ipv4_address: 192.168.1.1
zoo2:
image: zookeeper
restart: always
hostname: zoo2
container_name: zoo2
ports:
- 2002:2181
volumes:
- "/root/zookeeper/zoo2/data:/data"
- "/root/zookeeper/zoo2/datalog:/datalog"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
networks:
net1:
ipv4_address: 192.168.1.2
zoo3:
image: zookeeper
restart: always
hostname: zoo3
container_name: zoo3
ports:
- 2003:2181
volumes:
- "/root/zookeeper/zoo3/data:/data"
- "/root/zookeeper/zoo3/datalog:/datalog"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
networks:
net1:
ipv4_address: 192.168.1.3
networks:
net1:
external:
name: net1
执行:
docker-compose -f zookeeper.yml up -d
kafka集群
配置文件:
version: '2'
services:
kafka1:
image: wurstmeister/kafka
hostname: kafka1
container_name: kafka1
ports:
- 2011:2011
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_ADVERTISED_PORT: 2011
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /root/kafka/kafka1/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
net1:
ipv4_address: 192.168.1.11
kafka2:
image: wurstmeister/kafka
hostname: kafka2
container_name: kafka2
ports:
- 2012:2012
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka2
KAFKA_ADVERTISED_PORT: 2012
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /root/kafka/kafka2/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
net1:
ipv4_address: 192.168.1.12
kafka3:
image: wurstmeister/kafka
hostname: kafka3
container_name: kafka3
ports:
- 2013:2013
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_PORT: 2013
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /root/kafka/kafka3/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
net1:
ipv4_address: 192.168.1.13
networks:
net1:
external:
name: net1
执行:
docker-compose -f kafka.yml up -d