nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2021-01-22 22:25:51

    How to protect your JS code by WebAssembly

    对于iOS或是Android来说,我们可以将相关的算法通过C/C++进行编写,然后编译为dylib或是so并进行混淆以此来增加破解的复杂度,但是对于前端来说,并没有类似的技术可以使用。当然,自从asm.js及WebAssembly的全面推进后,我们可以使用其进一步增强我们核心代码的安全性,但由于asm.js以及WebAssembly标准的开放,其安全强度也并非想象中的那么美好。

  • 2021-01-24 09:50:16

    UICollectionViewCell cell高度自适应

    本来想使用UICollectionView来作为整体的布局,并且不再使用UITableView,但是发现高度不固定的布局,UICollectionView没啥优势呀,至少我没找到好的方法,从网上看的是,要自定义cell,并且继承preferredLayoutAttributesFittingAttributes

  • 2021-01-24 10:08:57

    ios后台播放视频

    很多开发者以为AVPlayer不能在后台播放视频:应用退到后台,但能播放视频的声音(ps:不是通过切换相同的音频来实现),我在开发SDK的过程中也遇到这个需求,所幸解决了这个问题。下面我就来讲讲实现的过程。

  • 2021-01-24 10:10:22

    XCode提示build successed,无法启动模拟器

    使用phonegap(cordova)创建项目后,开始时使用Xcode6.1或命令行运行项目都可启动模拟器调试;不知道什么原因命令行运行项目完全没有问题,但是Xcode运行,提示build successed,但是始终无法启动模拟器。