composer 安装 think-queue
 
 
 https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md
 
 
composer require topthink/think-queue=1.1.6
composer require topthink/think-queue 2.0.4
composer require topthink/think-queue
 
判断是否安装成功
 
php think queue:work -h
 

 
配置文件
 
Tp5.0
 文件位置:根目录/config/queue.php
 
return [
    'connector'  => 'Redis',        
    'expire'     => 60,        
    'default'    => 'default',        
    'host'       => '127.0.0.1',    
    'port'       => 6379,        
    'password'   => '',        
    'select'     => 1,        
    'timeout'    => 0,        
    'persistent' => false,        
];
 
Tp5.1.x
 文件位置:根目录/config/queue.php
 
return [
    'connector'  => 'Redis',        
    'expire'     => 60,        
    'default'    => 'default',        
    'host'       => '127.0.0.1',    
    'port'       => 6379,        
    'password'   => '',        
    'select'     => 1,        
    'timeout'    => 0,        
    'persistent' => false,        
];
 
Tp6
 配置文件在统一目录下/config/queue.php
 
<?php
return [
    'default'     => 'redis', 
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 1,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
 
在项目下新建一个Job目录,编写对应的消费者类
 

 
Tp6
 
<?php
namespace app\job;
use think\facade\Log;
use think\queue\Job;
class CronJob
{
    
    public function fire(Job $job, $data)
    {
        Log::channel('job')->info('一条测试日志');
        if (empty($data)) {
            Log::channel('job')->error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__));
            return;
        }
        
        if (!$this->checkJob($data)) {
            $job->delete();
            Log::channel('job')->record("Job does not need to be executed");
            return;
        }
        
        if ($this->doJob($data)) {
            $job->delete();
            Log::channel('job')->record("job has been down and deleted");
        } else {
            
            if ($job->attempts() > 3) {
                Log::channel('job')->record("job has been retried more that 3 times");
                $job->delete(); 
            }
        }
    }
    
    private function checkJob($data)
    {
        Log::channel('job')->record('验证任务是否需要执行');
        return true;
    }
    
    private function doJob($data)
    {
        
        print_r($data['msg'] ?? '实际业务流程处理');
        Log::channel('job')->record('实际业务流程处理');
        return true;
    }
    function task1(){
        print_r("task 1");
    }
    public function failed($data)
    {
        
        Log::channel('job')->error('任务达到最大重试次数后,失败了');
    }
}
 
 
 上面使用了日志通道 配置文件路径 /config/log.php
 
 
<?php
return [
    
    'default'      => env('log.channel', 'file'),
    
    'level'        => [],
    
    'type_channel'    =>    [],
    
    'close'        => false,
    
    'processor'    => null,
    
    'channels'     => [
        'file' => [
            
            'type'           => 'File',
            
            'path'           => '',
            
            'single'         => false,
            
            'apart_level'    => [],
            
            'max_files'      => 0,
            
            'json'           => false,
            
            'processor'      => null,
            
            'close'          => false,
            
            'format'         => '[%s][%s] %s',
            
            'realtime_write' => false,
        ],
        
        'job'    =>    [
            'type' => 'File',
            'path' => app()->getRuntimePath() . 'pay', 
            'time_format' => 'Y-m-d H:i:s',
            'format' => '[%s][%s]:%s'
        ],
    ],
];
 
控制器编写测试代码
 
<?php
namespace app\api\controller\v1;
use app\job\CronJob;
use think\facade\Queue;
class User
{
    public function index(): string
    {
        return 'v1/user/index2';
    }
    
    public function push(): string
    {
        
        $job = 'app\Job\CronJob'; 
        $queueName = 'cron_job_queue';  
        
        $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $data['user_id'] = 1;
        $res = Queue::push($job, $data, $queueName);   
        $data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $res = Queue::later(10, $job, $data, $queueName);   
        $data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time());
        $res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName);   
        if ($res == false) {
            return '消息投递失败';
        } else {
            return '消息投递成功';
        }
    }
}
 
测试
 
简述下work和listen在thinkphp框架中明显区别
 
 
 1、work模式只能跑一条队列数据,如果想持续运行必须加 --daemon 参数;
 2、work模式速度和性能更快,每次执行无需再加载框架所有文件,因此如果想修改队列代码必须重启队列,listen模式每次执行都会重新加载thinkphp文件,相当于用户访问一个接口一样;
 3、work模式因为无需重复加载框架文件,因此也会带来MySQL链接超时的烦恼,目前thinkphp5.x全系版本无法解决此问题,所以如果队列中有操作数据库的不建议使用此模式;
 
 
php think queue:work --queue cron_job_queue
php think queue:listen --daemon --queue helloJobQueue
nohup php think queue:work  --daemon --queue cron_job_queue &
 

 
