书接上回:RabbitMq概述与工作模式深度剖析_bingtanghulu_6的博客-CSDN博客
工作模式代码已经发到gitee仓库,需要的自取。代码路径:https://gitee.com/GengHongBo/rabbit-mqtest.git
1. Publish/Subscribe(发布-订阅模式)
发布订阅模式对象:发布者,交换机,队列,消费者1,消费者2。多个队列绑定交换机,交换机会给每个队列发送消息。
使用场景:up主跟粉丝订阅消息的场景;天气预报跟百度,微博等天气专栏的关系。

1.1 发布-订阅模式-实战创建
为了节省时间只放了一些关键代码和截图,以后的都是这样。需要的可以去开头的git仓库自取。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
import java.util.Scanner;
/**
* 发布订阅模式:新建一个TCP长连接绑定交换机
*/
public class ExchangeTest {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
String str = new Scanner(System.in).next();
//绑定交换机
//RabbitMQConstant.EXCHANGE_PUBSUB:交换机;queueName:队列名称;null:额外的设置属性;消息内容
channel.basicPublish(RabbitMQConstant.EXCHANGE_PUBSUB,"",null,str.getBytes());
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
/**
* 消费者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//设置连接信息
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConstant.QUEUE_PUBSUB,false,false,false,null);
//队列绑定交换机
//RabbitMQConstant.QUEUE_PUBSUB:队列名称;RabbitMQConstant.EXCHANGE_PUBSUB:交换机名称;"":路由key(暂时用不到)
channel.queueBind(RabbitMQConstant.QUEUE_PUBSUB,RabbitMQConstant.EXCHANGE_PUBSUB,"");
channel.basicQos(1);//消费完一个消息再去队列拿其他的消息
//消费消息
//"":交换机,helloworld模式不使用;queueName:队列名称;null:额外的设置属性;消息内容
channel.basicConsume(RabbitMQConstant.QUEUE_PUBSUB,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}


2.routing(路由模式)
路由模式对象:生产者,交换机,路由key,队列,消费者1,消费2。。。,通过队列绑定消费者并且指定唯一路由key的形式指定消费固定路由key的消息。

2.1 routing实战
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.util.*;
/**
* 路由模式:新建一个TCP长连接绑定交换机
*/
public class ExchangeTest {
public static void main(String[] args) throws Exception {
HashMap map = new HashMap();
map.put("key1","test1");
map.put("key2","test2");
map.put("key3","test3");
map.put("key4","test4");
map.put("key5","test5");
map.put("key6","test6");
map.put("key7","test7");
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
Iterator> iterator = map.entrySet().iterator();
while(iterator.hasNext()){
Map.Entry entry = iterator.next();
//绑定交换机
//RabbitMQConstant.EXCHANGE_PUBSUB:交换机;queueName:队列名称;null:额外的设置属性;消息内容
channel.basicPublish(RabbitMQConstant.EXCHANGE_ROUTING,entry.getKey(),null,entry.getKey().getBytes());
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
/**
* 消费者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//设置连接信息
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConstant.QUEUE_ROUTING1,false,false,false,null);
//队列绑定交换机
//RabbitMQConstant.QUEUE_PUBSUB:队列名称;RabbitMQConstant.EXCHANGE_PUBSUB:交换机名称;"":路由key(暂时用不到)
channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_ROUTING,"key1");
channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_ROUTING,"key2");
channel.basicQos(1);//消费完一个消息再去队列拿其他的消息
//消费消息
//"":交换机,helloworld模式不使用;queueName:队列名称;null:额外的设置属性;消息内容
channel.basicConsume(RabbitMQConstant.QUEUE_ROUTING1,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}


3. Topics-通配符模式
通配符模式:与上面的类似。
应用场景:当routingkey过多时,会出现很多类似的代码,使用通配符模式可以减少代码量。#代表一个或多个词,*代表一个词。

3.1 topic-通配符模式实战
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* 通配符模式:新建一个TCP长连接绑定交换机
*/
public class ExchangeTest {
public static void main(String[] args) throws Exception {
HashMap map = new HashMap();
map.put("key.key1.key.key","test1");
map.put("key.key2.key.key","test2");
map.put("key.key3.key.key","test3");
map.put("com.test.key","com.test1");
map.put("com.test.key1","com.test2");
map.put("com.test.key2","com.test3");
map.put("com.test.key3","com.test4");
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
Iterator> iterator = map.entrySet().iterator();
while(iterator.hasNext()){
Map.Entry entry = iterator.next();
//绑定交换机
//RabbitMQConstant.EXCHANGE_PUBSUB:交换机;queueName:队列名称;null:额外的设置属性;消息内容
channel.basicPublish(RabbitMQConstant.EXCHANGE_TOPICS,entry.getKey(),null,entry.getKey().getBytes());
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
/**
* 消费者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//设置连接信息
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConstant.QUEUE_ROUTING1,false,false,false,null);
//队列绑定交换机
//RabbitMQConstant.QUEUE_PUBSUB:队列名称;RabbitMQConstant.EXCHANGE_PUBSUB:交换机名称;"":路由key(暂时用不到)
channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_TOPICS,"key.#");
channel.basicQos(1);//消费完一个消息再去队列拿其他的消息
//消费消息
//"":交换机,helloworld模式不使用;queueName:队列名称;null:额外的设置属性;消息内容
channel.basicConsume(RabbitMQConstant.QUEUE_ROUTING1,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}

4. rabbitMq消息确认机制
当生产者传送消息到broker以后会出现两种情况:confirm和return。confirm是确定有队列的情况下,消费者确认或者不确认的情况。return是消息到达broker以后找不到队列而发生退回的情况。
4.1 消息确认机制-实战
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* 消息确认机制案例
*/
public class ConfirmTest {
public static void main(String[] args) throws Exception {
HashMap map = new HashMap();
map.put("key.key1.key.key","test1");
map.put("key.key2.key.key","test2");
map.put("key.key3.key.key","test3");
map.put("com.test.key","com.test1");
map.put("com.test.key1","com.test2");
map.put("com.test.key2","com.test3");
map.put("com.test.key3","com.test4");
map.put("erro.key","com.erro");
map.put("erro.key1","com.erro1");
map.put("erro.key3","com.erro2");
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//开启监听机制
channel.confirmSelect();
//确认监听机制
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {//确认返回
System.out.println("接受到消息,参数:"+l+",b:"+b);
}
@Override
public void handleNack(long l, boolean b) throws IOException {//拒绝消息
System.out.println("接受到消息拒收消息,参数:"+l+",b:"+b);
}
});
//返回监听机制
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return r) {
System.err.println("===========================");
System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
System.err.println("Return主题:" + new String(r.getBody()));
System.err.println("===========================");
}
});
Iterator> iterator = map.entrySet().iterator();
while(iterator.hasNext()){
Map.Entry entry = iterator.next();
//绑定交换机
//RabbitMQConstant.EXCHANGE_PUBSUB:交换机;queueName:队列名称;null:额外的设置属性;消息内容
channel.basicPublish(RabbitMQConstant.EXCHANGE_CONFIRM,entry.getKey(),null,entry.getKey().getBytes());
}
//在消息确认机制下我们不能关闭长连接
// channel.close();
// connection.close();
}
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
/**
* 消费者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//设置连接信息
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConstant.QUEUE_CONFIRM1,false,false,false,null);
//队列绑定交换机
//RabbitMQConstant.QUEUE_PUBSUB:队列名称;RabbitMQConstant.EXCHANGE_PUBSUB:交换机名称;"":路由key(暂时用不到)
channel.queueBind(RabbitMQConstant.QUEUE_CONFIRM1,RabbitMQConstant.EXCHANGE_CONFIRM,"key.#");
channel.basicQos(1);//消费完一个消息再去队列拿其他的消息
//消费消息
//"":交换机,helloworld模式不使用;queueName:队列名称;null:额外的设置属性;消息内容
channel.basicConsume(RabbitMQConstant.QUEUE_CONFIRM1,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
5.spring整合rabbitMQ
分两个工程:生产者(spring-rabbit-consumerTest),消费者(spring-rabbit-consumerTest)
生产者:添加pom.xml依赖,配置文件整合,测试案例编写
消费者:添加pom.xml依赖,配置文件整合,增加监听器,启动监听
