nodejs操作消息队列RabbitMQ

2021-04-13 09:53:12

参考地址 nodejs操作消息队列RabbitMQ

安装RabbitMQ https://blog.csdn.net/qq_35014708/article/details/93232033


一. 什么是消息队列

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。

其主要用途:不同进程Process/线程Thread之间通信。


为什么会产生消息队列?有几个原因:


不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;


不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;


二. 常用的消息队列

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq


三. 使用场景

异步处理


应用解耦


流量削峰


四 使用amqplib操作RabbitMQ

安装 amqplib


npm install amqplib

生产者:


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

    sendQueueMsg(queueName, msg, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName).then(function (ok) {

                    return channel.sendToQueue(queueName, new Buffer(msg), {

                        persistent: true

                    });

                })

                    .then(function (data) {

                        if (data) {

                            errCallBack && errCallBack("success");

                            channel.close();

                        }

                    })

                    .catch(function () {

                        setTimeout(() => {

                            if (channel) {

                                channel.close();

                            }

                        }, 500)

                    });

            })

            .catch(function () {

                let num = self.index++;

 

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index == 0;

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.sendQueueMsg('testQueue', '123', (error) => {

    console.log(error)

})

消费者


let amqp = require('amqplib');

 

class RabbitMQ {

    constructor() {

        this.hosts = [];

        this.index = 0;

        this.length = this.hosts.length;

        this.open = amqp.connect(this.hosts[this.index]);

    }

 

    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {

        let self = this;

 

        self.open

            .then(function (conn) {

                return conn.createChannel();

            })

            .then(function (channel) {

                return channel.assertQueue(queueName)

                    .then(function (ok) {

                        return channel.consume(queueName, function (msg) {

                            if (msg !== null) {

                                let data = msg.content.toString();

                                channel.ack(msg);

                                receiveCallBack && receiveCallBack(data);

                            }

                        })

                            .finally(function () {

                                setTimeout(() => {

                                    if (channel) {

                                        channel.close();

                                    }

                                }, 500)

                            });

                    })

            })

            .catch(function () {

                let num = self.index++;

                if (num <= self.length - 1) {

                    self.open = amqp.connect(self.hosts[num]);

                } else {

                    self.index = 0;

                    self.open = amqp.connect(self.hosts[0]);

                }

            });

    }

}

 

let mq = new RabbitMQ();

mq.receiveQueueMsg('testQueue',(msg) =>

{

    console.log(msg)//123

})

打开mq后台 http://127.0.0.1:15672/ 看到新增队列,接受一条消息




 


当运行消费者代码时输入 123,消息队列消息为0







  • 2018-08-20 15:26:19

    关于OnTouch 和OnClick同时调用冲突的解决方案

    大家在搞轮播图的时候会碰到这样的情况,点击进入webview界面,长按轮播图停止轮播,手松开图又开始轮播,这里就涉及到了OnTouch 和OnClick同时调用。两者是有冲突的。这里简单介绍,给大家提供思路。

  • 2018-08-20 15:29:11

    揭开RecyclerView的神秘面纱(二):处理RecyclerView的点击事件

    主要讲述了RecyclerView的基本使用方法,不同的布局管理器而造成的多样化展示方式,展示了数据之后,一般都会与用户进行交互,因此我们需要处理用户的点击事件。在ListView和GridView提供了onItemClickListener这个监听器,然而我们查找RecyclerView的API却没有类似的监听器,因此我们需要自己手动处理它的点击事件。 以下提供两种方法来实现处理RecyclerView点击事件的功能,以下代码均基于上一篇文章的代码做出修改。

  • 2018-08-20 22:58:46

    onInterceptTouchEvent和onTouchEvent调用关系详解 ...

    老实说,这两个小东东实在是太麻烦了,很不好懂,我自己那api文档都头晕,在网上找到很多资料,才知道是怎么回事,这里总结一下,记住这个原则就会很清楚了:

  • 2018-08-23 15:32:18

    map对象拷贝问题

    最后面是使用序列化的方式,发现,更改引用类型的数据的时候,mapNew对象并没有发生变化,所以产生了深拷贝。 上述的工具类,可以实现对象的深拷贝,不仅限于HashMap,前提是实现了Serlizeable接口。

  • 2018-08-24 11:33:17

    总结和分析几种判断 RecyclerView 到达底部的方法

    SwipeRefreshLayout 写一个 RecyclerView 的上下拉 ,里面有一个判断 RecyclerView 是否到达底部的方法 isBottom。我的同事用了这个上下拉之后发现有些小 bug,没考虑周全,譬如各个子项高度不统一的时候,然后我找到原因是因为这个判断上下拉的问题。所以,我就去网上查到几种判断 RecyclerView 到达底部的方法,发现各有千秋。以下的分析都以上一篇文章的 SwipeRecyclerView 为例

  • 2018-08-26 00:18:04

    RecyclerView 图片错位空白的问题

    1.图片错位的原因是因为图片异步记载返回去展示出的问题。图片空白,是item刷新,请求图片时间上的问题。 2。viewHolder.setIsRecyclable(false); 就没有tag,不设置 就有tag,但是有没有没啥区别 设置tag,

  • 2018-08-28 10:00:24

    laravel使用队列的简单步骤

    最近需要导入大量的excel文件,数量达到十万之多。 而我又不想修改服务器的超时时间,因为这样可能影响服务器的堵塞。 而php又没有很好的异步。 后来发现了令laravel最为骄傲的部分,队列。具体文档参考下方链接。