RabbitMQ(2)—高级使用

1.ack和限流

ack也就是消息确认签收,分为自动签收和手动签收。之前的交换机demo中:
channel.basicConsume(queueName,
true , consumer);  第二个参数就是自动签收,如果我们要手动签收则需要改成false,再去消息处理中手动签收即可

当我们消息队列中已经积压了大量消息的时候。这个时候消费者才启动,,如果是自动签收的话,就会导致大量消息涌入,可能回到服务刚启动就宕机。这个时候就可以限制消息数量,使用手动签收。处理完这一批,再处理下一批。
使用手动签收,我们还可以在拿到消息,进行不同的业务处理,比如如果消息信息有问题,那就不签收,移除当前队列,或者放到其他地方去处理之类的

RabbitMQUtils类的代码在上一节中:RabbitMQ(1)---基本概念及简单demo

ack:手动签收消息:

package com.nijunyang.rabbitmq.ack;

import com.nijunyang.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

/**
 * Description:
 * Created by nijunyang on 2020/6/7 13:07
 */
public class AckProducer {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        String message = "hello rabbitMQ." + new Random().nextInt(100);

        String exchangeName = "ack.exchange";
        String routingKey = "ack.key";


        Map heads = new HashMap();
        heads.put("userName", "zhangsan");

        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)//消息持久化
                .contentEncoding("UTF-8")
                .correlationId(UUID.randomUUID().toString())
                .headers(heads)//存放头信息
                .build();

        channel.basicPublish(exchangeName, routingKey, basicProperties, message.getBytes("utf-8"));
        channel.basicPublish(exchangeName, routingKey, null, message.getBytes("utf-8"));
        RabbitMQUtils.close(channel, connection);
    }
}
package com.nijunyang.rabbitmq.ack;

import com.nijunyang.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Description:
 * Created by nijunyang on 2020/6/7 13:07
 */
public class AckConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "ack.exchange";
        String exchangeType = "direct";
        String routingKey = "ack.key";
        String queueName = "ack.queue";

        channel.exchangeDeclare(exchangeName, exchangeType);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName, exchangeName, routingKey);


        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                try {
                    Object userName = properties.getHeaders().get("userName");
                    if (!StringUtils.isEmpty(userName)) {
                        //用发送时候放的 头信息模拟业务问题
                        String message = new String(body, "UTF-8");
                        System.out.println(message);
                        //手动签收消息
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                    else {
                        throw new RuntimeException();
                    }
                }catch (Exception e) {
                    //requeue参数 true 重回队列,,false不重回队列, 或者做其他处理
                    channel.basicNack(envelope.getDeliveryTag(),false,false);
                }
            }
        };

        //autoAck参数 true:开启自动签收,false:关闭自动签收功能
        channel.basicConsume(queueName,false, consumer);


    }
}

限流的话只需要多一个限制:channel.basicQos(0,5,false);  每次只会处理5条消息,签收完了,在处理后面的

package com.nijunyang.rabbitmq.limit;

import com.nijunyang.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

/**
 * Description:
 * Created by nijunyang on 2020/6/7 13:07
 */
public class LimitProducer {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();


        String exchangeName = "limit.exchange";
        String routingKey = "limit.key";


        for (int i = 0; i < 20; i++) {
            String message = "limit rabbitMQ." + i;
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes("utf-8"));
        }

        RabbitMQUtils.close(channel, connection);
    }
}
package com.nijunyang.rabbitmq.limit;

import com.nijunyang.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Description:
 * Created by nijunyang on 2020/6/7 13:07
 */
public class LimitConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "limit.exchange";
        String exchangeType = "direct";
        String routingKey = "limit.key";
        String queueName = "limit.queue";

        channel.exchangeDeclare(exchangeName, exchangeType);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName, exchangeName, routingKey);

        /**
         * 限流设置:
         * prefetchSize:每条消息大小的设置
         * prefetchCount:标识每次推送多少条消息
         * global:false标识channel级别的  true:标识消费的级别的
         */
        channel.basicQos(0,5,false);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(message);
                //手动签收消息, 否则就会一直阻塞了
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        };

        //autoAck参数 true:开启自动签收,false:关闭自动签收功能
        channel.basicConsume(queueName,false, consumer);


    }
}



2.消息投递确认


,开启这个模式之后 消息投递了之后 不能关闭连接,因为监听是绑定在channel上面的
开启消息投递确认模式,,在消息发送者上面绑定一个监听,消息投递成功或者失败回调对应方法。

package com.nijunyang.rabbitmq.confirm;

import com.nijunyang.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Description:
 * Created by nijunyang on 2020/6/7 19:53
 */
public class ConfirmProducer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //设置消息投递模式(确认模式)
        channel.confirmSelect();
        /**
         * 消息确认监听绑定
         */
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息投递成功");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息投递失败");
            }
        });

        String exchangeName = "confirm.exchange";
        String routingKey = "confirm.key";

        for (int i = 0; i < 20; i++) {
            String message = "limit rabbitMQ." + i;
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes("utf-8"));
        }
        //设置了消息投递确认就不能关闭channel和连接了
//        RabbitMQUtils.close(channel, connection);
    }
}

3.不可达消息处理
:有些消息发送之后,由于设置的原因,不能正常的路由到队列上面。
和消息投递确认差不多,只不过是在生产者的channel上面绑定一个ReturnListener(channel.addReturnListener(new RetrunListener())),然后投递消息的时候使用这个方法channel.basicPublish(exchangeName,routingKey,true,null, message.getBytes()),相比之前的投递方式多了一个布尔类型的mandatory参数。如果true那么就会调用的绑定的ReturnListener,实现的方法,如果是false那么就会直接删除这个消息。



4.死信队列


。专门用来接收没有消费的消息的队列。消息发送到正常队列上面但是没有被消费,就会被转发到死信队列上面。所以说死信队列是和一个正常队列绑定的。消息变成死信的几种情况:1.消息被拒绝(basicNack   basicReject)并且重回队里设置的false,2.消息设置了过期时间,时间到了也没有被消费,3.队列已经达到最大长度,后面进来的消息直接转到死信队列。死信队列也是一个正常的交换机和队列。

package com.nijunyang.rabbitmq.deadqueue;

import com.nijunyang.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * Description:
 * Created by nijunyang on 2020/6/7 20:48
 */
public class DeadQueueProducer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "normal.exchange";
        String routingKey = "normal.key";

        //设置消息的过期时间10s
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .expiration("10000")
                .build();
        for (int i = 0; i < 20; i++) {
            String message = "dead rabbitMQ." + i;
            channel.basicPublish(exchangeName, routingKey, basicProperties, message.getBytes("utf-8"));
        }
        RabbitMQUtils.close(channel, connection);
    }
}
package com.nijunyang.rabbitmq.deadqueue;

import com.nijunyang.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Description:
 * Created by nijunyang on 2020/6/7 20:51
 */
public class DeadQueueConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明正常的队列
        String normalExchangeName = "normal.exchange";
        String exchangeType = "direct";
        String normalQueueName = "normal.queue";
        String routingKey = "normal.key";

        channel.exchangeDeclare(normalExchangeName, exchangeType);

        //申明死信队列
        String deadExchangeName = "dead.exchange";
        String deadExchangeType = "topic";
        String deadQueueName = "dead.queue";

        Map queueArgs = new HashMap();
        //正常队列上绑定死信队列信息
        queueArgs.put("x-dead-letter-exchange", deadExchangeName);
        queueArgs.put("x-max-length", 4); //队列的最大长度
        channel.queueDeclare(normalQueueName,true,false,false, queueArgs);
        channel.queueBind(normalQueueName, normalExchangeName, routingKey);

        //声明死信队列
        channel.exchangeDeclare(deadExchangeName, deadExchangeType);
        channel.queueDeclare(deadQueueName,true,false,false,null);
        channel.queueBind(deadQueueName, deadExchangeName,"#");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(message);
                channel.basicNack(envelope.getDeliveryTag(),false,false); //拒签
            }
        };
        channel.basicConsume(normalQueueName, false, consumer);

    }
}


可有看到所有的消息最后都转到了 死信队列中去了。这个模式还可以用于延迟队列。只需要设置正常队列消息的过期时间,然后转到死信队列,,消费者监听消费死信队列,就可以实现延时队列了。



5.单播消费模式


,首先我们要明确一点消费者最终都是从队列中拿到消息消费的,我们将多个消费者都绑定到同一个队列上面去,这个时候,队列消息只会被一个消费者消费,不会重复让每个消费者都消费。



6.多播消费模式


,和单播消费差不多,这个时候我们需要申明多个队列绑定同一个交换机,这样交换机的信息就会发到多个队列上面,这样通过同一个交换机将同一条消息发送到不同的队列上面去了,也就实现了让不同的消费者消费了同一条消息了。

package com.nijunyang.rabbitmq.deadqueue;

import com.nijunyang.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Description:
 * Created by nijunyang on 2020/6/7 20:51
 */
public class DeadQueueConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();

//声明正常的队列
String normalExchangeName = "normal.exchange";
String exchangeType = "direct";
String normalQueueName = "normal.queue";
String routingKey = "normal.key";

channel.exchangeDeclare(normalExchangeName, exchangeType);

//申明死信队列
String deadExchangeName = "dead.exchange";
String deadExchangeType = "topic";
String deadQueueName = "dead.queue";

Map queueArgs = new HashMap();
//正常队列上绑定死信队列信息
queueArgs.put("x-dead-letter-exchange", deadExchangeName);
queueArgs.put("x-max-length", 4); //队列的最大长度
channel.queueDeclare(normalQueueName,true,false,false, queueArgs);
channel.queueBind(normalQueueName, normalExchangeName, routingKey);

//声明死信队列
channel.exchangeDeclare(deadExchangeName, deadExchangeType);
channel.queueDeclare(deadQueueName,true,false,false,null);
channel.queueBind(deadQueueName, deadExchangeName,"#");

Consumer consumer = new DefaultConsumer(channel) {
            @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
System.out.println(message);
channel.basicNack(envelope.getDeliveryTag(),false,false); //拒签
}
        };
channel.basicConsume(normalQueueName, false, consumer);

}
}