nodejs操作消息队列RabbitMQ

2021-04-13 09:53:12

参考地址 nodejs操作消息队列RabbitMQ

安装RabbitMQ https://blog.csdn.net/qq_35014708/article/details/93232033


一. 什么是消息队列

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。

其主要用途:不同进程Process/线程Thread之间通信。


为什么会产生消息队列?有几个原因:


不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;


不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;


二. 常用的消息队列

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq


三. 使用场景

异步处理


应用解耦


流量削峰


四 使用amqplib操作RabbitMQ

安装 amqplib


npm install amqplib

生产者:


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

    sendQueueMsg(queueName, msg, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName).then(function (ok) {

                    return channel.sendToQueue(queueName, new Buffer(msg), {

                        persistent: true

                    });

                })

                    .then(function (data) {

                        if (data) {

                            errCallBack && errCallBack("success");

                            channel.close();

                        }

                    })

                    .catch(function () {

                        setTimeout(() => {

                            if (channel) {

                                channel.close();

                            }

                        }, 500)

                    });

            })

            .catch(function () {

                let num = self.index++;

 

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index == 0;

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.sendQueueMsg('testQueue', '123', (error) => {

    console.log(error)

})

消费者


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

 

    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName)

                    .then(function (ok) {

                        return channel.consume(queueName, function (msg) {

                            if (msg !== null) {

                                let data = msg.content.toString();

                                channel.ack(msg);

                                receiveCallBack && receiveCallBack(data);

                            }

                        })

                            .finally(function () {

                                setTimeout(() => {

                                    if (channel) {

                                        channel.close();

                                    }

                                }, 500)

                            });

                    })

            })

            .catch(function () {

                let num = self.index++;

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index = 0;

                    self.open = amqp.connect(self.hosts[0]);

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.receiveQueueMsg('testQueue',(msg) =>

{

    console.log(msg)//123

})

打开mq后台 http://127.0.0.1:15672/ 看到新增队列,接受一条消息




 


当运行消费者代码时输入 123,消息队列消息为0







  • 2019-03-15 15:28:33

    10分钟教你搭建自己的ngrok服务器

    内网穿透想必开发过微信的同志都很了解,大部分人选择网上寻找各种现成的,比如ngrok官网、ittun-ngrok、sunny-ngrok或者花生壳之类的。但是世界上没有免费的午餐,要不就是收费,要不就是免费但是偶尔会出现连接失败的问题(当然大多数时间是没有问题的)。

  • 2019-03-15 15:29:53

    丢弃花生壳,搭建自己的ngrok作为内网穿透服务器

    公司没有公网,公司在二级路由下面(就是服务商没有给独立IP,也无法动态获取公网IP),然而公司在开发程序的时候,如对接微信等需要返回数据,或者需要别人能访问我们网址,一般我们就只能购买花生壳内网穿透,但是在使用中发现速度慢,经常掉。所以搭建自己的ngrok服务器来保证内网穿透。

  • 2019-03-15 15:31:21

    内网穿透 ngrok 服务器和客户端配置

    ngrok 简介及作用 ngrok 是一款用 go 语言开发的开源软件,它是一个反向代理,通过在公共的端点和本地运行的 Web 服务器之间建立一个安全的通道。下图简述了 ngrok 的原理。

  • 2019-03-15 15:32:09

    内网打洞以及代码实现

    假设内网的多个ip或者同一ip的不同port,都要访问同一个(外网ip:port)。对NAT来说,从外网接收的包它的(srcIp:srcPort)==(serverIp,serverPort),它的dstIp==natIp,所以NAT只能用dstPort来决定把这个包转发给哪一个(内网Ip:port)

  • 2019-03-15 15:33:08

    Xshell不能按退格、删除键的解决方案

    在使用xshell时,由于每个服务器不同,一些无法使用Backspace键向后删除字符。针对这个问题,本文为大家解答下退格键无法识别如何设置?

  • 2019-03-15 15:49:28

    win10远程桌面连接不上解决方法

    有朋友就感叹电脑的世界真的是很神奇,可以将整个世界连接在一起。如果别人想要摆弄你的电脑,即使不在一个地方也可以利用远程桌面来控制。而这就是所谓的远程控制操作了,大部分人都知道它的作用,不过这也不排除会遇到一些突发情况的时候,例如win10远程桌面连接不上,这该怎么去解决呢?为此,小编给大家带来了解决的图文操作。

  • 2019-03-15 16:49:18

    Win7无法进入家庭组提示“您的系统管理员不允许访问家庭组”怎么办

     家庭组是家庭网络上可以共享文件和打印机的一组计算机,可以方便用户们共享文件或者视频等,可是最近有win7纯净版系统用户却发现无法进入家庭组,提示“您的系统管理员不允许访问家庭组”,该怎么办呢?现在给大家分享一下Win7无法进入家庭组提示“您的系统管理员不允许访问家庭组”的解决方法。

  • 2019-03-17 22:19:28

    动态更新Toolbar Menu以及Menu中同时显示文字和图标

    我们经常会有这样的需求,在切换Fragment或者点击某个按钮后动态更新Toolbar上Menu项.但是onCreateOptionsMenu方法只在创建Activity的时候调用一次,以后就不再调用了,所以就不能在onCreateOptionsMenu中做处理了。 不过系统提供了另外的一个方法onPrepa