nodejs操作消息队列RabbitMQ

2021-04-13 09:53:12

参考地址 nodejs操作消息队列RabbitMQ

安装RabbitMQ https://blog.csdn.net/qq_35014708/article/details/93232033


一. 什么是消息队列

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。

其主要用途:不同进程Process/线程Thread之间通信。


为什么会产生消息队列?有几个原因:


不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;


不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;


二. 常用的消息队列

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq


三. 使用场景

异步处理


应用解耦


流量削峰


四 使用amqplib操作RabbitMQ

安装 amqplib


npm install amqplib

生产者:


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

    sendQueueMsg(queueName, msg, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName).then(function (ok) {

                    return channel.sendToQueue(queueName, new Buffer(msg), {

                        persistent: true

                    });

                })

                    .then(function (data) {

                        if (data) {

                            errCallBack && errCallBack("success");

                            channel.close();

                        }

                    })

                    .catch(function () {

                        setTimeout(() => {

                            if (channel) {

                                channel.close();

                            }

                        }, 500)

                    });

            })

            .catch(function () {

                let num = self.index++;

 

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index == 0;

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.sendQueueMsg('testQueue', '123', (error) => {

    console.log(error)

})

消费者


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

 

    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName)

                    .then(function (ok) {

                        return channel.consume(queueName, function (msg) {

                            if (msg !== null) {

                                let data = msg.content.toString();

                                channel.ack(msg);

                                receiveCallBack && receiveCallBack(data);

                            }

                        })

                            .finally(function () {

                                setTimeout(() => {

                                    if (channel) {

                                        channel.close();

                                    }

                                }, 500)

                            });

                    })

            })

            .catch(function () {

                let num = self.index++;

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index = 0;

                    self.open = amqp.connect(self.hosts[0]);

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.receiveQueueMsg('testQueue',(msg) =>

{

    console.log(msg)//123

})

打开mq后台 http://127.0.0.1:15672/ 看到新增队列,接受一条消息




 


当运行消费者代码时输入 123,消息队列消息为0







  • 2018-12-02 10:54:14

    HTTP长连接、短连接究竟是什么?

    HTTP的长连接和短连接本质上是TCP长连接和短连接。HTTP属于应用层协议,在传输层使用TCP协议,在网络层使用IP协议。 IP协议主要解决网络路由和寻址问题,TCP协议主要解决如何在IP层之上可靠地传递数据包,使得网络上接收端收到发送端所发出的所有包,并且顺序与发送顺序一致。TCP协议是可靠的、面向连接的。

  • 2018-12-04 15:30:01

    如何在Mac OS X上安装 Ruby运行环境

    ​ 对于新入门的开发者,如何安装 Ruby和Ruby Gems 的运行环境可能会是个问题,本页主要介绍如何用一条靠谱的路子快速安装 Ruby 开发环境。 此安装方法同样适用于产品环境!

  • 2018-12-04 15:31:15

    iOS--Pod install && Pod update

    许多人在最初接触CocoaPods时认为pod install只是在第一次为项目设置CocoaPods时使用,之后都应该使用pod update.看起来是这样,但也不是(But that's not the case at all.)。 这篇文章的目的就是教你啥时候用pod install,啥时候用pod update

  • 2018-12-04 15:33:19

    CocoaPods安装和使用教程

    当你开发iOS应用时,会经常使用到很多第三方开源类库,比如JSONKit,AFNetWorking等等。可能某个类库又用到其他类库,所以要使用它,必须得另外下载其他类库,而其他类库又用到其他类库,“子子孙孙无穷尽也”,这也许是比较特殊的情况。总之小编的意思就是,手动一个个去下载所需类库十分麻烦。另外一种常见情况是,你项目中用到的类库有更新,你必须得重新下载新版本,重新加入到项目中,十分麻烦。如果能有什么工具能解决这些恼人的问题,那将“善莫大焉”。所以,你需要 CocoaPods。

  • 2018-12-04 23:37:37

    pod install 和 pod update

    当我们新建一个Podfile文件运行后,会自动生成一个Podfile.lock文件,Podfile.lock文件里存储着我们已经安装的依赖库(pods)的版本。 当我们第一次运行Podfile时,如果对依赖库不指定版本的话,cocoapods会安装最新的版本,同时将pods的版本记录在Podfile.lock文件中。这个文件会保持对每个pod已安装版本的跟踪,并且锁定这些版本。

  • 2018-12-04 23:40:26

    pod删除已导入的第三方库和移除项目中的cocoapods

    CocoaPods是一个负责管理iOS项目中第三方开源库的工具。CocoaPods的项目源码在Github上管理。在我们有了CocoaPods这个工具之后,只需要将用到的第三方开源库放到一个名为Podfile的文件中,然后在命令行执行$ pod install命令。CocoaPods就会自动将这些第三方开源库的源码下载下来,并且为我的工程设置好相应的系统依赖和编译参数. 但是如果我们导入的某个第三方不适用,或者我们又不想使用该第三方,那我们又该如何将这些相关的东西从我们的项目中清理出去呢?