nodejs队列实现amqplib,rabbitmq

2020-12-07 16:14:22

参考到地址 对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');
var config=require('../config/config');
var log=require('../util/loghelp');
function fail(err, conn) {
    log.error(err);
    if (conn) conn.close();
}
exports.StartConsumer=function (action,qname) {
    function on_connect(err, conn) {
        if (err !== null) return fail(err);
        function on_channel_open(err, ch) {
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                ch.consume(qname, function(msg) {

                    log.info(`Received ${msg.content.toString()},start process`);
                    action(JSON.parse(msg.content))
                        .then(d=> {
                            log.info("mq 处理成功,确认");ch.ack(msg)
                        }
                           )
                        .catch(err=>
                        ch.nack(msg));
                }, {noAck: false} );
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};

exports.enqueue=function (data,qname) {
    function on_connect(err, conn) {
        if (err !== null) return bail(err);

        function on_channel_open(err, ch) {
            if (err !== null) return bail(err, conn);
            ch.assertQueue(qname, {durable: true}, function(err, ok) {
                if (err !== null) return bail(err, conn);
                var msg=JSON.stringify(data);
                ch.sendToQueue(qname, new Buffer(msg));
                log.info(`mq send ${msg}`);
                ch.close(function() { conn.close(); });
            });
        }
        conn.createChannel(on_channel_open);
    }
    amqp.connect(config.amqp.url,on_connect);
};


其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。


  • 2017-04-02 00:44:46

    NGINX + PHP-FPM 502 相关事

    NGINX + PHP-FPM 报 502 错误,我想大部分 SA 都遇到过吧。 根据报错的频率,可以分为两种情况,间歇性的502和连续性的502。 这里只讨论第一种情况——间歇性的502。

  • 2017-04-02 00:52:26

    php-fpm占用系统资源分析

    由上图分析,可以看出共有602个进程,其中有601个进程休眠了。这好像有点不对劲,内核进程也就80个左右,加上memcached, nginx, mysqld,也不会超出90个。除了这些,剩下的只有php-fpm管理的php-cgi,难道是…?

  • 2017-04-02 00:56:36

    php-fpm占用系统资源分析

    由上图分析,可以看出共有602个进程,其中有601个进程休眠了。这好像有点不对劲,内核进程也就80个左右,加上memcached, nginx, mysqld,也不会超出90个。除了这些,剩下的只有php-fpm管理的php-cgi,难道是…?

  • 2017-04-03 14:23:17

    Android Studio --“Cannot resolve symbol” 解决办法

    Android Studio 无法识别同一个 package 里的其他类,将其显示为红色,但是 compile 没有问题。鼠标放上去后显示 “Cannot resolve symbol XXX”,重启 Android Studio,重新 sync gradle,Clean build 都没有用。

  • 2017-04-06 14:59:13

    PHP配置文件详解

    PHP是一个简单易学,功能强大的语言,尤其在Web开发,开发效率高,方便快捷。研究一下php.ini了解PHP相关配置会有好处的,对PHP有更加深入的了解。

  • 2017-04-06 15:00:46

    怎么实时查看mysql当前连接数

    今天有一台mysql服务器突然连接数暴增,并且等待进程全部被锁...因为问题解决不当,导致被骂...OTL 总结:以后要快速定位错误,布置解决方案

  • 2017-04-06 15:07:57

    PHP-FPM不完全指南

    fpm启动后会先读php.ini,然后再读相应的conf配置文件,conf配置可以覆盖php.ini的配置。 启动fpm之后,会创建一个master进程,监听9000端口(可配置),master进程又会根据fpm.conf/www.conf去创建若干子进程,子进程用于处理实际的业务。

  • 2017-04-06 15:11:03

    浅析php-fpm 和 mysql 之间的关系详解

    php-fpm 和 mysql 之间的关系估计不做底层开发应用的是不会去考虑分析它们了,如果是的话我们来看一篇关于php-fpm 和 mysql 之间的关系的教程。