nodejs操作消息队列RabbitMQ

2021-04-13 09:53:12

参考地址 nodejs操作消息队列RabbitMQ

安装RabbitMQ https://blog.csdn.net/qq_35014708/article/details/93232033


一. 什么是消息队列

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。

其主要用途:不同进程Process/线程Thread之间通信。


为什么会产生消息队列?有几个原因:


不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;


不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;


二. 常用的消息队列

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq


三. 使用场景

异步处理


应用解耦


流量削峰


四 使用amqplib操作RabbitMQ

安装 amqplib


npm install amqplib

生产者:


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

    sendQueueMsg(queueName, msg, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName).then(function (ok) {

                    return channel.sendToQueue(queueName, new Buffer(msg), {

                        persistent: true

                    });

                })

                    .then(function (data) {

                        if (data) {

                            errCallBack && errCallBack("success");

                            channel.close();

                        }

                    })

                    .catch(function () {

                        setTimeout(() => {

                            if (channel) {

                                channel.close();

                            }

                        }, 500)

                    });

            })

            .catch(function () {

                let num = self.index++;

 

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index == 0;

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.sendQueueMsg('testQueue', '123', (error) => {

    console.log(error)

})

消费者


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

 

    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName)

                    .then(function (ok) {

                        return channel.consume(queueName, function (msg) {

                            if (msg !== null) {

                                let data = msg.content.toString();

                                channel.ack(msg);

                                receiveCallBack && receiveCallBack(data);

                            }

                        })

                            .finally(function () {

                                setTimeout(() => {

                                    if (channel) {

                                        channel.close();

                                    }

                                }, 500)

                            });

                    })

            })

            .catch(function () {

                let num = self.index++;

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index = 0;

                    self.open = amqp.connect(self.hosts[0]);

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.receiveQueueMsg('testQueue',(msg) =>

{

    console.log(msg)//123

})

打开mq后台 http://127.0.0.1:15672/ 看到新增队列,接受一条消息




 


当运行消费者代码时输入 123,消息队列消息为0







  • 2019-05-13 14:34:42

    linux系统中清理MySql的日志文件,打印日志文件

    默认情况下mysql会一直保留mysql-bin文件,这样到一定时候,磁盘可能会被撑满,这时候是否可以删除这些文件呢,是否可以安全删除,是个问题。 首先要说明一下,这些文件都是mysql的日志文件,如果不做主从复制的话,基本上是没用的,虽然没用,但是不建议使用rm命令删除,这样有可能会不安全,正确的方法是通过mysql的命令去删除。

  • 2019-05-14 16:47:27

    数据库整理碎片

    最后还是用的ALTER TABLE来修改的,不知道为什么有时候管用,有时候不管用。

  • 2019-05-17 16:27:26

    在vue项目里面使用引入公共方法

    今天早上来到公司,没事看了一下别人的博客,然后试了一下,发现的确是可以的,在此记录一下,方便自己日后查阅。 首先新建一个文件夹:commonFunction ,然后在里面建立 一个文件common.js

  • 2019-05-18 12:37:39

    Android夜间模式的实现方案

    对于一款阅读类的软件,夜间模式是不可缺少的。最初看到这个需求时候觉得无从下手,没有一点头绪。后来通过查阅资料发现Android官方在Support Library 23.2.0中已经加入了夜间主题。也就是只需要通过更换主题便可实现日间模式和夜间模式的切换。下面截取项目实现的夜间模式效果图:

  • 2019-05-18 12:38:41

    android 快速实现夜间模式

    最近项目中遇到了一个问题,夜间模式在8.0以上的手机中不起作用,查看了一下原因,是夜间模式实现方法的问题。分两种情况介绍一下

  • 2019-05-18 12:40:35

    Android夜间模式的几种实现

    通过增加一层遮光罩来实现。效果不是很理想,但是好用,毕竟很多手机都有自己的夜间模式了

  • 2019-05-19 02:25:15

    php使用TCPDF生成PDF文件教程

    orientation属性用来设置文档打印格式是“Portrait”还是“Landscape”。 Landscape为横式打印,Portrait为纵向打印

  • 2019-05-21 11:46:05

    RecyclerView 加动画的坑

    然后加到recyclerView上,我是在adapter上加的。Adapter的holder复用相信大家也都很熟悉了,这个在绘制效率的提高上很重要,也很容易发现一个问题,就是内容混乱的复用。所以常见的处理就是对view加上tag来多次判断,对于visibility之类的设置一定是if...else的写法,光有if是不可以的。