Multi Threaded Queue Processing

[Category: Development (PHP) | Published by 店小二03 | Date: 2017 | Author: 红领巾]

I needed a way to run a bunch of jobs in parallel, and I am used to using the Symfony Process Component for that.

This post shows how I did it in Laravel 4.3 via the Queue, so I can remember next time.

The result is the workflow shown here:

[Figure: Multi Threaded Queue Processing workflow]

We will create two Artisan Commands. This allows us to use Symfony Process to run numerous console commands in the background.

I will not go over how to add a job to the queue; there is plenty written on that. I will show the class I used for the Queue and its attributes.

    {
        "job": "Foo\\Reporting\\RequestsCommandQueueRunner",
        "data": {
            "results_uuid": "4a13aaee-4289-4f95-afe5-ea5abaaed869"
        }
    }

This payload gets pushed into the Queue. It is the "Parent" that will run x number of child commands. The x is defined in my .env.local.php or .env.production.php file, so I can increase or decrease the number of threads it runs as needed.
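As a rough illustration, an environment file of this kind in Laravel 4 is a plain PHP file that returns an array, and its keys become readable through `$_ENV`. The key name below comes from the handler class in this post; the value 5 is only an assumed example, not the author's setting.

```php
<?php
// .env.local.php (illustrative value; tune per environment)
return array(
    // Upper bound on how many child commands one parent should run at once.
    'RESULTS_QUEUE_LIMIT' => 5,
);
```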

The results_uuid is the data I need to find the children. In this case each child row stores that uuid, as the child belongsTo that parent.
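To make the payload shape concrete, here is a small sketch in plain PHP of how it decodes into a handler class name and a data array. This only illustrates the structure; the actual class resolution and the call to `fire($job, $params)` are done by Laravel's queue worker, and the variable names here are assumptions.

```php
<?php
// Payload as it would sit in the queue (nowdoc keeps the backslashes literal).
$payload = <<<'JSON'
{
    "job": "Foo\\Reporting\\RequestsCommandQueueRunner",
    "data": { "results_uuid": "4a13aaee-4289-4f95-afe5-ea5abaaed869" }
}
JSON;

$decoded = json_decode($payload, true);

// "job" names the handler class; "data" is what arrives as $params in fire().
$handlerClass = $decoded['job'];
$params = $decoded['data'];

echo $handlerClass, PHP_EOL;
echo $params['results_uuid'], PHP_EOL;
```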

RequestsCommandQueueRunner

This queue handler is quite simple:

    <?php
    #app/Foo/RequestsCommandQueueRunner.php

    namespace Foo\Reporting;

    use Foo\Models\ReportingOverviewBatchRequestJob;
    use Foo\QueueServices\QueueInterface;

    class RequestsCommandQueueRunner implements QueueInterface
    {
        protected $job_limit;
        protected $results_uuid;

        public function __construct()
        {
            // Number of child commands to run, read from the environment file.
            $this->job_limit = $_ENV['RESULTS_QUEUE_LIMIT'];
        }

        public function fire($job, $params)
        {
            $this->results_uuid = $params['results_uuid'];

            // Pass the parent uuid to the trigger command, escaped for the shell.
            exec('php artisan request:trigger ' . escapeshellarg($this->results_uuid), $results);

            $job->delete();
            //@TODO make delete if all goes well catch error though and fail it
        }
    }

All it does is pass the parent id to the trigger command. Below is the trigger command.

Note too that I am not using the job limit yet. Later I will divide the number of children the parent has by the limit and create that many Queue jobs.
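One way that later step might look is sketched below with `array_chunk`, which splits the child ids into batches no larger than the limit. The helper name `batchChildIds` and the sample ids are hypothetical and not part of the classes in this post; each resulting batch would become one Queue job.

```php
<?php
/**
 * Split child ids into batches of at most $limit ids each.
 * Hypothetical helper, not part of the original classes.
 */
function batchChildIds(array $childIds, $limit)
{
    // array_chunk requires a size of at least 1.
    $limit = max(1, (int) $limit);

    return array_chunk($childIds, $limit);
}

// Ten children with a limit of four give batches of 4, 4 and 2 ids.
$batches = batchChildIds(range(1, 10), 4);
echo count($batches), PHP_EOL;
```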

RequestsQueueCommand

    <?php

    namespace Foo\Console;

    #app/Foo/Console/RequestsQueueCommand.php

    use Foo\Models\ReportingOverviewBatchRequestJob;
    use Illuminate\Console\Command;
    use Symfony\Component\Console\Input\InputOption;
    use Symfony\Component\Console\Input\InputArgument;
    use Symfony\Component\Process\Process;

    class RequestsQueueCommand extends Command
    {
        protected $name = 'request:trigger';
        protected $run = [];
        protected $description = 'Trigger the running of related jobs';
        protected $request_uuid;
        protected $projects;

        /**
         * @var \Foo\Models\ReportingOverviewBatchRequestJob
         */
        private $reporting_overview_batch_request_jobs;

        public function __construct(
            ReportingOverviewBatchRequestJob $reporting_overview_batch_request_jobs
        ) {
            parent::__construct();
            $this->reporting_overview_batch_request_jobs = $reporting_overview_batch_request_jobs;
        }

        public function fire()
        {
            $this->request_uuid = $this->argument('request_uuid');
            $this->setRelatedProjects();
            $this->setRun();
            $this->runRelatedProjectsCommand();
            return "Fired";
        }

        protected function getArguments()
        {
            return array(
                array('request_uuid', InputArgument::REQUIRED, 'The uuid of the Request to trigger the jobs for'),
            );
        }

        // Load the child jobs that belong to the parent request.
        protected function setRelatedProjects()
        {
            $projects = $this->reporting_overview_batch_request_jobs
                ->getAllJobsForReportOverviewBatchRequestUuid($this->request_uuid);
            $this->projects = $projects->toArray();
            return $this->projects;
        }

        // Queue up one Process per child job.
        protected function setRun()
        {
            foreach ($this->projects as $project) {
                $this->addToRun($project['id']);
            }
        }

        protected function runRelatedProjectsCommand()
        {
            // Start every process in the background.
            foreach ($this->run as $process) {
                $process->start();
            }

            // Poll until all processes have finished.
            while (count($this->run) > 0) {
                foreach ($this->run as $key => $process) {
                    if (!$process->isRunning()) {
                        \Log::info("Done running process");
                        unset($this->run[$key]);
                    }
                }
                usleep(100000); // pause between polls so the loop does not spin the CPU
            }
        }

        protected function addToRun($project_id)
        {
            $this->run[] = new Process('php artisan request:run ' . escapeshellarg($project_id));
        }
    }

Here is where things get interesting. I find the ids of the children, then build an array of console commands, passing each child id as the argument.

The Process Component allows me to start them in the background and then check on them while they are running.

Now for the command it runs: "php artisan request:run $project_id"

RequestsJobRunCommand

This is the process that does the long-running work. In this case it logs into GitHub, gets some info, and reports on it.

    <?php

    namespace Foo\Console;

    use Foo\Models\ReportingOverviewBatchRequestJob;
    use Foo\Services\RequestJobRunService;
    use Illuminate\Console\Command;
    use Symfony\Component\Console\Input\InputOption;
    use Symfony\Component\Console\Input\InputArgument;

    class RequestsJobRunCommand extends Command
    {
        protected $name = 'request:run';
        protected $description = 'Run the related job';
        protected $job_uuid;

        /**
         * @var RequestJobRunService
         */
        private $requestJobRunService;

        /**
         * Id of the job we will run
         * @var
         */
        protected $related_job;

        public function __construct(RequestJobRunService $requestJobRunService)
        {
            parent::__construct();
            $this->requestJobRunService = $requestJobRunService;
        }

        public function fire()
        {
            $this->job_uuid = $this->argument('job_uuid');
            $this->loadTheJobsInfo();
            $this->requestJobRunService->getGithubApi()->authenticate();
            $this->getJobsAndCount();
            return "Fired";
        }

        protected function getJobsAndCount()
        {
            try {
                $tests = $this->getRelatedTests();
                $tests_count = count($tests);
                \Log::info(sprintf("Updating ReportingOverviewRequest with uuid %s", $this->job_uuid));
                $this->related_job->status = 1;
                $this->related_job->number_of_tests = $tests_count;
                $this->related_job->save();
            } catch (\Exception $e) {
                $this->related_job->status = 1; // mark done for now though a fail noted in message box
                $this->related_job->message = $e->getMessage();
                $this->related_job->save();
                // Three placeholders so the exception message is actually logged.
                \Log::error(sprintf(
                    "Error processing job %s for request job request parent uuid %s error %s",
                    $this->related_job->id,
                    $this->related_job->reporting_overview_batch_request_id,
                    $e->getMessage()
                ));
            }
        }

        protected function getRelatedTests()
        {
            try {
                return $this->requestJobRunService->getGithubApi()
                    ->setReponame($this->related_job->repo_name)
                    ->setAccountName($this->related_job->account_name)
                    ->setBranch($this->related_job->branch)
                    ->index($this->related_job->folder);
            } catch (\Exception $e) {
                // Keep the original exception as the previous one for debugging.
                throw new \Exception("Error getting index of github " . $e->getMessage(), 0, $e);
            }
        }

        protected function loadTheJobsInfo()
        {
            $this->related_job = $this->requestJobRunService
                ->getReportingOverviewBatchRepository()
                ->getReportingOverviewBatchRequestJobsModelInterface()
                ->findOrFail($this->job_uuid);
        }

        protected function getArguments()
        {
            return array(
                array('job_uuid', InputArgument::REQUIRED, 'The uuid of the related request job to run'),
            );
        }
    }

There is still some work to do. I have to set the limit so my initial run does not time out. I also need to clean up some code so that these classes are not digging so deep into the other classes.
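A minimal sketch of what enforcing that limit could look like: start commands only while fewer than the limit are running, and poll for finished ones. To keep the example runnable without any packages it uses PHP's built-in `proc_open` as a stand-in for the Symfony Process Component; the function name `runWithLimit` and the sample commands are assumptions, not code from this project.

```php
<?php
/**
 * Run shell commands in the background, at most $limit at a time.
 * Returns the exit code of each command, keyed like $commands.
 * Hypothetical helper using proc_open in place of Symfony Process.
 */
function runWithLimit(array $commands, $limit)
{
    $limit = max(1, (int) $limit);
    $queue = array_keys($commands); // keys of commands not started yet
    $running = array();             // key => process resource
    $exitCodes = array();

    while ($queue || $running) {
        // Top up the pool until the limit is reached.
        while ($queue && count($running) < $limit) {
            $key = array_shift($queue);
            $process = proc_open($commands[$key], array(), $pipes);
            if ($process === false) {
                $exitCodes[$key] = -1; // could not start the command
                continue;
            }
            $running[$key] = $process;
        }

        // Collect the processes that have finished.
        foreach ($running as $key => $process) {
            $status = proc_get_status($process);
            if (!$status['running']) {
                $exitCodes[$key] = $status['exitcode'];
                proc_close($process);
                unset($running[$key]);
            }
        }

        if ($running) {
            usleep(50000); // short pause between polls
        }
    }

    return $exitCodes;
}

// Usage: two tiny PHP child processes, run one at a time.
$php = escapeshellarg(PHP_BINARY);
$codes = runWithLimit(array(
    'ok'   => $php . ' -r ' . escapeshellarg('exit(0);'),
    'fail' => $php . ' -r ' . escapeshellarg('exit(3);'),
), 1);
```

With Symfony Process the same loop shape would apply, with `start()` and `isRunning()` in place of the `proc_open` and `proc_get_status` calls.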

But this was just to show how to use the Symfony Process Component, Laravel Queue, and Laravel Artisan to run a multi-threaded task.
