nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2019-12-27 08:40:55

    align-self和align-items的区别

    align-items在伸缩容器上使用它,伸缩容器内部所有的元素都一致地受制于align-items的值。 但是有些时候,我们希望伸缩容器内部某个元素在侧轴上的排列方式有所差异。此时就不能使用 align-items,因为align-items作用于整体。我们希望作用于部分。这就是align-self的发挥场地。

  • 2019-12-29 15:01:37

    修改laravel分页的样式

    首先获取到数据,paginate方法 能够自动判定当前页面正确的数量限制和偏移数。默认情况下,当前页数由HTTP 请求所带的 ?page 参数来决定。当然,该值由 Laravel 自动检测,并自动插入由分页器生成的链接。

  • 2019-12-29 15:05:57

    php 数组分页 array_slice()函数用法

    今天用到一个函数,非常好用,分享给大家 array_slice() -从数组中取出一段 也就是说用这个函数可以和sql语句一样实现分页,原理是将查询出的数组,取出从指定下标开始到指定长度的数组

  • 2019-12-30 10:17:21

    router-link传递参数,query

    在vue-router中,有两大对象被挂载到了实例this; $route(只读、具备信息的对象); $router(具备功能的函数) 查询字符串: 去哪里 ? <router-link :to="{name:'detail',query:{id:1}}"> xxx </router-link>

  • 2019-12-30 16:48:41

    vue provide/inject详解和用法

    父子组件交互方式多种,props、vuex、 、 emit、localStorage还有就是这个provide/inject了。它适合层级比较深的组件,比如子,子孙,子孙后代的组件有好几个用到父组件的某个属性,就可以用到这个provide/inject,它可以避免写大量繁琐的传值代码 我这里为什么要使用它? 我一个知识库详情父组件中包含了大量的子组件,每个子组件都需要父组件的知识库ID,这时候我不想写大量props,就用到provide/inject进行传值了