告别阻塞!用 PHP TrueAsync 实现 PHP 脚本提速 10 倍
proc_open与shell_exec等函数不同proc_open是创建进程的丰富工具。PHP 核心甚至为它引入了特殊的hack来正确处理管道。管道是进程间通信的最佳方式之一也是最便捷的方式。唯一更好的方案是共享内存加文件事件这仅仅是因为内存区域位于操作系统内核之外。为什么管道方便因为它们假装是文件。可以像普通文件一样用fread读取、fwrite写入。这对开发者来说非常有价值因为代码变得更通用、更可移植。通信协议管道是字节流它不知道消息的概念只是简单地在父子进程间传输字节。为了让代码正常工作需要一种方式来界定消息边界。最简单的格式是 NDJSON每条消息占一行行与行之间用\n分隔。{id:1,value:42} {id:2,value:17} {id:3,value:88}为什么这种格式很合适因为 PHP 对 JSON 支持很好而且 PHP 有fgets函数可以从流中读取行。$line fgets(STDIN); // 读取到 \n 为止 $task json_decode($line); // 解析 JSON下面编写普通同步代码打开进程并通过 NDJSON 协议与之通信function run_worker(string $script, array $tasks): void { $process proc_open( [PHP_BINARY, $script], [ 0 [pipe, r], // 子进程 STDIN 1 [pipe, w], // 子进程 STDOUT 2 STDERR, ], $pipes ); foreach ($tasks as $task) { // 发送任务 fwrite($pipes[0], json_encode($task) . \n); // 等待响应 ← PHP 在这里阻塞直到读取到一行 $line fgets($pipes[1]); $result json_decode(trim($line), true); echo Task #{$result[task_id]}: {$result[result]}\n; } fclose($pipes[0]); // 关闭 STDIN → worker.php 将退出 fclose($pipes[1]); proc_close($process); } run_worker(worker.php, [ [id 1, value 42], [id 2, value 17], [id 3, value 88], ]);现在把这段代码放到协程里不是放一个而是放 10 个。$tasks [ [id 1, value 42], [id 2, value 17], // ... 还有 28 个任务 ]; for ($i 0; $i 10; $i) { spawn(run_worker(...), worker.php, $tasks); }只写了一行异步代码spawn(run_worker(...))但实际上proc_open、fgets和fclose从一开始就变成了非阻塞 I/O 操作。现在十个协程实际上是 11 个因为有主代码竞争 CPU 时间。大部分代码仍保持顺序执行。Channels目前每个协程都收到相同的任务数组这显然不是期望的行为。例如可能想从 Twig 模板生成静态站点的 HTML给定文件列表后让每个文件由独立的 worker 处理或者可能需要解析日志。Workers 相互独立运行因为工作量可能不同而且不知道 worker 是否忙碌。为此编写任务分发层相当复杂也不理想。需要一个快速、便捷的解决方案无需过多思考。为此TrueAsync 提供了 Channel正是为这类任务设计的。Channel 是一个队列用于在协程间交换数据。Channel 中的每条消息可以是一个任务。协程从同一个 Channel 读取并执行任务。无需检查 worker 是否忙碌只需把任务发到 Channelworker 准备好时就会执行。$taskQueue new Async\Channel(10); for ($i 0; $i 10; $i) { spawn(run_worker(...), worker.php, $taskQueue); } foreach ($tasks as $task) { $channel-send($task); } $channel-close();当然这段代码没有错误处理或边界检查但它尽可能简单且易读。然而在底层一些非平凡的事情正在发生。假设有 100 个任务和 10 个进程的池。当所有 worker 都忙碌时会发生什么$channel-send($task)会挂起协程直到队列有空间。队列大小与池大小匹配并非巧合任务供给函数会暂停防止内存被尚无法处理的任务填满。TaskGroup已编写的代码离生产就绪还很远。不仅跳过了错误处理而且没有控制协程。它们悬在空中如果应用开始关闭或出现问题异常无法保证协程不会因逻辑错误而挂起。这很糟糕而且在生产环境中代码挂起却无从排查原因非常令人不快。为最小化意外错误的风险使用 TaskGroup 模式。它专门设计用于一起启动协程、一起等待它们、一起销毁它们。$taskQueue new Async\Channel(10); $group new Async\TaskGroup(); try { // 在组管理下启动 workers for ($i 0; $i 10; $i) { $group-spawn(run_worker(...), worker.php, $taskQueue); } // 向 Channel 投喂任务 foreach ($tasks as $task) { $taskQueue-send($task); } } finally { // 关闭任务 Channel —— 尝试读取的协程 // 会收到 ChannelClosedException 并优雅关闭 $taskQueue-close(); // 封印组防止意外添加新协程 $group-seal(); // 防止添加新协程 // 等待尚未完成的任务 $group-all()-await(); // 等待所有 worker 完成 }增强代码的容错性前一个版本的代码不适合真实场景。如果 worker 进程崩溃fwrite($pipes[0], json_encode($task) . \n)可能失败。而且代码还隐藏着一个隐患。function run_worker(string $script, Channel $taskQueue): void { return; } $taskQueue new Channel(POOL_SIZE); $group new TaskGroup(); try { for ($i 0; $i POOL_SIZE; $i) { $group-spawn(run_worker(...), __DIR__ . /worker.php, $taskQueue); } foreach (generate_tasks(TASK_COUNT) as $task) { $taskQueue-send($task); } } finally { $taskQueue-close(); $group-seal(); $group-all()-await(); echo \nDone.\n; }结果会是死锁错误输出类似这样 DEADLOCK REPORT START Coroutines waiting: 1, active_events: 0 Coroutine 4 spawned at main:0, suspended at main.php:96 (main) waiting for: - Channel(capacity3, receivers0, senders1) DEADLOCK REPORT END 死锁调试输出可通过async.debug_deadlockini 指令控制。详见 https://true-async.github.io/en/docs/reference/ini-settings.html原因是主协程试图向已满的 Channel 发送任务但没有人会去读取。为避免此错误需要在逻辑上将 TaskGroup 的生命周期与$taskQueue绑定。这可以通过TaskGroup::all()-await()等待所有活跃任务来实现。spawn(function () use ($taskQueue, $group) { try { $group-all()-await(); } finally { $taskQueue-close(); } });更简洁的实现方式foreach (generate_tasks(TASK_COUNT) as $task) { $taskQueue-send($task, $group-all()); }Channel::send方法的第二个参数是取消令牌允许在任意条件下取消操作。$group-all()返回一个 Future当所有任务完成时解析。不幸的是即使$taskQueue-send($task, $group-all())也不是 100% 正确因为它没有处理 TaskGroup 中所有协程已完成但新协程尚未启动的情况。Channel 类的语义——在 TrueAsync 0.6.0 中与 Go 非常相似——是最有问题的实现之一值得单独关注。现在改进进程池代码本身的错误处理。foreach ($taskQueue as $task) { $encoded json_encode($task); if ($encoded false) { echo [worker] json_encode failed for task #{$task[id]}: . json_last_error_msg() . \n; return; } if (fwrite($pipes[0], $encoded . \n)) { echo [worker] fwrite failed for task #{$task[id]}: pipe may be broken\n; return; } $line fgets($pipes[1]); if ($line false stream_get_meta_data($pipes[1])[timed_out]) { echo [worker] timeout waiting for response on task #{$task[id]}\n; return; } else if ($line false) { echo [worker] fgets failed for task #{$task[id]}: pipe closed or EOF\n; return; } $result json_decode(trim($line), true); if ($result null) { echo [worker] json_decode failed for task #{$task[id]}: . json_last_error_msg() . (raw: . trim($line) . )\n; return; } }在循环前添加超时设置stream_set_timeout($pipes[1], 5)——五秒——防止父进程失控。故意在 worker_fail.php 中破坏 worker 代码例如添加sleep(10)看看会发生什么php.exe E:\php\examples\workers_process_cli\main2.php [worker] fwrite failed for task #1: pipe may be broken [worker] fwrite failed for task #2: pipe may be broken [worker] fwrite failed for task #3: pipe may be broken Done.现在可以确信即使 worker 崩溃主进程也不会挂起而是继续运行。如果进程池能自动启动新进程以保持总数恒定那就更好了。怎么做可以复杂化协程代码添加显式重启进程的循环。把这个练习留给喜欢复杂代码和分形的爱好者这不是我们的选择因为很容易不小心破坏什么。协程内部的线性、简单和最小分支是它的主要优点。让我们保持这样。既然协程在概念上等同于进程那么在进程交互结束时同时结束协程是合理的。这意味着可以响应协程完成事件。TaskGroup 类有合适的race()方法在协程完成时触发。唯一的问题是 TaskGroup 是幂等的多次调用race()总是返回相同结果。所以需要不同的工具。TaskSet 类非常适合这个需求——它是 TaskGroup 的孪生兄弟非常适合 supervisor 逻辑。大多数方法相同区别在于join*方法组它不仅返回任务结果同时从 TaskSet 中移除它。正是需要的$taskQueue new Channel(POOL_SIZE); $group new TaskSet(); try { for ($i 0; $i POOL_SIZE; $i) { $group-spawn(run_worker(...), __DIR__ . /worker.php, $taskQueue); } foreach (generate_tasks(TASK_COUNT) as $task) { $taskQueue-send($task, $group-joinAll()); } } finally { $taskQueue-close(); $group-seal(); $group-joinAll()-await(); echo \nDone.\n; }现在添加 supervisor 逻辑在进程崩溃时重启$supervisor spawn(function () use ($group, $taskQueue): void { while (true) { try { // 等待至少一个任务完成 $group-joinNext()-await(); } catch (\Exception $exception) { // 发生了真正意想不到的事 echo [supervisor] Exception: {$exception-getMessage()}\n; $group-seal(); $taskQueue-close(); return; } // 如果组被封印或 Channel 已关闭退出循环 —— // 进程被有意停止不应重启 if($group-isSealed() || $taskQueue-isClosed()) { return; } $group-spawn(run_worker(...), __DIR__ . /worker.php, $taskQueue); } });用 worker_fail.php 运行代码观察到尽管 worker 崩溃进程池仍在运行php.exe E:\php\examples\workers_process_cli\main2.php [worker] fwrite failed for task #1: pipe may be broken [worker] fwrite failed for task #2: pipe may be broken条目数应该等于 TASK_COUNT尽管没有任务被成功执行。太好了。supervisor 代码对任务如何执行一无所知。执行任务的协程对 supervisor 一无所知。这不仅产生了相当可读的代码而且产生了低耦合的代码易于维护。稍微改进 supervisor让它对重启不要过于宽容如果有无限多的任务且 worker.php 有 bug可能进入无限重启循环。$supervisor spawn(function () use ($group, $taskQueue): void { $cooldown 5; $threshold 2; $restarts 0; $lastFail time(); while (true) { try { $group-joinNext()-await(); } catch (\Exception $e) { echo [supervisor] Exception: {$e-getMessage()}\n; $group-seal(); $taskQueue-close(); return; } if ($group-isSealed() || $taskQueue-isClosed()) { return; } $now time(); if (($now - $lastFail) $cooldown) { if ($restarts $threshold) { echo [supervisor] Too many worker failures, shutting down pool.\n; $group-seal(); $taskQueue-close(); return; } $restarts; } $lastFail $now; $group-spawn(run_worker(...), __DIR__ . /worker.php, $taskQueue); } });现在给 worker 添加类似真实任务的东西。假设有一个需要解析并计算统计信息的大日志文件。解析文件是重度的 CPU 密集型操作非常适合进程池。统计聚合应该在主进程中发生。为最小化内存使用拆分工作主进程只读取日志文件并干净地分成块workers 处理这些块并返回结果。function feed_chunks(string $filePath, int $chunkBytes, Channel $queue): int { $fileSize filesize($filePath); $fp fopen($filePath, r); $offset 0; $chunkId 0; try { while ($offset $fileSize) { $end min($offset $chunkBytes, $fileSize); // 对齐到下一行避免把行切半 if ($end $fileSize) { fseek($fp, $end); fgets($fp); // 跳过被切分行的剩余部分 $end ftell($fp); // 现在处于干净的行边界 } $queue-send([ id $chunkId, file $filePath, offset $offset, length $end - $offset, ]); $offset $end; } } finally { fclose($fp); } return $chunkId; }只向 worker 发送偏移量和块长度worker 自己打开文件并读取所需部分。这种方式让主进程快速把大任务拆分成部分无需通过管道泵送真实数据。添加另一个 Channel 用于结果在单独的协程中处理。虽然可以直接从 worker 协程修改数组来达到相同效果因为处于单个进程内所有变量对任何协程都可见。但使用 Channel 看起来更Go 风格。function run_worker(int $workerId, string $script, Channel $taskQueue, Channel $resultQueue): void { $process proc_open( [PHP_BINARY, $script], [ 0 [pipe, r], 1 [pipe, w], 2 STDERR, ], $pipes ); if ($process false) { throw new RuntimeException(Worker #$workerId: proc_open failed); } echo Worker #$workerId started\n; // 这个函数只在 TrueAsync 中有效 —— 在普通 PHP 中无法给管道设置超时 stream_set_timeout($pipes[1], 30); try { foreach ($taskQueue as $task) { // 只发送轻量级描述符 —— 日志数据不通过管道传输 $encoded json_encode($task); if (!fwrite($pipes[0], $encoded . \n)) { throw new RuntimeException(Worker #$workerId: pipe broken on chunk #{$task[id]}); } $line fgets($pipes[1]); if ($line false stream_get_meta_data($pipes[1])[timed_out]) { throw new RuntimeException(Worker #$workerId: timeout on chunk #{$task[id]}); } elseif ($line false) { throw new RuntimeException(Worker #$workerId: process died on chunk #{$task[id]}); } $result json_decode(trim($line), true); if ($result null) { throw new RuntimeException(Worker #$workerId: invalid JSON on chunk #{$task[id]}); } printf( [Worker #%d | pid %5d] chunk #%03d (%s KB) → %d lines parsed\n, $workerId, $result[pid], $task[id], number_format($task[length] / 1024), $result[parsed], ); $resultQueue-send($result); } } finally { fclose($pipes[0]); fclose($pipes[1]); proc_close($process); } }再添加一个协程在结果到达时显示进度function run_collector(Channel $resultQueue): array { $totals [ ip_counts [], status_counts [], path_counts [], method_counts [], total_bytes 0, bot_requests 0, ]; $totalParsed 0; $totalErrors 0; try { foreach ($resultQueue as $result) { $totalParsed $result[parsed]; $totalErrors $result[errors]; merge_stats($totals, $result[stats]); } } catch (Async\ChannelException) { } return [$totals, $totalParsed, $totalErrors]; }最后把所有东西连接起来$taskQueue new Channel(POOL_SIZE); $resultQueue new Channel(POOL_SIZE); $group new TaskSet(); $workerId 0; try { // 启动收集器协程 —— 合并到达的结果 $collector spawn(run_collector(...), $resultQueue); // 启动 worker 池 for ($i 1; $i POOL_SIZE; $i) { $group-spawn(run_worker(...), $workerId, __DIR__ . /log_worker.php, $taskQueue, $resultQueue); } echo Start\n; $startTime microtime(true); // 分割文件直接把块描述符投喂到 Channel。 // send() 在所有 worker 忙碌时阻塞 → 自然的背压。 // joinAll() 作为取消令牌如果所有 worker 死亡停止投喂。 $totalChunks feed_chunks($logFile, CHUNK_BYTES, $taskQueue, $group); $taskQueue-close(); printf(\n Split into %d chunks of ~%d MB each\n, $totalChunks, CHUNK_BYTES / 1024 / 1024); try { $group-joinAll()-await(); } catch (\Exception $e) { echo Exception: {$e-getMessage()}\n; } } finally { $taskQueue-close(); $supervisor-cancel(); $group-seal(); $group-joinAll()-await(); $resultQueue-close(); // 从收集器获取合并后的结果 [$stats, $totalParsed, $totalErrors] \Async\await($collector); $elapsed microtime(true) - $startTime; print_report($stats, $totalParsed, $totalErrors, $elapsed); }
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2486772.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!