nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2018-04-03 10:21:35

    jquery实时监听输入框值变化

    在做web开发时候很多时候都需要即时监听输入框值的变化,以便作出即时动作去引导浏览者增强网站的用户体验感。而采用onchange时间又往往是在输入框失去焦点(onblur)时候触发,有时候并不能满足条件。

  • 2018-04-03 10:22:20

    JQuery如何监听DIV内容变化

    这几天在做一个微博的接入,需要判断微博是否被关注,要检查微博标签的DIV是否有“已关注”的字符,但这个DIV的内容是微博JSSDK动态生成。$("#id").html()是获取不到我想要的内容。原因是当我们获取的时候内容还没有改变,所以获取不到,如果就想到监听这个DIV内容变化后,再来获取就个时候就能获取到了。于是产生新的问题,如何监听DIV的变化?

  • 2018-04-04 23:52:03

    PowerManager之PowerManager

    当你在做一些事情时,如果持续时间过长,那么一段时间后屏幕会灭掉,如果你想在你做这些事时屏幕始终保持点亮状态,那么你需要WakeLock的帮助。

  • 2018-04-07 23:35:16

    使用Intent传递对象的两种方式

    Intent 的用法相信你已经比较熟悉了,我们可以借助它来启动活动、发送广播、启动服务等。在进行上述操作的时候,我们还可以在Intent 中添加一些附加数据,以达到传值的效果,比如在FirstActivity 中添加如下代码:

  • 2018-04-10 14:59:59

    JS实现数组去重方法总结(六种方法)

    这篇文章给大家总结下JS实现数组去重方法(六种方法),面试中也经常会遇到这个问题。文中给大家引申的还有合并数组并去重的方法,感兴趣的朋友跟随脚本之家小编一起学习吧

  • 2018-04-13 17:28:33

    jsoup 使用总结4--高级用法之 script js 脚本

    大部分时候,我们使用jsoup解析网页的时候,都是直接找到某一类元素,或者按某种selector查询;具体使用方法可以参考jsoup官网文档 那么你有没有实际操作过,查找script js 脚本呢,因为很多时候页面的内容是根据js动态生成的,或者数据是动态变更;那么这个时候,我们只是获取html页面中script js脚本之间的内容。