nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2020-02-14 20:08:17

    Android-应用被作为第三方浏览器打开

    微信里的文章页面,可以选择“在浏览器打开”。现在很多应用都内嵌了WebView,那是否可以使自己的应用作为第三方浏览器打开此文章呢?

  • 2020-02-16 16:17:15

    CocoaPods安装和使用教程

    当你开发iOS应用时,会经常使用到很多第三方开源类库,比如JSONKit,AFNetWorking等等。可能某个类库又用到其他类库,所以要使用它,必须得另外下载其他类库,而其他类库又用到其他类库,“子子孙孙无穷尽也”,这也许是比较特殊的情况。总之小编的意思就是,手动一个个去下载所需类库十分麻烦。另外一种常见情况是,你项目中用到的类库有更新,你必须得重新下载新版本,重新加入到项目中,十分麻烦。如果能有什么工具能解决这些恼人的问题,那将“善莫大焉”。所以,你需要 CocoaPods。

  • 2020-02-16 17:13:34

    iOS优秀Objective-C开源库集锦

    自己从事iOS开发工作接近两年左右的时间了,在自己工作之余,收集整理了一些优秀的三方开源框架,自己整理的这些三方开源库涵盖了iOS开发面很多方面的知识。非常感谢这些开源库的作者们,正是因为这些库,提高了我们的开发效率,同样也是我们学习进步的源泉。现将这个整理工程文件分享出来,希望能给需要的朋友一些帮助,同时也自己也做下收集记录。