学无先后,达者为师

网站首页 编程语言 正文

RabbitMq工作模式深度剖析与Spring整合MQ

作者:bingtanghulu_6 更新时间: 2022-05-11 编程语言

书接上回:RabbitMq概述与工作模式深度剖析_bingtanghulu_6的博客-CSDN博客

工作模式代码已经发到gitee仓库,需要的自取。代码路径:https://gitee.com/GengHongBo/rabbit-mqtest.git

1. Publish/Subscribe(发布-订阅模式)

发布订阅模式对象:发布者,交换机,队列,消费者1,消费者2。多个队列绑定交换机,交换机会给每个队列发送消息。

使用场景:up主跟粉丝订阅消息的场景;天气预报跟百度,微博等天气专栏的关系。

 1.1 发布-订阅模式-实战创建

为了节省时间只放了一些关键代码和截图,以后的都是这样。需要的可以去开头的git仓库自取。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;
import java.util.Scanner;

/**
 * 发布订阅模式:新建一个TCP长连接绑定交换机
 */
public class ExchangeTest {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        String str = new Scanner(System.in).next();

        //绑定交换机
        //RabbitMQConstant.EXCHANGE_PUBSUB:交换机;queueName:队列名称;null:额外的设置属性;消息内容
        channel.basicPublish(RabbitMQConstant.EXCHANGE_PUBSUB,"",null,str.getBytes());

        channel.close();
        connection.close();


    }
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;

/**
 * 消费者
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception {

        //设置连接信息
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstant.QUEUE_PUBSUB,false,false,false,null);

        //队列绑定交换机
        //RabbitMQConstant.QUEUE_PUBSUB:队列名称;RabbitMQConstant.EXCHANGE_PUBSUB:交换机名称;"":路由key(暂时用不到)
        channel.queueBind(RabbitMQConstant.QUEUE_PUBSUB,RabbitMQConstant.EXCHANGE_PUBSUB,"");

        channel.basicQos(1);//消费完一个消息再去队列拿其他的消息

        //消费消息
        //"":交换机,helloworld模式不使用;queueName:队列名称;null:额外的设置属性;消息内容
        channel.basicConsume(RabbitMQConstant.QUEUE_PUBSUB,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

 

 

 2.routing(路由模式)

路由模式对象:生产者,交换机,路由key,队列,消费者1,消费2。。。,通过队列绑定消费者并且指定唯一路由key的形式指定消费固定路由key的消息。

 2.1 routing实战

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.util.*;

/**
 * 路由模式:新建一个TCP长连接绑定交换机
 */
public class ExchangeTest {

    public static void main(String[] args) throws Exception {


        HashMap map = new HashMap();
        map.put("key1","test1");
        map.put("key2","test2");
        map.put("key3","test3");
        map.put("key4","test4");
        map.put("key5","test5");
        map.put("key6","test6");
        map.put("key7","test7");


        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        Iterator> iterator = map.entrySet().iterator();
        while(iterator.hasNext()){
            Map.Entry entry = iterator.next();
            //绑定交换机
            //RabbitMQConstant.EXCHANGE_PUBSUB:交换机;queueName:队列名称;null:额外的设置属性;消息内容
            channel.basicPublish(RabbitMQConstant.EXCHANGE_ROUTING,entry.getKey(),null,entry.getKey().getBytes());
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;

/**
 * 消费者
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception {

        //设置连接信息
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstant.QUEUE_ROUTING1,false,false,false,null);

        //队列绑定交换机
        //RabbitMQConstant.QUEUE_PUBSUB:队列名称;RabbitMQConstant.EXCHANGE_PUBSUB:交换机名称;"":路由key(暂时用不到)
        channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_ROUTING,"key1");
        channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_ROUTING,"key2");

        channel.basicQos(1);//消费完一个消息再去队列拿其他的消息

        //消费消息
        //"":交换机,helloworld模式不使用;queueName:队列名称;null:额外的设置属性;消息内容
        channel.basicConsume(RabbitMQConstant.QUEUE_ROUTING1,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

 

 3. Topics-通配符模式

通配符模式:与上面的类似。

应用场景:当routingkey过多时,会出现很多类似的代码,使用通配符模式可以减少代码量。#代表一个或多个词,*代表一个词。

 3.1 topic-通配符模式实战

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * 通配符模式:新建一个TCP长连接绑定交换机
 */
public class ExchangeTest {

    public static void main(String[] args) throws Exception {


        HashMap map = new HashMap();
        map.put("key.key1.key.key","test1");
        map.put("key.key2.key.key","test2");
        map.put("key.key3.key.key","test3");
        map.put("com.test.key","com.test1");
        map.put("com.test.key1","com.test2");
        map.put("com.test.key2","com.test3");
        map.put("com.test.key3","com.test4");


        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        Iterator> iterator = map.entrySet().iterator();
        while(iterator.hasNext()){
            Map.Entry entry = iterator.next();
            //绑定交换机
            //RabbitMQConstant.EXCHANGE_PUBSUB:交换机;queueName:队列名称;null:额外的设置属性;消息内容
            channel.basicPublish(RabbitMQConstant.EXCHANGE_TOPICS,entry.getKey(),null,entry.getKey().getBytes());
        }

        channel.close();
        connection.close();
    }
}

import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;

/**
 * 消费者
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception {

        //设置连接信息
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstant.QUEUE_ROUTING1,false,false,false,null);

        //队列绑定交换机
        //RabbitMQConstant.QUEUE_PUBSUB:队列名称;RabbitMQConstant.EXCHANGE_PUBSUB:交换机名称;"":路由key(暂时用不到)
        channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_TOPICS,"key.#");

        channel.basicQos(1);//消费完一个消息再去队列拿其他的消息

        //消费消息
        //"":交换机,helloworld模式不使用;queueName:队列名称;null:额外的设置属性;消息内容
        channel.basicConsume(RabbitMQConstant.QUEUE_ROUTING1,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

4. rabbitMq消息确认机制

当生产者传送消息到broker以后会出现两种情况:confirm和return。confirm是确定有队列的情况下,消费者确认或者不确认的情况。return是消息到达broker以后找不到队列而发生退回的情况。

4.1 消息确认机制-实战

import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * 消息确认机制案例
 */
public class ConfirmTest {

    public static void main(String[] args) throws Exception {


        HashMap map = new HashMap();
        map.put("key.key1.key.key","test1");
        map.put("key.key2.key.key","test2");
        map.put("key.key3.key.key","test3");
        map.put("com.test.key","com.test1");
        map.put("com.test.key1","com.test2");
        map.put("com.test.key2","com.test3");
        map.put("com.test.key3","com.test4");
        map.put("erro.key","com.erro");
        map.put("erro.key1","com.erro1");
        map.put("erro.key3","com.erro2");


        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        //开启监听机制
        channel.confirmSelect();
        //确认监听机制
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {//确认返回
                System.out.println("接受到消息,参数:"+l+",b:"+b);
            }

            @Override
            public void handleNack(long l, boolean b) throws IOException {//拒绝消息
                System.out.println("接受到消息拒收消息,参数:"+l+",b:"+b);
            }
        });
        //返回监听机制
        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return r) {
                System.err.println("===========================");
                System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
                System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
                System.err.println("Return主题:" + new String(r.getBody()));
                System.err.println("===========================");
            }
        });

        Iterator> iterator = map.entrySet().iterator();
        while(iterator.hasNext()){
            Map.Entry entry = iterator.next();
            //绑定交换机
            //RabbitMQConstant.EXCHANGE_PUBSUB:交换机;queueName:队列名称;null:额外的设置属性;消息内容
            channel.basicPublish(RabbitMQConstant.EXCHANGE_CONFIRM,entry.getKey(),null,entry.getKey().getBytes());
        }

        //在消息确认机制下我们不能关闭长连接
//        channel.close();
//        connection.close();
    }
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;

/**
 * 消费者
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception {

        //设置连接信息
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstant.QUEUE_CONFIRM1,false,false,false,null);

        //队列绑定交换机
        //RabbitMQConstant.QUEUE_PUBSUB:队列名称;RabbitMQConstant.EXCHANGE_PUBSUB:交换机名称;"":路由key(暂时用不到)
        channel.queueBind(RabbitMQConstant.QUEUE_CONFIRM1,RabbitMQConstant.EXCHANGE_CONFIRM,"key.#");

        channel.basicQos(1);//消费完一个消息再去队列拿其他的消息

        //消费消息
        //"":交换机,helloworld模式不使用;queueName:队列名称;null:额外的设置属性;消息内容
        channel.basicConsume(RabbitMQConstant.QUEUE_CONFIRM1,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

5.spring整合rabbitMQ

分两个工程:生产者(spring-rabbit-consumerTest),消费者(spring-rabbit-consumerTest)

生产者:添加pom.xml依赖,配置文件整合,测试案例编写

消费者:添加pom.xml依赖,配置文件整合,增加监听器,启动监听

原文链接:https://blog.csdn.net/qq_21575929/article/details/123780207

栏目分类
最近更新