RabbitMQ基础

码农天地 -
RabbitMQ基础
1、RabbitMQ

消息队列解决了什么问题

异步处理、应用解耦、流量削峰、日志处理

运行rabbitmq镜像
# docker run --name rabbitmq -tid -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq
修改rabbitmq设置
# docker exec -it 容器号 /bin/bash
新增用户
# rabbitmqctl add_user [user_name] [pwd]
查看用户
# rabbitmqctl list_users
设置用户权限(执行成功时输出:Setting permissions for user "[user_name]" in vhost "/" ...)
# rabbitmqctl set_permissions -p "/" [user_name] ".*" ".*" ".*"

# rabbitmqctl list_permissions -p /
将[user_name]用户设置为administrator角色
# rabbitmqctl set_user_tags [user_name] administrator
删除guest用户
# rabbitmqctl delete_user guest
开启web界面
# rabbitmq-plugins enable rabbitmq_management
web访问
# http://IP:15672

Java操作RabbitMQ

simple 简单队列、work queues 工作队列(轮询分发、公平分发)、publish/subscribe 发布订阅、routing 路由选择、Topics 主题(通配符模式)、手动和自动确认、消息队列的持久化和非持久化、rabbitMQ的延迟队列

依赖
<dependencies>
    <!-- RabbitMQ Java client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.0.2</version>
    </dependency>

    <!-- SLF4J logging facade -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.10</version>
    </dependency>

    <!-- SLF4J binding that routes log calls to log4j 1.2 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.5</version>
    </dependency>

    <!-- log4j logging backend -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

    <!-- JUnit 4 -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
    </dependency>
</dependencies>
2、简单队列

2.2、定义连接MQ的工具
/**
 * Helper that opens connections to the RabbitMQ broker used by the examples.
 * All broker settings are hard-coded in {@link #getConnection()}.
 */
public class connectionUtil {

    /**
     * Builds a {@code ConnectionFactory} with the example broker settings and opens a connection.
     *
     * @return the connection returned by {@code ConnectionFactory#newConnection()}
     * @throws IOException      propagated from {@code newConnection()}
     * @throws TimeoutException propagated from {@code newConnection()}
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Account and virtual host used by the examples
        connectionFactory.setUsername("lgz");
        connectionFactory.setPassword("pwd123456");
        connectionFactory.setVirtualHost("lgz");

        // Address of the machine running RabbitMQ and its AMQP port
        connectionFactory.setHost("192.168.168.130");
        connectionFactory.setPort(5672);

        Connection opened = connectionFactory.newConnection();
        return opened;
    }
}
2.3、生产者发送消息
/**
 * Simple-queue producer: publishes one text message to {@code test_simple_queue}
 * through the default exchange.
 */
public class ProducerSend {
    private static final String QUEUE_NAME="test_simple_queue";

    /**
     * Declares the queue, publishes a single message and releases the connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation or the UTF-8 encoding step fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("cs");
        // Open the connection
        Connection connection = connectionUtil.getConnection();
        try {
            // Obtain a channel from the connection
            Channel channel = connection.createChannel();
            // Declare the queue: not durable, not exclusive, not auto-delete, no extra arguments
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            String msg="hello simple";
            // Empty exchange name selects the default exchange; the routing key is the queue name.
            // Encode explicitly as UTF-8 because the consumer decodes the body as utf-8;
            // a bare getBytes() would use the platform default charset.
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
            System.out.println("test success");
            channel.close();
        } finally {
            // Runs even when a step above throws, so the connection (and any channel
            // still open on it) is not leaked.
            connection.close();
        }
    }

}
2.4、消费者获取信息
/**
 * Simple-queue consumer: prints every message delivered from {@code test_simple_queue}.
 */
public class comsumerGain {

    private static final String QUE_NAME="test_simple_queue";

    /**
     * Opens a connection, makes sure the queue exists and starts an auto-acknowledging consumer.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Open the connection
        Connection connection = connectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the queue with the same arguments the producer uses. Declaring an
        // existing queue with identical arguments is a no-op, and it lets this consumer
        // start before the producer has created the queue.
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        // Define the consumer
        DefaultConsumer  consumer = new DefaultConsumer(channel) {
            // Invoked for each delivered message
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println(msg);
            }
        };
        // Subscribe to the queue with automatic acknowledgement (second argument true)
        channel.basicConsume(QUE_NAME,true,consumer);

    }
}
2.5、简单队列的缺点
耦合性高:生产者与消费者一一对应
3、工作队列
3.1、工作队列模型

3.2、生产者演示
/**
 * Work-queue producer: publishes 50 numbered messages to {@code work_queue}.
 */
public class WorkQueue {
    public static  final  String QUEUE_NAME="work_queue";

    /**
     * Publishes "No0" .. "No49", pausing {@code i*20} ms after message {@code i}.
     *
     * @param args unused
     * @throws IOException          if a broker operation or the UTF-8 encoding step fails
     * @throws TimeoutException     if opening the connection or closing the channel times out
     * @throws InterruptedException if the pause between messages is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Open the connection
        Connection connection = connectionUtil.getConnection();
        try {
            // Obtain a channel
            Channel channel = connection.createChannel();
            // Declare the queue: not durable, not exclusive, not auto-delete
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            for (int i = 0; i <50 ; i++) {
                String msg="No"+i;
                System.out.println(msg);
                // Explicit UTF-8 so the bytes match the consumers' utf-8 decoding
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
                Thread.sleep(i*20);
            }
            channel.close();
        } finally {
            // Release the connection even when publishing or sleeping throws
            connection.close();
        }
    }
}
3.3、消费者1
public class WorkRecv {
    private final static String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接
        Connection connection = connectionUtil.getConnection();
        //获取channel
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //定义一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            //消息触发

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");

                System.out.println("receive"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("结束");
                }
            }
        };

        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

    }
3.4、消费者2
/**
 * Work-queue consumer 2: reads {@code work_queue} in automatic-acknowledgement mode and
 * sleeps one second per message.
 */
public class WorkRecv {
    private final static String QUEUE_NAME="work_queue";

    /**
     * Connects, declares the queue and subscribes an anonymous consumer.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        final Channel workChannel = connectionUtil.getConnection().createChannel();
        workChannel.queueDeclare(QUEUE_NAME, false, false, false, null);

        final boolean autoAck = true;
        workChannel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(workChannel) {
            // Callback fired for every delivery
            @Override
            public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props, byte[] payload) throws IOException {
                super.handleDelivery(tag, env, props, payload);
                final String text = new String(payload, "utf-8");
                System.out.println("receive" + text);

                try {
                    // Pretend the message takes one second to process
                    Thread.sleep(1000);
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                } finally {
                    System.out.println("结束");
                }
            }
        });
    }
}
3.5、总结
采用了轮询分发机制:不管消费者处理得快还是慢,消息都会依次平均分给每个消费者
4、公平分发
4.1、生产者
/**
 * Fair-dispatch producer: publishes 50 numbered messages to {@code work_queue}.
 *
 * <p>The prefetch limit ({@code basicQos}) only restricts deliveries to consumers on a
 * channel, so it is not set on this publish-only channel.
 */
public class WorkQueue {
    public static  final  String QUEUE_NAME="work_queue";

    /**
     * Publishes "No0" .. "No49", pausing {@code i*20} ms after message {@code i}.
     *
     * @param args unused
     * @throws IOException          if a broker operation or the UTF-8 encoding step fails
     * @throws TimeoutException     if opening the connection or closing the channel times out
     * @throws InterruptedException if the pause between messages is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Open the connection
        Connection connection = connectionUtil.getConnection();
        try {
            // Obtain a channel
            Channel channel = connection.createChannel();
            // Declare the queue: not durable, not exclusive, not auto-delete
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            for (int i = 0; i <50 ; i++) {
                String msg="No"+i;
                System.out.println(msg);
                // Explicit UTF-8 so the bytes match the consumers' utf-8 decoding
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
                Thread.sleep(i*20);
            }
            channel.close();
        } finally {
            // Release the connection even when publishing or sleeping throws
            connection.close();
        }
    }
}
4.2、消费者1
/**
 * Fair-dispatch consumer 1: takes one message at a time from {@code work_queue},
 * spends one second on it and acknowledges it manually.
 */
public class WorkRecv {
    private final static String QUEUE_NAME="work_queue";

    /**
     * Connects, declares the queue, sets the prefetch limit and subscribes.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        final Channel ch = connectionUtil.getConnection().createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Deliver only one message at a time
        final int prefetch = 1;
        ch.basicQos(prefetch);

        // Manual acknowledgement: the handler below calls basicAck itself
        final boolean autoAck = false;
        ch.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(ch) {
            // Callback fired for every delivery
            @Override
            public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props, byte[] payload) throws IOException {
                super.handleDelivery(tag, env, props, payload);
                final String text = new String(payload, "utf-8");
                System.out.println("receive" + text);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                } finally {
                    System.out.println("结束");
                    // Acknowledge this single delivery (multiple = false)
                    ch.basicAck(env.getDeliveryTag(), false);
                }
            }
        });
    }
}
4.2、消费者2
/**
 * Fair-dispatch consumer 2: like consumer 1 but slower, sleeping two seconds per message
 * before acknowledging it manually.
 */
public class WorkReceive {
    private final static String QUEUE_NAME="work_queue";

    /**
     * Connects, declares the queue, sets the prefetch limit and subscribes.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        final Channel ch = connectionUtil.getConnection().createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Make sure only one message is dispatched at a time
        final int prefetch = 1;
        ch.basicQos(prefetch);

        final boolean autoAck = false;
        ch.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(ch) {
            // Callback fired for every delivery
            @Override
            public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props, byte[] payload) throws IOException {
                super.handleDelivery(tag, env, props, payload);
                final String text = new String(payload, "utf-8");
                System.out.println("receive" + text);

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                } finally {
                    System.out.println("【start】:");
                    // Manually acknowledge this one delivery
                    ch.basicAck(env.getDeliveryTag(), false);
                }
            }
        });
    }
}
4.3、疑难问题
basicQos(1) 用于限制rabbitMQ一次只向同一个消费者分发一条未确认的消息;使用公平分发必须关闭自动应答(autoAck),改成手动应答。
5、消息应答与消息持久化
5.1、消息应答
boolean autoAck=false(手动确认模式):如果有消费者宕机,未确认的消息会重新交付给其他的消费者。【rabbitmq支持消息应答:消费者发送一个消息确认后,rabbitmq才会删除内存中的数据】
boolean autoAck=true(自动确认模式):一旦rabbitmq将消息分发给消费者,就会从内存中删除该消息。

如果这种情况下,杀死正在执行的消费者,就会造成正在处理的信息丢失。

默认情况下autoAck是false。但是如果rabbitMQ服务器宕机,未持久化的数据仍然会丢失。
5.2、消息持久化
// Declare the queue; the second argument is "durable" (whether the queue is persisted)
channel.queueDeclare(QUEUE_NAME, /* durable */ false, false, false, null);

直接将程序中durable的false改为true是不可以的:因为名为QUEUE_NAME的队列已经以未持久化的方式存在,rabbitmq不允许用不同的参数重新定义一个已存在的队列。

解决办法:在控制台【访问mq的界面】删除这个队列后再重新声明,或者换一个新的队列名。
6、订阅模式【fanout】

一个生产者,多个消费者;
每个消费者都有自己的队列;
生产者将消息发送到交换机【转发器】,而不是直接发送到队列中;
每个队列都绑定在交换机上;
生产者发送的消息,经过交换机到达队列【实现一个消息被多个消费者消费】;
交换机无法存储数据【只有队列具有存储能力】,如果想要存储消息,必须先有队列绑定到交换机。
6.1、生产者
/**
 * Publish/subscribe producer: sends one message to the fanout exchange
 * {@code test_exchange_fanout}.
 */
public class Send {
    public static final String EXCHANGE_NAME="test_exchange_fanout";

    /**
     * Declares the exchange, publishes a single message and releases the connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation or the UTF-8 encoding step fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare the exchange with type "fanout"
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

            String msg="hello_exchange";
            // Empty routing key; explicit UTF-8 to match the consumers' utf-8 decoding
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("UTF-8"));
            System.out.println("send:"+msg);

            channel.close();
        } finally {
            // Release the connection even when a step above throws
            connection.close();
        }
    }
}
6.2、消费者
/**
 * Publish/subscribe consumer 1: binds {@code test_queue_exchange} to the fanout exchange
 * and acknowledges each message manually after one second.
 */
public class Receive1 {
    public static final String QUEUE_NAME="test_queue_exchange";
    public static final String EXCHANGE_NAME="test_exchange_fanout";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here too (same name and type as the producer): queueBind
        // fails if the exchange does not exist, e.g. when this consumer starts first.
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        // Deliver only one message at a time
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");
                System.out.println("Receive1"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[ok]");
                    // Acknowledge this single delivery
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        // Manual acknowledgement
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
    }
}
6.3、消费者2
/**
 * Publish/subscribe consumer 2: binds {@code email_queue_exchange} to the fanout exchange
 * and acknowledges each message manually after one second.
 */
public class Receive2{
    public static final String QUEUE_NAME="email_queue_exchange";
    public static final String EXCHANGE_NAME="test_exchange_fanout";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here too (same name and type as the producer): queueBind
        // fails if the exchange does not exist, e.g. when this consumer starts first.
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        // Deliver only one message at a time
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");
                System.out.println("Receive2"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[ok2]");
                    // Acknowledge this single delivery
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        // Manual acknowledgement
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
    }
}
7、Routing【direct】

一方面接收生产者的消息,另一方面向队列发送消息

Fanout(不处理路由键)

Direct(处理路由键)

7.1、路由模式

7.2、生产者
/**
 * Routing producer: sends one message with routing key {@code warning} to the direct
 * exchange {@code test_exchange_direct}.
 */
public class Send {
    public static final String EXCHANGE_NAME="test_exchange_direct";

    /**
     * Declares the exchange, publishes a single message and releases the connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation or the UTF-8 encoding step fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare the exchange with type "direct"
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");

            String msg="Hello direct";

            String routingKey="warning";
            // Explicit UTF-8 to match the consumers' utf-8 decoding
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes("UTF-8"));
            System.out.println("send"+msg);

            channel.close();
        } finally {
            // Release the connection even when a step above throws
            connection.close();
        }
    }
}
7.3、消费者1
/**
 * Routing consumer 1: binds {@code queue_direct_1} to the direct exchange with routing key
 * {@code error} and acknowledges each message manually after one second.
 */
public class Receive1 {
    public static final String EXCHANGE_NAME="test_exchange_direct";
    public static final String QUEUE_NAME="queue_direct_1";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        final Channel channel = connection.createChannel();

        // Declare the exchange here too (same name and type as the producer): queueBind
        // fails if the exchange does not exist, e.g. when this consumer starts first.
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

        // Define the consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");


                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[done]"+msg);
                    // Acknowledge this single delivery
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Automatic acknowledgement off
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}
7.3、消费者2
/**
 * Routing consumer 2: binds its own queue {@code queue_direct_2} to the direct exchange
 * with routing keys {@code error}, {@code warning} and {@code info}, and acknowledges each
 * message manually after one second.
 */
public class Receive2 {
    public static final String EXCHANGE_NAME="test_exchange_direct";
    // Must differ from consumer 1's "queue_direct_1". With a shared queue name the two
    // consumers would compete for the same queue and its combined bindings instead of
    // each receiving messages according to its own routing keys.
    public static final String QUEUE_NAME="queue_direct_2";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        final Channel channel = connection.createChannel();

        // Declare the exchange here too (same name and type as the producer): queueBind
        // fails if the exchange does not exist, e.g. when this consumer starts first.
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");

        // Define the consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");


                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[done]"+msg);
                    // Acknowledge this single delivery
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Automatic acknowledgement off
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}
8、Topics Exchange

类似于sql中的模糊查询,

将路由键routing key和某个模式进行匹配

`#` 匹配零个或者多个单词,`*` 匹配恰好一个单词(单词之间以 `.` 分隔)

8.1、生产者
/**
 * Topic producer: sends one message with routing key {@code goods.delete} to the topic
 * exchange {@code test_exchange_topic}.
 */
public class Send {

    private static final String EXCHANGE_NAME="test_exchange_topic";

    /**
     * Declares the exchange, publishes a single message and releases the connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation or the UTF-8 encoding step fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME,"topic");

            String msg="商品";
            // The payload is non-ASCII, so the charset matters: encode as UTF-8 because the
            // consumers decode as utf-8. A bare getBytes() uses the platform default charset.
            channel.basicPublish(EXCHANGE_NAME,"goods.delete",null,msg.getBytes("UTF-8"));
            System.out.println("---send"+msg);

            channel.close();
        } finally {
            // Release the connection even when a step above throws
            connection.close();
        }
    }

}
8.2、消费者1
/**
 * Topic consumer 1: binds {@code test_queue_topic_1} to the topic exchange with the exact
 * key {@code goods.add} and acknowledges each message manually after one second.
 */
public class Receive1 {
    public static final String QUEUE_NAME="test_queue_topic_1";
    private static final String EXCHANGE_NAME="test_exchange_topic";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here too (same name and type as the producer): queueBind
        // fails if the exchange does not exist, e.g. when this consumer starts first.
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");

        // Deliver only one message at a time
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");
                System.out.println("Receive1"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[ok]");
                    // Acknowledge this single delivery
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        // Manual acknowledgement
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
    }
}
8.2、消费者2
/**
 * Topic consumer 2: binds {@code test_queue_topic_2} to the topic exchange with the pattern
 * {@code goods.#} and acknowledges each message manually after one second.
 */
public class Receive2 {
    // Was "test_queue_topic_1=2", a typing slip; named to pair with consumer 1's queue
    public static final String QUEUE_NAME="test_queue_topic_2";
    private static final String EXCHANGE_NAME="test_exchange_topic";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here too (same name and type as the producer): queueBind
        // fails if the exchange does not exist, e.g. when this consumer starts first.
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");
        // Deliver only one message at a time
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");
                // Label the output with this consumer's own name
                System.out.println("Receive2"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[ok]");
                    // Acknowledge this single delivery
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        // Manual acknowledgement
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
    }
}
9、消息确认机制

在MQ中可以通过持久化数据解决rabbitmq服务器异常的数据丢失问题

生产者将消息发送出去以后,如何知道消息到底有没有到达rabbitmq服务器?

方式一:AMQP实现事务机制
方式二:Confirm模式
9.1、事务机制

txSelect:用于将当前channel设置成transaction模式

txCommit:用于提交事务

txRollback:回滚事务

生产者
/**
 * Transaction-mode producer: publishes one message to {@code tx_queue} inside an AMQP
 * transaction and rolls the transaction back when a failure occurs before the commit.
 */
public class TxSend {
    private static final String QUEUE_NAME="tx_queue";

    /**
     * Runs the demo: select transaction mode, publish, hit a simulated failure, roll back.
     *
     * @param args unused
     * @throws IOException      if a broker operation outside the guarded section fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            String msg="hello autoCommit";
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            try {
                channel.txSelect();
                // Explicit UTF-8 to match the consumer's utf-8 decoding
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
                // Simulated failure (ArithmeticException). It must come BEFORE txCommit,
                // otherwise the message is already committed and nothing is left to roll back.
                int i=1/0;
                channel.txCommit();
                System.out.println(msg);
            } catch (IOException | RuntimeException e) {
                // RuntimeException is caught as well so the simulated failure reaches the rollback
                channel.txRollback();
                System.out.println("error");
                e.printStackTrace();
            }

            channel.close();
        } finally {
            // Release the connection even when a step above throws
            connection.close();
        }
    }
}
消费者
/**
 * Consumer for the transaction demo: prints every message delivered from {@code tx_queue},
 * using automatic acknowledgement.
 */
public class TxReceive1 {
    private static final String QUEUE_NAME="tx_queue";

    /**
     * Connects, declares the queue and subscribes a printing consumer.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel txChannel = ConnectionUtil.getConnection().createChannel();
        txChannel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer printer = new DefaultConsumer(txChannel) {
            @Override
            public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props, byte[] payload) throws IOException {
                super.handleDelivery(tag, env, props, payload);
                String text = new String(payload, "utf-8");
                System.out.println("receive" + text);
            }
        };

        final boolean autoAck = true;
        txChannel.basicConsume(QUEUE_NAME, autoAck, printer);
    }
}
9.2、生产者Confirm模式
9.2.1、发送单条
/**
 * Confirm-mode producer (single message): publishes one message to {@code tx_queue} and
 * waits for the broker's confirmation.
 */
public class Send1 {
    private static final String QUEUE_NAME="tx_queue";

    /**
     * Publishes one message in confirm mode and reports whether it was confirmed.
     *
     * @param args unused
     * @throws IOException          if a broker operation or the UTF-8 encoding step fails
     * @throws TimeoutException     if opening the connection or closing the channel times out
     * @throws InterruptedException if waiting for the confirmation is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            // Put the channel into confirm mode
            channel.confirmSelect();

            String msg="confirm_text";

            // Explicit UTF-8 instead of the platform default charset
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));

            if (!channel.waitForConfirms()){
                System.out.println("send message failed");
            }else {
                System.out.println("success");
            }
            channel.close();
        } finally {
            // Release the connection even when a step above throws
            connection.close();
        }
    }
}
9.2.2、批量发送
/**
 * Confirm-mode producer (batch): publishes a batch of messages to {@code confirm_queue}
 * and waits once for all of them to be confirmed.
 */
public class SendMore {
    private static final String QUEUE_NAME="confirm_queue";

    /**
     * Publishes one message followed by ten more, then waits for the confirmations.
     *
     * @param args unused
     * @throws IOException          if a broker operation or the UTF-8 encoding step fails
     * @throws TimeoutException     if opening the connection or closing the channel times out
     * @throws InterruptedException if waiting for the confirmations is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            // Put the channel into confirm mode
            channel.confirmSelect();

            String msg="confirm_text";
            // Encode once, explicitly as UTF-8 instead of the platform default charset
            byte[] payload=msg.getBytes("UTF-8");

            channel.basicPublish("",QUEUE_NAME,null,payload);
            // Publish the rest serially; confirmation is awaited only after all are sent
            for (int i = 0; i <10 ; i++) {
                channel.basicPublish("",QUEUE_NAME,null,payload);
            }
            if (!channel.waitForConfirms()){
                System.out.println("send message failed");
            }else {
                System.out.println("success");
            }
            channel.close();
        } finally {
            // Release the connection even when a step above throws
            connection.close();
        }
    }
}
9.3、异步模式
10、参数详解
10.1、queueDeclare
//绑定channel与消息队列
//参数一:队列名称【如果不存在该队列,则自动创建】
//参数二:durable【是否要进行持久化】
//参数三:exclusive【是否独占队列】
//参数四:是否在消费完成之后是否要立即删除队列【true:自动删除】【false:不自动删除】
//参数五:额外参数
channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args);

channel.queueDeclare("helloWorld",false,false,false,null);
10.2、basicPublish
// Publishes a message
/*
* Argument 1 [exchange]: exchange name (empty string here)
* Argument 2 [routingKey]: routing key; the original note describes it as the queue name
* Argument 3 [props]: additional settings passed along with the message
* Argument 4: the message content itself, as a byte array
*
* */
channel.basicPublish("","helloWorld",null,"hello rabbitMQ".getBytes());
10.3、basicConsume
// Consumes messages
/*
 * Argument 1: queue name
 * Argument 2: whether automatic acknowledgement is enabled (true here)
 * Argument 3: callback invoked when messages are consumed
 * */
channel.basicConsume("helloWorld",true,consumer);
11、整合Boot
11.1、依赖导入
 <!-- Spring Boot starter for AMQP messaging (brings in RabbitTemplate and @RabbitListener) -->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
     <version>2.2.6.RELEASE</version>
</dependency>
11.2、yml
# Spring Boot configuration for the RabbitMQ examples
spring:
  application:
    name: rabbit-springboot
  rabbitmq:
    # Broker address and AMQP port
    host: 192.168.168.130
    port: 5672
    # NOTE(review): the plain Java client examples connect to virtual host "lgz";
    # confirm that "/" is intended here and that user "lgz" has permissions on it
    virtual-host: /
    username: lgz
    password: pwd123456
11.3boottest
// RabbitTemplate injected by Spring; used below to publish messages
@Autowired
private RabbitTemplate rabbitTemplate;

// Sends the text "helloWorld" via convertAndSend with "hello" as first argument, then prints 1.
// NOTE(review): presumably "hello" is the routing key / queue name — confirm against the listener
@Test
void contextLoads() {

    rabbitTemplate.convertAndSend("hello","helloWorld");
    System.out.println(1);
}
11.4、
/**
 * Spring Boot tests that publish one sample message per messaging model
 * (simple, work, fanout, routing, topic) through {@code RabbitTemplate}.
 */
@SpringBootTest
class BootRabbitmqApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Simple model: sends "helloWorld" using "hello" as the first argument, then prints 1. */
    @Test
    void contextLoads() {
        final String target = "hello";
        final String payload = "helloWorld";
        rabbitTemplate.convertAndSend(target, payload);
        System.out.println(1);
    }

    /** Work model: sends the same message five times using "work" as the first argument. */
    @Test
    public void testWork(){
        int remaining = 5;
        while (remaining-- > 0) {
            rabbitTemplate.convertAndSend("work", "work模型");
        }
    }

    /** Fanout model: publishes to exchange "logs" with an empty routing key. */
    @Test
    public void testFanout(){
        final String exchange = "logs";
        final String routingKey = "";
        rabbitTemplate.convertAndSend(exchange, routingKey, "Fanout model");
    }

    /** Routing model: publishes to exchange "directs" with routing key "info". */
    @Test
    public void testRouting(){
        final String exchange = "directs";
        final String routingKey = "info";
        rabbitTemplate.convertAndSend(exchange, routingKey, "info_key_routing_information");
    }

    /** Topic model: publishes to exchange "topics" with routing key "user.save". */
    @Test
    public  void  testTopic(){
        final String exchange = "topics";
        final String routingKey = "user.save";
        rabbitTemplate.convertAndSend(exchange, routingKey, "user.save exchange");
    }
}
11.5、测试topic[消费者]
/**
 * Topic listener 1: bound to the "topics" topic exchange with keys
 * "product.save" and "product.*"; prints each message prefixed with "consumer1".
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(name = "topics", type = "topic"),
        key = {"product.save", "product.*"}))
public void receive(String msg) {
    final String line = "consumer1" + msg;
    System.out.println(line);
}

/**
 * Topic listener 2: bound to the "topics" topic exchange with keys
 * "user.save" and "user.*"; prints each message prefixed with "consumer2".
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(name = "topics", type = "topic"),
        key = {"user.save", "user.*"}))
public void receive2(String msg) {
    final String line = "consumer2" + msg;
    System.out.println(line);
}
11.5、测试routing
/**
 * Routing listener: bound to the "directs" direct exchange with routing keys
 * "info" and "error"; prints each message unchanged.
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(type = "direct", value = "directs"),
        key = {"info", "error"}))
public void receive1(String msg) {
    System.out.println(msg);
}
11.6、普通的work
/**
 * Work-model listener on queue "work" (declared via {@code queuesToDeclare});
 * prints each message wrapped as {@code message:[...]}.
 */
@RabbitListener(queuesToDeclare = {@Queue(value = "work")})
public void receive(String message) {
    final String line = "message:[" + message + "]";
    System.out.println(line);
}
11.7、Fanout模式
/**
 * Fanout listener: a temporary queue ({@code @Queue} without a name) bound to the
 * "logs" fanout exchange; prints each message wrapped in square brackets.
 */
@RabbitListener(bindings = @QueueBinding(
        // Temporary queue
        value = @Queue,
        // Exchange the queue is bound to
        exchange = @Exchange(type = "fanout", value = "logs")))
public void receive1(String msg) {
    final String line = "[" + msg + "]";
    System.out.println(line);
}
11.8、Simple
/**
 * Simple-model listener component for queue "hello" (declared via {@code queuesToDeclare}).
 */
@Component
@RabbitListener(queuesToDeclare = {@Queue(value = "hello")})
public class Hello {

    /**
     * Prints the received text prefixed with "message".
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void receive1(String message) {
        final String line = "message" + message;
        System.out.println(line);
    }
}
特别申明:本文内容来源网络,版权归原作者所有,如有侵权请立即与我们联系(cy198701067573@163.com),我们将及时处理。
下一篇: linux网站服务

Tags 标签

加个好友,技术交流

1628738909466805.jpg