消息中间件简介

      消息中间件是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
      消息队列中间件,也可以称为消息队列或者消息中间件。它一般有两种传递模式:点对点(P2P,Ponit-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。

消息中间件作用

  • 解耦:在项目启动之初来预测将来会碰到什么需求是极其困难的。消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可。
  • 冗余(存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出该消息已经被处理完成,从而确保你的数据被安全地保存直到你使用完毕。
  • 扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
  • 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
  • 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
    顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
  • 缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
  • 异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。

Docker安装RabbitMQ

# 拉取镜像
ducoer pull rabbitmq:management

# 启动RabbitMQ
docker run -dit --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

注意:需要开放15672和5672端口(或者直接关闭防火墙)。如果是使用云服务器的话需要注意安全组。
访问http://ip:15672出现以下界面。使用admin/admin登录
请输入图片描述

相关概念介绍

      RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ模型更像是一种交换机模型。
请输入图片描述

生产者和消费者

  • Producer:生产者,投递消息的一方。消息一般包含2个部分:消息体(Payload)和标签(Label)。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产把消息交由RabbitMQ,MQ根据标签吧消息发送给感兴趣的消费者。
  • Consumer:消费者,接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(Payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
  • Broker:消息中间件的服务节点。对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker 看作一台RabbitMQ服务器。
    请输入图片描述

队列

      Queue:队列,是RabbitMQ的内部对象,用于存储消息。
      RabbitMQ中消息都只能存储在队列中,这一点和Kafka这种消息中间件相反。Kafka将消息存储在topic(主题)这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。RabbitMQ的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,
请输入图片描述

交换器、路由键、绑定

交换器

      Exchange:交换器。在上一节队列的图中我们暂时可以认为生产者将消息投递到队列中,实际上在RabbitMQ中这种情况不会发生。
      真实情况:生产者将消息发送到Exchange(交换器,通常用X来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
请输入图片描述

      RabbitMQ中的交换器有四种类型,不同的类型有不同的路由策略。下篇文章介绍。

路由键

      RoutingKey:路由键。生产者将消息发送给交换器的时候一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
      在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。

绑定

      绑定。RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。
请输入图片描述

      生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。BindingKey并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。

HelloWrold代码

Producer

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置相关属性
        connectionFactory.setHost("IP");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setConnectionTimeout(6000000);
        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3、通过connection创建一个Channel
        Channel channel = connection.createChannel();
        //4、通过Channel发送数据
        String msg = "Hello";
        for (int i = 1; i <= 6; i++) {
            /**
             * 方法解析:basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * exchange:发送消息的交换器名称,此处为""使用默认交换器
             * routingKey:路由键
             * props:消息的其他属性,如果routing header等
             * body:消息内容
             */
            channel.basicPublish("","test001",null,(msg+i).getBytes());
        }
        //5、关闭相关连接
        channel.close();
        connection.close();

    }
}

Consumer

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置属性
        connectionFactory.setHost("IP");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setConnectionTimeout(6000000);
        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3、通过connection创建一个Channel
        Channel channel = connection.createChannel();
        //4、声明队列
        String queueName = "test001";
        /**
         * 方法参数解析:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
         * queue:队列名称
         * durable:队列是否持久化(服务器重启后依然存在)
         * exclusive:队列是否排他。如果一个队列声明为排他队列,该队列公对首次声明它的连接可见,并在连接断开时自动删除。
         * autoDelete:队列是否自动删除(队列不再使用时服务器会将队列自动删除)。自动删除的前提:至少有一个消息者连接到这个队列,之后所有与这个队列连接的消息都断开时,才会自动删除
         * arguments:队列的其他属性
         */
        channel.queueDeclare(queueName,false,false,false,null);
        //5、创建消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //6、设置Channel
        channel.basicConsume(queueName,true,consumer);

        while (true){
            //7、获取消息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();//等待下一个消息传递和返回。
            String msg = new String(delivery.getBody());
            System.out.println("消息是:"+msg);
        }
    }
}
Last modification:February 4th, 2020 at 01:13 am
如果觉得我的文章对你有用,请随意赞赏