nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2019-12-20 13:24:58

    瓦片地图生成使用以及原理

    我们都知道地球是圆的,电脑显示器是平的,要想让位于球面的形状显示在平面的显示器上就必然需要一个转换过程,这个过程就叫做投影(Projection)。在地球上我们通过经纬度来描述某个位置,而经过投影之后的地图也有自己的坐标系统,本篇文章就来详细介绍在百度地图API中涉及的各种坐标体系。

  • 2019-12-20 13:27:16

    腾讯地图谷歌和高德地图等自定义地图区别

    腾讯、百度、Google的地图投影均采用Web Mercator 投影坐标系;腾讯与Google的地图瓦片分辨率及切片范围是完全相同的,仅仅是命名规则稍有不同,这就使得同一位置和缩放级别的地图瓦片是完全可以重叠的;而百度地图每个缩放级别分辨率与前两者均不相同,而且地图瓦片的坐标原点做了一定的偏移,导致百度地图与前两者的瓦片是无法重叠的,这是因为百度在GCJ-02的基础上又进行了加密处理,形成了百度独有的BD-09坐标系。

  • 2019-12-22 08:06:48

    如何快速撤销上一次的commit

    在平时工作中使用git难免会提交一些错误的文件到git库里,这时候,撤销吧,怕把正确的文件删除了,不撤销重新改又很麻烦,下面,我就从提交的三个阶段,来讲解如何撤销错误的操作。

  • 2019-12-23 14:54:03

    RPC, REST ,GraphQL区别比较优劣

    其实在使用和学习的过程中,有很多文章都对比过它们的异同,但是大部分文章并没有从一个相对客观的角度来对比,更多是为了突显一个的优点而刻意指出另外一个的缺点。这让我想到一句话,脱离业务情景谈技术就是耍流氓。

  • 2019-12-23 23:38:59

    vue-apollo的多客户端的用法

    vue-apollo的多客户端的用法以及apollo.js的配置 关于如何安装和如何使用,这篇文章就先暂时不介绍了,如果不清楚就看我另一篇关于vue-apollo的用法 在做项目中,有时候后端的接口是按模块功能去划分的,那么请求的地址就会不同,关于vue-apollo的多客户端配置如下