rabbit mq

1.入门

1.简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。 工作原理:现在索引中找到对应的值,然后根据匹配的索引记录找到对应的数据行。 select *from student where id = 5 ;先在索引上按id=5查找,然后返回包含该值的数据行。

2.应用场景

异步处理

场景:用户注册后,需要发注册邮件和注册短信。 传统方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理。 应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。缺点:当库存系统出现故障时,订单就会失败;订单系统和库存系统高耦合。

消息队列:订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存系统:订阅下单的消息,获取下单消息,进行库操作。

流量削峰 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 消息队列:1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.2.秒杀业务根据消息队列中的请求信息,再做后续处理。

3.架构

Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息的载体,每个消息都会被投到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。

Producer:消息生产者,就是投递消息的程序。

Consumer:消息消费者,就是接受消息的程序。

Channel:消息通道,在客户端的每个连接里,可建立多个channel。

2.特性

1.任务分发机制

Round-robin(轮询分发)

在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。 RabbbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在load加重,那么只需要创建更多的Consumer来进行任务处理。 Fair dispatch(公平分发)

轮训不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。可能存在有些服务很忙依然分发给它,有些很轻松却没有任务。为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

2.消息应答

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ可以删除它了。

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会不丢失任何消息了。

没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

消息应答是默认打开的。我们明确地把它们关掉了(autoAck=true)。现在将应答打开,一旦我们完成任务,消费者会自动发送消息应答。

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3.消息持久化

当消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍将失去!当RabbitMQ退出或者崩溃,将会丢失队列和消息。除非你不要队列和消息。两件事儿必须保证消息不被丢失:我们必须把“队列”和“消息”设为持久化。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

4.发布/订阅

生产者只能发送消息给Exchanges(转发器),转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

类型有:Direct、Topic、Headers和Fanout。

channel.exchangeDeclare("logs", "fanout");

转发器转发消息到队列。关联转发器和队列的叫Binding。

channel.queueBind(queueName, "logs", "");

Direct exchange(直接转发) 准确匹配 Topic exchange(主题转发器) 模糊匹配

(星号)可以代替任意一个标识符 ;#(井号)可以代替零个或多个标识符。

.orange.
”,Q2绑定键是“ .*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而”lazy.brown.fox”则只发往第二个队列。 “quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

3.代码

producer:

String EXCHANGE_NAME = "direct_logs";
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        // 设置MabbitMQ所在主机ip或者主机名
        factory.setHost("127.0.0.1");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 指定转发——广播
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
 
        //所有日志严重性级别
        String[] severities={"error","info","warning"};
        for(int i=0;i<3;i++){
            String severity = severities[i%3];//每一次发送一条不同严重性的日志
            
            // 发送的消息
            String message = "Hello World"+Strings.repeat(".", i+1);
            //参数1:exchange name
            //参数2:routing key
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            system.out.println(message);
        }
        // 关闭频道和连接
        channel.close();

consumer:

ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        // 打开连接和创建频道,与发送端一样
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
 
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 声明一个随机队列
        String queueName = channel.queueDeclare().getQueue();
        
        String severity="error";//只关注error级别的日志,然后记录到文件中去。
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
        
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        
        // 创建队列消费者
        final Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                system.out.println(message);
              }
            };
            channel.basicConsume(queueName, true, consumer);