POSTD PRODUCED BY NIJIBOX

POSTD PRODUCED BY NIJIBOX

ニジボックスが運営する
エンジニアに向けた
キュレーションメディア

POSTD PRODUCED BY NIJIBOX

POSTD PRODUCED BY NIJIBOX

ニジボックスが運営する
エンジニアに向けた
キュレーションメディア

FeedlyRSSTwitterFacebook
Maxime Fabre

本記事は、原著者の許諾のもとに翻訳・掲載しております。



PHP pcntl_fork 

ReactPHP使PHP 

 pthreads POSIX Threads2012使使

pthreads





PHPPHP

pthreads


PHPHomebrew使

brew install php56-pthreads

pthreadsPHP --with-thread-safety  ZTSZend Thread Safety PHP   php --version 


mcryptPHP


brew reinstall php56-mcrypt BlackfireZTS


pthreads 3


Threaded object 

Worker 

Pool   


 


 Threaded  Threaded 使便



docblock


Web Threaded  Thread  Threaded  run Google file_get_contents 
<?php
class SearchGoogle extends Thread
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        $this->html = file_get_contents('http://google.fr?q='.$this->query);
    }
}

cats start 
<?php
$job = new SearchGoogle('cats');
$job->start();

 $job->isRunning()  join 使




join 
<?php
$job = new SearchGoogle('cats');
$job->start();

// Wait for the job to be finished and print results
$job->join();
echo $job->html;

これを踏まえ、例ごとに複数の検索を同時に開始して結果を取得することができます。

<?php
$searches = ['cats', 'dogs', 'birds'];
foreach ($searches as &$search) {
    $search = new SearchGoogle($search);
    $search->start();
}

foreach ($searches as $search) {
    $search->join();
    echo substr($search->html, 0, 20);
}

jobクラス内でタイムスタンプを出力する場合は以下のようになります。

public function run()
{
    echo microtime(true).PHP_EOL;

    $this->html = file_get_contents('http://google.fr?q='.$this->query);
}

ファイルを実行すれば、3つのジョブ全てが確かに同時に開始していることを確認できるでしょう。

$ php multiple.php
1446987102.4479
1446987102.4503
1446987102.4525

<!doctype html><html
<!doctype html><html
<!doctype html><html

ワーカーによるジョブ管理

ここまでは比較的簡単でしたが、ジョブを自分で管理せずに済むのが理想でしょう。ジョブをそれぞれ個別に開始し、1つずつ結合するようなことはしたくないと思います。ジョブをどこかへ投げさえすれば、勝手に処理され、処理が全て完了した時に結果を取得できたらいいと思っていることでしょう。そこでワーカーの出番です。

Workerは、その上にジョブをスタックすることができるクラスで、全ジョブを一度に開始し、結合します。

<?php
class Searcher extends Worker
{
    public function run()
    {
        echo 'Running '.$this->getStacked().' jobs'.PHP_EOL;
    }
}

// Stack our jobs on our worker
$worker   = new Searcher();
$searches = ['dogs', 'cats', 'birds'];
foreach ($searches as &$search) {
    $search = new SearchGoogle($search);
    $worker->stack($search);
}

// Start all jobs
$worker->start();

// Join all jobs and close worker
$worker->shutdown();

この出力結果は次のようになります。

Running 3 jobs
1446989108.5938
1446989108.6898
1446989108.9086

結果の収集


 $this->worker HTML
<?php
class Searcher extends Worker
{
    public $data = [];

    public function run()
    {
        echo 'Running '.$this->getStacked().' jobs'.PHP_EOL;
    }

    /**
     * To avoid corrupting the array
     * we use array_merge here instead of just
     * $this->data[] = $html
     */
    public function addData($data)
    {
        $this->data = array_merge($this->data, [$data]);
    }
}

class SearchGoogle extends Threaded
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        echo microtime(true).PHP_EOL;

        $this->worker->addData(
            file_get_contents('http://google.fr?q='.$this->query)
        );
    }
}

// Stack our jobs on our worker
$worker   = new Searcher();
$searches = ['dogs', 'cats', 'birds'];
foreach ($searches as &$search) {
    $search = new SearchGoogle($search);
    $worker->stack($search);
}

// Start all jobs
$worker->start();

// Join all jobs and close worker
$worker->shutdown();
foreach ($worker->data as $html) {
    echo substr($html, 0, 20).PHP_EOL;
}

これを実行すれば、以下のような適切な結果を取得できます。

Running 3 jobs
1446989506.0509
1446989507.938
1446989510.4684
<!doctype html><html
<!doctype html><html
<!doctype html><html

 $worker->stack(new SearchGoogle($search)) Pool


Pool 1


pthreads


3


使

使




poolworker使
<?php
$pool = new Pool(5, Worker::class);

それから $pool->submit(<Threaded>) を呼び出せば、ジョブをプールにサブミットできます。ワーカーとの主な違いは、ジョブをプールにサブミットするとすぐにジョブが開始するということです。もう参照に対処する必要はありません。

<?php
// Create a pool and submit jobs to it
$pool = new Pool(5, Worker::class);
$pool->submit(new SearchGoogle('cats'));
$pool->submit(new SearchGoogle('dogs'));
$pool->submit(new SearchGoogle('birds'));

// Close the pool once done
$pool->shutdown();

便 collect 

pthreads Collectable  Threaded 2 setGarbage  isGarbage Pool
<?php
class SearchPool extends Pool
{
    public $data = [];

    public function process()
    {
        // Run this loop as long as we have
        // jobs in the pool
        while (count($this->work)) {
            $this->collect(function (SearchGoogle $job) {
                // If a job was marked as done
                // collect its results
                if ($job->isGarbage()) {
                    $this->data[$job->query] = $job->html;
                }

                return $job->isGarbage();
            });
        }

        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();

        return $this->data;
    }
}

また、これはつまり、 SearchGoogle ジョブを編集して Collectable を拡張し、 setGarbage を実行されたメソッドの最後で呼び出す必要があるということです。

<?php
class SearchGoogle extends Collectable
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        echo microtime(true).PHP_EOL;

        $this->html = file_get_contents('http://google.fr?q='.$this->query);
        $this->setGarbage();
    }
}

これで次のようなことが行えます。

<?php
// Create a pool and submit jobs to it
$pool = new SearchPool(5, Worker::class);
$pool->submit(new SearchGoogle('cats'));
$pool->submit(new SearchGoogle('dogs'));
$pool->submit(new SearchGoogle('birds'));
$pool->submit(new SearchGoogle('planes'));
$pool->submit(new SearchGoogle('cars'));

$data = $pool->process();
var_dump($data);

これを実行すれば、5つのジョブが同時に実行されたことだけでなく、それらのジョブが同時に完了し、簡単にジョブの結果が得られたことも分かります。

$ time php pooling.php
1446990493.7183
1446990493.7205
1446990493.7227
1446990493.7247
1446990493.7266
array(5) {
  'birds' => (53230) "<!doctype html>"...
  'cats' => (53211) "<!doctype html>"...
  'cars' => (53250) "<!doctype html>"...
  'dogs' => (53244) "<!doctype html>"...
  'planes' => (53267) "<!doctype html>"...
}
php pooling.php  0.51s user 0.03s system 100% cpu 0.940 total
監修者
監修者_古川陽介
古川陽介
株式会社リクルート プロダクト統括本部 プロダクト開発統括室 グループマネジャー 株式会社ニジボックス デベロップメント室 室長 Node.js 日本ユーザーグループ代表
複合機メーカー、ゲーム会社を経て、2016年に株式会社リクルートテクノロジーズ(現リクルート)入社。 現在はAPソリューショングループのマネジャーとしてアプリ基盤の改善や運用、各種開発支援ツールの開発、またテックリードとしてエンジニアチームの支援や育成までを担う。 2019年より株式会社ニジボックスを兼務し、室長としてエンジニア育成基盤の設計、技術指南も遂行。 Node.js 日本ユーザーグループの代表を務め、Node学園祭などを主宰。