nodejs操作消息队列RabbitMQ

2021-04-13 09:53:12

参考地址 nodejs操作消息队列RabbitMQ

安装RabbitMQ https://blog.csdn.net/qq_35014708/article/details/93232033


一. 什么是消息队列

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。

其主要用途:不同进程Process/线程Thread之间通信。


为什么会产生消息队列?有几个原因:


不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;


不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;


二. 常用的消息队列

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq


三. 使用场景

异步处理


应用解耦


流量削峰


四 使用amqplib操作RabbitMQ

安装 amqplib


npm install amqplib

生产者:


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

    sendQueueMsg(queueName, msg, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName).then(function (ok) {

                    return channel.sendToQueue(queueName, new Buffer(msg), {

                        persistent: true

                    });

                })

                    .then(function (data) {

                        if (data) {

                            errCallBack && errCallBack("success");

                            channel.close();

                        }

                    })

                    .catch(function () {

                        setTimeout(() => {

                            if (channel) {

                                channel.close();

                            }

                        }, 500)

                    });

            })

            .catch(function () {

                let num = self.index++;

 

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index == 0;

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.sendQueueMsg('testQueue', '123', (error) => {

    console.log(error)

})

消费者


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

 

    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName)

                    .then(function (ok) {

                        return channel.consume(queueName, function (msg) {

                            if (msg !== null) {

                                let data = msg.content.toString();

                                channel.ack(msg);

                                receiveCallBack && receiveCallBack(data);

                            }

                        })

                            .finally(function () {

                                setTimeout(() => {

                                    if (channel) {

                                        channel.close();

                                    }

                                }, 500)

                            });

                    })

            })

            .catch(function () {

                let num = self.index++;

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index = 0;

                    self.open = amqp.connect(self.hosts[0]);

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.receiveQueueMsg('testQueue',(msg) =>

{

    console.log(msg)//123

})

打开mq后台 http://127.0.0.1:15672/ 看到新增队列,接受一条消息




 


当运行消费者代码时输入 123,消息队列消息为0







  • 2019-11-25 17:05:44

    js实现 throttle 和 debounce,节流,防抖详解

    throttle 节流:drag改变浏览器大小,触发onresize函数,实现拖动每过1秒输出一次,不足1秒,1秒后输出一次。多用于高频操作,如抢票、抢购等,无论点击多少次,只固定间隔执行一次,以减轻压力。debounce防抖:drag改变浏览器大小,触发onresize函数,实现拖动停顿1秒输出。多用于输入框,当某一次输入后停顿满n秒才会去触发远程搜索。

  • 2019-11-25 17:37:01

    百度地图GeoUtils示例

    百度地图JavaScript开源库,是一套基于百度地图API二次开发的开源的代码库。目前提供多个lib库,帮助开发者快速实现在地图上添加Marker、自定义信息窗口、标注相关开发、区域限制设置、几何运算、实时交通、检索与公交驾车查询、鼠标绘制工具等功能。

  • 2019-11-26 11:08:02

    多边型无序点排序(地图绘制多边形)

    任务需求要做一个区域高亮的功能,用到地图,想到了高德地图的多边形API,但是多边形顶点的顺序是要有序的,需求是无序,在API查找无果的情况下,只能手动实现点集合排序。

  • 2019-11-26 11:11:59

    正多边形的编程绘制(javascript)

    如何用程序来绘制正多边形? 在一般情况下,会使用 x = radius * Math.cos(angle), y = radius * Math.sin(angle) 来进行绘制,但这是关于x轴对称的,如果遇到正多边形的边数为奇数,而你又希望它是以y轴对称时,可按照下面的方法。

  • 2019-11-26 13:36:28

    Vue组件命名找不到的问题以及如何给vue组件命名

    首先,Vue 会将 template 中的内容插到 DOM 中,以方便解析标签。由于 HTML 标签不区分大小写,所以在生成的标签名都会转换为小写。例如,当你的 template 为 <MyComponent></MyComponent> 时,插入 DOM 后会被转换为 <mycomponent></mycomponent>。 然后,通过标签名寻找对应的自定义组件。匹配的优先顺序从高到低为:原标签名、camelCase化的标签名、PascalCase化的标签名。例如 <my-component>会依次匹配 my-component、myComponent、MyComponent。camelCase 和 PascalCase 的代码