nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2017-04-26 16:43:03

    php对象和数组相互转换的方法

    这篇文章主要介绍了php对象和数组相互转换的方法,通过两个自定义函数实现对象与数组的相互转换功能,非常简单实用,需要的朋友可以参考下

  • 2017-04-26 22:59:15

    百度编辑器Ueditor的黑白名单过滤

    黑白名单配置。UEditor针对进入编辑器的富文本内容提供了节点级别的过滤,可以通过该配置的修改来达到控制富文本内容的目的

  • 2017-04-26 23:30:58

    PHP中session变量的销毁

    本篇文章主要是对PHP中session变量的销毁进行了介绍,需要的朋友可以过来参考下,希望对大家有所帮助

  • 2017-05-02 17:51:44

    php生成不重复随机字符串

    使用时间戳作为原始字符串,再随机生成五个字符随机插入任意位置,生成新的字符串,保证不重复

  • 2017-05-02 17:54:57

    高并发 php uniqid 不重复唯一标识符生成方案

    PHP uniqid()函数可用于生成不重复的唯一标识符,该函数基于微秒级当前时间戳。在高并发或者间隔时长极短(如循环代码)的情况下,会出现大量重复数据。即使使用了第二个参数,也会重复,最好的方案是结合md5函数来生成唯一ID。

  • 2017-05-12 16:33:24

    说说JSON和JSONP,也许你会豁然开朗

      JSON(JavaScript Object Notation)和JSONP(JSON with Padding)虽然只有一个字母的差别,但其实他们根本不是一回事儿:JSON是一种数据交换格式,而JSONP是一种依靠开发人员的聪明才智创造出的一种非官方跨域数据交互协议。我们拿最近比较火的谍战片来打个比方,JSON是地下党们用来书写和交换情报的“暗号”,而JSONP则是把用暗号书写的情报传递给自己同志时使用的接头方式。看到没?一个是描述信息的格式,一个是信息传递双方约定的方法。