nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2018-09-09 02:25:09

    单例模式的好处和缺点?为什么要用单例模式?

    单例模式是一种常用的软件设计模式。在它的核心结构中只包含一个被称为单例类的特殊类。通过单例模式可以保证系统中一个类只有一个实例而且该实例易于外界访问,从而方便对实例个数的控制并节约系统资源。如果希望在系统中某个类的对象只能存在一个,单例模式是最好的解决方案。

  • 2018-09-09 02:31:48

    基于VCamera,仿微信录制短视频

    基于VCamera,Android仿微信录制短视频,如果喜欢请star,如果觉得有纰漏请提交issue,如果你有更好的点子可以提交pull request。

  • 2018-09-13 22:24:04

    QQ微信登录失败,报100044错

    我用的mob的maven集成方案,说实在的从一开始用maven集成方案就是一个坑啊。每次build都会重新加载所有maven包,后来通过gradle offline总算解决了。

  • 2018-09-26 15:14:23

    PHP JSON_ENCODE 不转义中文汉字的方法

    PHP 生成JSON的时候,必须将汉字不转义为 \u开头的UNICODE数据。 网上很多,但是其实都是错误的,正确的方法是在json_encode 中加入一个参数 JSON_UNESCAPED_UNICODE

  • 2018-09-27 10:04:11

    jquery ajax超时设置

    原来ajax可以设置超时时间,那么简单,ajax还有更多功能,虽然不怎么用它,有时候还挺好用。