nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2019-09-04 16:32:56

    Ubuntu tar 解压缩命令详解

    tar 解压缩命令详解,这五个是独立的命令,压缩解压都要用到其中一个,可以和别的命令连用但只能用其中一个。下面的参数是根据需要在压缩或解压档案时可选的。

  • 2019-09-04 16:50:35

    CMake入门笔记

    Make是一个跨平台的安装(编译)工具,可以用简单的语句来描述所有平台的安装(编译过程)。他能够输出各种各样的makefile或者project文件,能测试编译器所支持的C++特性,类似UNIX下的automake。只是 CMake 的组态档取名为 CMakeLists.txt。Cmake 并不直接建构出最终的软件,而是产生标准的建构档(如 Unix 的 Makefile 或 Windows Visual C++ 的 projects/workspaces),然后再依一般的建构方式使用。这使得熟悉某个集成开发环境(IDE)的开发者可以用标准的方式建构他的软件,这种可以使用各平台的原生建构系统的能力是 CMake 和 SCons 等其他类似系统的区别之处。

  • 2019-09-05 20:51:15

    在Android上使用FFmpeg压缩视频

    libavcodec-提供了更加全面的编解码实现的合集 libavformat-提供了更加全面的音视频容器格式的封装和解析以及所支持的协议 libavutil-提供了一些公共函数 libavfilter-提供音视频的过滤器,如视频加水印、音频变声等 libavdevice-提供支持众多设备数据的输入与输出,如读取摄像头数据、屏幕录制 libswresample,libavresample-提供音频的重采样工具 libswscale-提供对视频图像进行色彩转换、缩放以及像素格式转换,如图像的YUV转换 libpostproc-多媒体后处理器

  • 2019-09-05 20:54:21

    在Android 中使用FFmpeg命令

    到这里就可以运行FFmpeg命令了。一直我也是这样使用,但是我在做这个项目Cut的时候发现连续调用多次FFmpeg命令会报错(在项目需要,先改变分镜头的速度,再合成视频)。 为什么会这样的呢?

  • 2019-09-06 10:30:20

    ffmpeg错误码

    AVERROR_BSF_NOT_FOUND = -1179861752 AVERROR_BUG = -558323010 AVERROR_DECODER_NOT_FOUND = -1128613112 AVERROR_DEMUXER_NOT_FOUND = -1296385272 AVERROR_ENCODER_NOT_FOUND = -1129203192 AVERROR_EOF = -541478725 AVERROR_EXIT = -1414092869 AVERROR_FILTER_NOT_FOUND = -1279870712 AVERROR_INVALIDDATA = -1094995529 AVERROR_MUXER_NOT_FOUND = -1481985528 AVERROR_OPTION_NOT_FOUND = -1414549496 AVERROR_PATCHWELCOME = -1163346256 AVERROR_PROTOCOL_NOT_FOUND = -1330794744 AVERROR_STREAM_NOT_FOUND = -1381258232 AVERROR_BUG2 = -541545794 AVERROR_UNKNOWN = -1313558101

  • 2019-09-08 09:05:54

    MyBatis Generator 插件的拓展插件包

    应该说使用Mybatis就一定离不开MyBatis Generator这款代码生成插件,而这款插件自身还提供了插件拓展功能用于强化插件本身,官方已经提供了一些拓展插件,本项目的目的也是通过该插件机制来强化Mybatis Generator本身,方便和减少我们平时的代码开发量。