composer 安装 think-queue
https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md
composer require topthink/think-queue=1.1.6
composer require topthink/think-queue 2.0.4
composer require topthink/think-queue
判断是否安装成功
php think queue:work -h

配置文件
Tp5.0
文件位置:根目录/config/queue.php
return [
'connector' => 'Redis',
'expire' => 60,
'default' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 1,
'timeout' => 0,
'persistent' => false,
];
Tp5.1.x
文件位置:根目录/config/queue.php
return [
'connector' => 'Redis',
'expire' => 60,
'default' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 1,
'timeout' => 0,
'persistent' => false,
];
Tp6
配置文件在统一目录下/config/queue.php
<?php
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 1,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
在项目下新建一个Job目录,编写对应的消费者类

Tp6
<?php
namespace app\job;
use think\facade\Log;
use think\queue\Job;
class CronJob
{
public function fire(Job $job, $data)
{
Log::channel('job')->info('一条测试日志');
if (empty($data)) {
Log::channel('job')->error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__));
return;
}
if (!$this->checkJob($data)) {
$job->delete();
Log::channel('job')->record("Job does not need to be executed");
return;
}
if ($this->doJob($data)) {
$job->delete();
Log::channel('job')->record("job has been down and deleted");
} else {
if ($job->attempts() > 3) {
Log::channel('job')->record("job has been retried more that 3 times");
$job->delete();
}
}
}
private function checkJob($data)
{
Log::channel('job')->record('验证任务是否需要执行');
return true;
}
private function doJob($data)
{
print_r($data['msg'] ?? '实际业务流程处理');
Log::channel('job')->record('实际业务流程处理');
return true;
}
function task1(){
print_r("task 1");
}
public function failed($data)
{
Log::channel('job')->error('任务达到最大重试次数后,失败了');
}
}
上面使用了日志通道 配置文件路径 /config/log.php
<?php
return [
'default' => env('log.channel', 'file'),
'level' => [],
'type_channel' => [],
'close' => false,
'processor' => null,
'channels' => [
'file' => [
'type' => 'File',
'path' => '',
'single' => false,
'apart_level' => [],
'max_files' => 0,
'json' => false,
'processor' => null,
'close' => false,
'format' => '[%s][%s] %s',
'realtime_write' => false,
],
'job' => [
'type' => 'File',
'path' => app()->getRuntimePath() . 'pay',
'time_format' => 'Y-m-d H:i:s',
'format' => '[%s][%s]:%s'
],
],
];
控制器编写测试代码
<?php
namespace app\api\controller\v1;
use app\job\CronJob;
use think\facade\Queue;
class User
{
public function index(): string
{
return 'v1/user/index2';
}
public function push(): string
{
$job = 'app\Job\CronJob';
$queueName = 'cron_job_queue';
$data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
$data['user_id'] = 1;
$res = Queue::push($job, $data, $queueName);
$data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time());
$res = Queue::later(10, $job, $data, $queueName);
$data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time());
$res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName);
if ($res == false) {
return '消息投递失败';
} else {
return '消息投递成功';
}
}
}
测试
简述下work和listen在thinkphp框架中明显区别
1、work模式只能跑一条队列数据,如果想持续运行必须加 --daemon 参数;
2、work模式速度和性能更快,每次执行无需再加载框架所有文件,因此如果想修改队列代码必须重启队列,listen模式每次执行都会重新加载thinkphp文件,相当于用户访问一个接口一样;
3、work模式因为无需重复加载框架文件,因此也会带来MySQL链接超时的烦恼,目前thinkphp5.x全系版本无法解决此问题,所以如果队列中有操作数据库的不建议使用此模式;
php think queue:work --queue cron_job_queue
php think queue:listen --daemon --queue helloJobQueue
nohup php think queue:work --daemon --queue cron_job_queue &

