本文简单介绍消息队列的使用,以及在使用过程中的注意事项。
项目环境:thinkphp6.0.3 + think-queue3.0
一、think-queue 安装与基本使用
1、使用 composer 安装 thinkphp6 与 think-queue
# 安装 thinkphp6
composer create-project topthink/think 6.0.*
# 安装 think-queue
composer require topthink/think-queue
2、queue 的配置
think-queue 内置了 Redis,Database,Sync等驱动,
这里介绍使用Redis做消息队列存储,
结合supervisor进程管理使队列常驻进程。
配置文件位于 config/queue.php
return [
    // 驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
    'default'     => 'redis', // Redis驱动
    'connections' => [
        ...
        'redis'    => [
            'type'       => 'redis',      // Redis 驱动
            'queue'      => 'default',    // 默认的队列名称
            'host'       => '127.0.0.1',  // redis 主机ip
            'port'       => 6379,         // redis 端口
            'password'   => '123456',     // redis 密码
            'select'     => 0,            // 使用哪一个 db,默认为 db0
            'timeout'    => 0,            // redis连接的超时时间
            'persistent' => false,        // 是否是长连接
        ],
        ...
    ]
];
3、创建任务类
单模块项目推荐使用 app\job 作为任务类的命名空间 多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方
任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 think\queue\Job $job(当前的任务对象) 和 $data(发布任务时自定义的数据)
namespace app\job;
use think\queue\Job;
class MyJob
{
    public function fire(Job $job, $data){
        print("开始执行队列"." \n");
        //....这里执行具体的任务
        if ($job->attempts() > 3) {
            //通过这个方法可以检查这个任务已经重试了几次了
        }
        //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
        $job->delete();
        // 也可以重新发布这个任务
        $job->release($delay); //$delay为延迟时间
    }
    public function failed($data){
        // ...任务达到最大重试次数后,失败了
        print("队列执行失败"." \n");
    }
}
4、发布任务(生产者)
think\facade\Queue::push($job, $data = '', $queue = null)
think\facade\Queue::later($delay, $job, $data = '', $queue = null)
# 两个方法,前者是立即执行,后者是在$delay秒后执行
- $job 是任务名
 单模块的,且命名空间是app\job的,比如上面的例子一,写Job1类名即可
 多模块的,且命名空间是app\module\job的,写model/Job1
 其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
 如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名,app\lib\job\Job2@task1、app\lib\job\Job2@task2
- $data 是你要传到任务里的参数
- $queue 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填
- namespace app\controller;
 use think\facade\Queue;
 
 class MyQueue extends Base
 {
 public function index($job_name = '', $job_class = '', $job_data = [], $type = 'later', $time = 5) //jio_data 需要发送的数据
 {
 // 当前任务将由哪个类来负责处理。
 $job_class = "app\job\MyJob@fire"; // 任务类 - 执行时调用该类的deal方法
 $job_queue_name = 'Check'; // 队列名称
 empty($job_data) && $job_data = [ 'ts' => time(), 'bizId' => uniqid() , 'data' => $_GET ];
 
 switch ($type) {
 case 'later':
 $is_push = Queue::later($time, $job_class, $job_data, $job_queue_name); // 延迟发送任务 5秒
 break;
 case 'push':
 $is_push = Queue::push($job_class, $job_data, $job_queue_name);   // 立即发送任务
 break;
 }
 // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
 if ($is_push !== false) {
 echo date('Y-m-d H:i:s') . " 新增任务" . "
 ";
 } else {
 echo '新增任务错误';
 }
 }
 }
 - 打开浏览器访问 - http://域名/MyQueue/index,就可以看到任务新增成功
 
 
- 5、处理任务(消费者)- 打开终端切换到当前项目根目录下,执行下面的命令: 
| 命令 | 描述 | 
|---|
 | php think queue:work | 监听队列 |  | php think queue:listen | 监听队列 |  | php think queue:restart | 重启队列 |  | php think queue:subscribe | 暂无,可能是保留的 官方有什么其他想法但是还没实现 |  
 work和listen的区别详细点击(think-queue 解析上)查看,
 两种,具体的可选参数可以输入命令加 --help 查看。
 (注意:使用不同版本的thinkphp时,可选用的参数不同)
 (如:thinkphp6中就取消掉了- --daemon参数,队列默认为循环模式)
 
- php think queue:work --queue Check 
- 二、supervisor的安装和配置- supervisor是用Python开发的一个client/server服务,是Linux/Unix系统下的一个进程管理工具。
 可以很方便的监听、启动、停止、重启一个或多个进程。
 
 - 1、安装- 1.配置好yum源后,可以直接安装 - yum install supervisor
 - 2.Debian/Ubuntu可通过apt安装
 
 
- apt-get install supervisor
 - 3.Python使用pip安装
 
 
- pip install supervisor
 - 2、supervisor的相关配置- 本项目演示使用yum安装:
 打开- supervisor的主配置文件:- /etc/supervisord.conf,找到最后一行
 
- [include]
 ;files = supervisord.d/*.ini
 files = /etc/supervisord.d/*.conf
 - 最后一行,个人习惯命名conf。
 进入子进程配置文件路径:- /etc/supervisord.d/,新建- Check.conf,相关配置说明:
 
 
- # 项目名
 [program:Check]
 # 脚本目录
 directory=/www/.../Check
 # 脚本执行命令
 command=php think queue:work --queue Check
 
 # supervisor启动的时候是否随着同时启动,默认True
 autostart=true
 # 当程序exit的时候,这个program不会自动重启,默认unexpected,设置子进程挂掉后自动重启的情况,有三个选项,false,unexpected和true。如果为false的时候,无论什么情况下,都不会被重新启动,如果为unexpected,只有当进程的退出码不在下面的exitcodes里面定义的
 autorestart=false
 # 这个选项是子进程启动多少秒之后,此时状态如果是running,则我们认为启动成功了。默认值为1
 startsecs=1
 
 # 脚本运行的用户身份
 user=www
 
 # 日志输出
 stderr_logfile=/tmp/Check_stderr.log
 stdout_logfile=/tmp/Check_stdout.log
 # 把stderr重定向到stdout,默认 false
 redirect_stderr = true
 # stdout日志文件大小,默认 50MB
 stdout_logfile_maxbytes = 20M
 # stdout日志文件备份数
 stdout_logfile_backups = 20
 - #说明同上
 [program:Check]
 directory=/www/.../Check
 command=php think queue:work --queue Check --tries 10
 autostart=true
 autorestart=false
 stderr_logfile=/tmp/Check_stderr.log
 stdout_logfile=/tmp/Check_stdout.log
 #user = www
 - 对于单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来,内容如下:
 
 
- [program:queue]
 user=www
 command=php /www/wwwroot/tpqueue/think queue:work --queue Check,Check1,Check2
 - 3、supervisor相关命令- 启动 supervisor 
- supervisorctl -c /etc/supervisord.conf
 - 上面这个命令会进入 supervisorctl 的 shell 界面,然后可以执行不同的命令了:
 
 
- supervisorctl restart    // 重启指定应用
 supervisorctl stop       // 停止指定应用
 supervisorctl start      // 启动指定应用
 supervisorctl restart all   // 重启所有应用
 supervisorctl stop all      // 停止所有应用
 supervisorctl start all     // 启动所有应用
 supervisorctl status        //查看所有进程的状态
 supervisorctl update        //配置文件修改后使用该命令加载新的配置
 supervisorctl reload        //重新启动配置中的所有程序
 - 若是centos7,加入开机启动:
 
 
- systemctl start supervisord //启动supervisor并加载默认配置文件
 systemctl enable supervisord //将supervisor加入开机启动项
 
- 执行命令来验证是否为开机启动
 
 
- systemctl is-enabled supervisord
 - 注意事项:- 1、在消息对列表执行过程中,当消费者执行队列出现问题时(达到失败后最大尝试次数),将会自动化清除当前队列。 
- 原文链接:https://fengkui.net/article/137