nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2020-03-09 22:11:39

    聊聊真实的 Android TV 开发技术栈

    智能电视越来越普及了,华为说四月发布智能电视跳票了,一加也说今后要布局智能电视,在智能电视方向,小米已经算是先驱了。但是还有不少开发把智能电视简单的理解成手机屏幕的放大,其实这两者并不一样。

  • 2020-03-11 09:43:20

    JavaScript shift() 方法

    shift() 方法用于把数组的第一个元素从其中删除,并返回第一个元素的值。

  • 2020-03-11 09:45:19

    45个优秀的vue开源项目

    在过去一年里,前端开发发展迅速,前端工程师的薪资亦是水涨船高。2019 更是热度不减,而作为近年来尤为热门的前端框架,Vue.js 自是积累了大量关注。本文将为你介绍 2019 年最值得关注的 45 个 Vue.js 开源项目——Let's go!

  • 2020-03-11 18:26:52

    Mac设置ADB

    adb在 ~/Library/Android/sdk/platform-tools文件夹内

  • 2020-03-11 19:40:56

    java.util.zip.ZipException: zip file is empty

    三、总结 出现 java.util.zip.ZipException: zip file is empty错误,表示你本地使用的jar包或者aar包可能为空,你可以检查下文件大小,如果为空,可以替换本地的jar包或者aar包为正常的jar包或者aar包,或者如果官方有相关的资源的话直接使用官方的依赖资源即可。