告别阻塞!用 PHP TrueAsync 实现 PHP 脚本提速 10 倍

news2026/4/5 19:59:37
proc_open与shell_exec等函数不同proc_open是创建进程的丰富工具。PHP 核心甚至为它引入了特殊的hack来正确处理管道。管道是进程间通信的最佳方式之一也是最便捷的方式。唯一更好的方案是共享内存加文件事件这仅仅是因为内存区域位于操作系统内核之外。为什么管道方便因为它们假装是文件。可以像普通文件一样用fread读取、fwrite写入。这对开发者来说非常有价值因为代码变得更通用、更可移植。通信协议管道是字节流它不知道消息的概念只是简单地在父子进程间传输字节。为了让代码正常工作需要一种方式来界定消息边界。最简单的格式是 NDJSON每条消息占一行行与行之间用\n分隔。{id:1,value:42} {id:2,value:17} {id:3,value:88}为什么这种格式很合适因为 PHP 对 JSON 支持很好而且 PHP 有fgets函数可以从流中读取行。$line fgets(STDIN); // 读取到 \n 为止 $task json_decode($line); // 解析 JSON下面编写普通同步代码打开进程并通过 NDJSON 协议与之通信function run_worker(string $script, array $tasks): void { $process proc_open( [PHP_BINARY, $script], [ 0 [pipe, r], // 子进程 STDIN 1 [pipe, w], // 子进程 STDOUT 2 STDERR, ], $pipes ); foreach ($tasks as $task) { // 发送任务 fwrite($pipes[0], json_encode($task) . \n); // 等待响应 ← PHP 在这里阻塞直到读取到一行 $line fgets($pipes[1]); $result json_decode(trim($line), true); echo Task #{$result[task_id]}: {$result[result]}\n; } fclose($pipes[0]); // 关闭 STDIN → worker.php 将退出 fclose($pipes[1]); proc_close($process); } run_worker(worker.php, [ [id 1, value 42], [id 2, value 17], [id 3, value 88], ]);现在把这段代码放到协程里不是放一个而是放 10 个。$tasks [ [id 1, value 42], [id 2, value 17], // ... 还有 28 个任务 ]; for ($i 0; $i 10; $i) { spawn(run_worker(...), worker.php, $tasks); }只写了一行异步代码spawn(run_worker(...))但实际上proc_open、fgets和fclose从一开始就变成了非阻塞 I/O 操作。现在十个协程实际上是 11 个因为有主代码竞争 CPU 时间。大部分代码仍保持顺序执行。Channels目前每个协程都收到相同的任务数组这显然不是期望的行为。例如可能想从 Twig 模板生成静态站点的 HTML给定文件列表后让每个文件由独立的 worker 处理或者可能需要解析日志。Workers 相互独立运行因为工作量可能不同而且不知道 worker 是否忙碌。为此编写任务分发层相当复杂也不理想。需要一个快速、便捷的解决方案无需过多思考。为此TrueAsync 提供了 Channel正是为这类任务设计的。Channel 是一个队列用于在协程间交换数据。Channel 中的每条消息可以是一个任务。协程从同一个 Channel 读取并执行任务。无需检查 worker 是否忙碌只需把任务发到 Channelworker 准备好时就会执行。$taskQueue new Async\Channel(10); for ($i 0; $i 10; $i) { spawn(run_worker(...), worker.php, $taskQueue); } foreach ($tasks as $task) { $channel-send($task); } $channel-close();当然这段代码没有错误处理或边界检查但它尽可能简单且易读。然而在底层一些非平凡的事情正在发生。假设有 100 个任务和 10 个进程的池。当所有 worker 都忙碌时会发生什么$channel-send($task)会挂起协程直到队列有空间。队列大小与池大小匹配并非巧合任务供给函数会暂停防止内存被尚无法处理的任务填满。TaskGroup已编写的代码离生产就绪还很远。不仅跳过了错误处理而且没有控制协程。它们悬在空中如果应用开始关闭或出现问题异常无法保证协程不会因逻辑错误而挂起。这很糟糕而且在生产环境中代码挂起却无从排查原因非常令人不快。为最小化意外错误的风险使用 TaskGroup 模式。它专门设计用于一起启动协程、一起等待它们、一起销毁它们。$taskQueue new Async\Channel(10); $group new Async\TaskGroup(); try { // 在组管理下启动 workers for ($i 0; $i 10; $i) { $group-spawn(run_worker(...), worker.php, $taskQueue); } // 向 Channel 投喂任务 foreach ($tasks as $task) { $taskQueue-send($task); } } finally { // 关闭任务 Channel —— 尝试读取的协程 // 会收到 ChannelClosedException 并优雅关闭 $taskQueue-close(); // 封印组防止意外添加新协程 $group-seal(); // 防止添加新协程 // 等待尚未完成的任务 $group-all()-await(); // 等待所有 worker 完成 }增强代码的容错性前一个版本的代码不适合真实场景。如果 worker 进程崩溃fwrite($pipes[0], json_encode($task) . \n)可能失败。而且代码还隐藏着一个隐患。function run_worker(string $script, Channel $taskQueue): void { return; } $taskQueue new Channel(POOL_SIZE); $group new TaskGroup(); try { for ($i 0; $i POOL_SIZE; $i) { $group-spawn(run_worker(...), __DIR__ . /worker.php, $taskQueue); } foreach (generate_tasks(TASK_COUNT) as $task) { $taskQueue-send($task); } } finally { $taskQueue-close(); $group-seal(); $group-all()-await(); echo \nDone.\n; }结果会是死锁错误输出类似这样 DEADLOCK REPORT START Coroutines waiting: 1, active_events: 0 Coroutine 4 spawned at main:0, suspended at main.php:96 (main) waiting for: - Channel(capacity3, receivers0, senders1) DEADLOCK REPORT END 死锁调试输出可通过async.debug_deadlockini 指令控制。详见 https://true-async.github.io/en/docs/reference/ini-settings.html原因是主协程试图向已满的 Channel 发送任务但没有人会去读取。为避免此错误需要在逻辑上将 TaskGroup 的生命周期与$taskQueue绑定。这可以通过TaskGroup::all()-await()等待所有活跃任务来实现。spawn(function () use ($taskQueue, $group) { try { $group-all()-await(); } finally { $taskQueue-close(); } });更简洁的实现方式foreach (generate_tasks(TASK_COUNT) as $task) { $taskQueue-send($task, $group-all()); }Channel::send方法的第二个参数是取消令牌允许在任意条件下取消操作。$group-all()返回一个 Future当所有任务完成时解析。不幸的是即使$taskQueue-send($task, $group-all())也不是 100% 正确因为它没有处理 TaskGroup 中所有协程已完成但新协程尚未启动的情况。Channel 类的语义——在 TrueAsync 0.6.0 中与 Go 非常相似——是最有问题的实现之一值得单独关注。现在改进进程池代码本身的错误处理。foreach ($taskQueue as $task) { $encoded json_encode($task); if ($encoded false) { echo [worker] json_encode failed for task #{$task[id]}: . json_last_error_msg() . \n; return; } if (fwrite($pipes[0], $encoded . \n)) { echo [worker] fwrite failed for task #{$task[id]}: pipe may be broken\n; return; } $line fgets($pipes[1]); if ($line false stream_get_meta_data($pipes[1])[timed_out]) { echo [worker] timeout waiting for response on task #{$task[id]}\n; return; } else if ($line false) { echo [worker] fgets failed for task #{$task[id]}: pipe closed or EOF\n; return; } $result json_decode(trim($line), true); if ($result null) { echo [worker] json_decode failed for task #{$task[id]}: . json_last_error_msg() . (raw: . trim($line) . )\n; return; } }在循环前添加超时设置stream_set_timeout($pipes[1], 5)——五秒——防止父进程失控。故意在 worker_fail.php 中破坏 worker 代码例如添加sleep(10)看看会发生什么php.exe E:\php\examples\workers_process_cli\main2.php [worker] fwrite failed for task #1: pipe may be broken [worker] fwrite failed for task #2: pipe may be broken [worker] fwrite failed for task #3: pipe may be broken Done.现在可以确信即使 worker 崩溃主进程也不会挂起而是继续运行。如果进程池能自动启动新进程以保持总数恒定那就更好了。怎么做可以复杂化协程代码添加显式重启进程的循环。把这个练习留给喜欢复杂代码和分形的爱好者这不是我们的选择因为很容易不小心破坏什么。协程内部的线性、简单和最小分支是它的主要优点。让我们保持这样。既然协程在概念上等同于进程那么在进程交互结束时同时结束协程是合理的。这意味着可以响应协程完成事件。TaskGroup 类有合适的race()方法在协程完成时触发。唯一的问题是 TaskGroup 是幂等的多次调用race()总是返回相同结果。所以需要不同的工具。TaskSet 类非常适合这个需求——它是 TaskGroup 的孪生兄弟非常适合 supervisor 逻辑。大多数方法相同区别在于join*方法组它不仅返回任务结果同时从 TaskSet 中移除它。正是需要的$taskQueue new Channel(POOL_SIZE); $group new TaskSet(); try { for ($i 0; $i POOL_SIZE; $i) { $group-spawn(run_worker(...), __DIR__ . /worker.php, $taskQueue); } foreach (generate_tasks(TASK_COUNT) as $task) { $taskQueue-send($task, $group-joinAll()); } } finally { $taskQueue-close(); $group-seal(); $group-joinAll()-await(); echo \nDone.\n; }现在添加 supervisor 逻辑在进程崩溃时重启$supervisor spawn(function () use ($group, $taskQueue): void { while (true) { try { // 等待至少一个任务完成 $group-joinNext()-await(); } catch (\Exception $exception) { // 发生了真正意想不到的事 echo [supervisor] Exception: {$exception-getMessage()}\n; $group-seal(); $taskQueue-close(); return; } // 如果组被封印或 Channel 已关闭退出循环 —— // 进程被有意停止不应重启 if($group-isSealed() || $taskQueue-isClosed()) { return; } $group-spawn(run_worker(...), __DIR__ . /worker.php, $taskQueue); } });用 worker_fail.php 运行代码观察到尽管 worker 崩溃进程池仍在运行php.exe E:\php\examples\workers_process_cli\main2.php [worker] fwrite failed for task #1: pipe may be broken [worker] fwrite failed for task #2: pipe may be broken条目数应该等于 TASK_COUNT尽管没有任务被成功执行。太好了。supervisor 代码对任务如何执行一无所知。执行任务的协程对 supervisor 一无所知。这不仅产生了相当可读的代码而且产生了低耦合的代码易于维护。稍微改进 supervisor让它对重启不要过于宽容如果有无限多的任务且 worker.php 有 bug可能进入无限重启循环。$supervisor spawn(function () use ($group, $taskQueue): void { $cooldown 5; $threshold 2; $restarts 0; $lastFail time(); while (true) { try { $group-joinNext()-await(); } catch (\Exception $e) { echo [supervisor] Exception: {$e-getMessage()}\n; $group-seal(); $taskQueue-close(); return; } if ($group-isSealed() || $taskQueue-isClosed()) { return; } $now time(); if (($now - $lastFail) $cooldown) { if ($restarts $threshold) { echo [supervisor] Too many worker failures, shutting down pool.\n; $group-seal(); $taskQueue-close(); return; } $restarts; } $lastFail $now; $group-spawn(run_worker(...), __DIR__ . /worker.php, $taskQueue); } });现在给 worker 添加类似真实任务的东西。假设有一个需要解析并计算统计信息的大日志文件。解析文件是重度的 CPU 密集型操作非常适合进程池。统计聚合应该在主进程中发生。为最小化内存使用拆分工作主进程只读取日志文件并干净地分成块workers 处理这些块并返回结果。function feed_chunks(string $filePath, int $chunkBytes, Channel $queue): int { $fileSize filesize($filePath); $fp fopen($filePath, r); $offset 0; $chunkId 0; try { while ($offset $fileSize) { $end min($offset $chunkBytes, $fileSize); // 对齐到下一行避免把行切半 if ($end $fileSize) { fseek($fp, $end); fgets($fp); // 跳过被切分行的剩余部分 $end ftell($fp); // 现在处于干净的行边界 } $queue-send([ id $chunkId, file $filePath, offset $offset, length $end - $offset, ]); $offset $end; } } finally { fclose($fp); } return $chunkId; }只向 worker 发送偏移量和块长度worker 自己打开文件并读取所需部分。这种方式让主进程快速把大任务拆分成部分无需通过管道泵送真实数据。添加另一个 Channel 用于结果在单独的协程中处理。虽然可以直接从 worker 协程修改数组来达到相同效果因为处于单个进程内所有变量对任何协程都可见。但使用 Channel 看起来更Go 风格。function run_worker(int $workerId, string $script, Channel $taskQueue, Channel $resultQueue): void { $process proc_open( [PHP_BINARY, $script], [ 0 [pipe, r], 1 [pipe, w], 2 STDERR, ], $pipes ); if ($process false) { throw new RuntimeException(Worker #$workerId: proc_open failed); } echo Worker #$workerId started\n; // 这个函数只在 TrueAsync 中有效 —— 在普通 PHP 中无法给管道设置超时 stream_set_timeout($pipes[1], 30); try { foreach ($taskQueue as $task) { // 只发送轻量级描述符 —— 日志数据不通过管道传输 $encoded json_encode($task); if (!fwrite($pipes[0], $encoded . \n)) { throw new RuntimeException(Worker #$workerId: pipe broken on chunk #{$task[id]}); } $line fgets($pipes[1]); if ($line false stream_get_meta_data($pipes[1])[timed_out]) { throw new RuntimeException(Worker #$workerId: timeout on chunk #{$task[id]}); } elseif ($line false) { throw new RuntimeException(Worker #$workerId: process died on chunk #{$task[id]}); } $result json_decode(trim($line), true); if ($result null) { throw new RuntimeException(Worker #$workerId: invalid JSON on chunk #{$task[id]}); } printf( [Worker #%d | pid %5d] chunk #%03d (%s KB) → %d lines parsed\n, $workerId, $result[pid], $task[id], number_format($task[length] / 1024), $result[parsed], ); $resultQueue-send($result); } } finally { fclose($pipes[0]); fclose($pipes[1]); proc_close($process); } }再添加一个协程在结果到达时显示进度function run_collector(Channel $resultQueue): array { $totals [ ip_counts [], status_counts [], path_counts [], method_counts [], total_bytes 0, bot_requests 0, ]; $totalParsed 0; $totalErrors 0; try { foreach ($resultQueue as $result) { $totalParsed $result[parsed]; $totalErrors $result[errors]; merge_stats($totals, $result[stats]); } } catch (Async\ChannelException) { } return [$totals, $totalParsed, $totalErrors]; }最后把所有东西连接起来$taskQueue new Channel(POOL_SIZE); $resultQueue new Channel(POOL_SIZE); $group new TaskSet(); $workerId 0; try { // 启动收集器协程 —— 合并到达的结果 $collector spawn(run_collector(...), $resultQueue); // 启动 worker 池 for ($i 1; $i POOL_SIZE; $i) { $group-spawn(run_worker(...), $workerId, __DIR__ . /log_worker.php, $taskQueue, $resultQueue); } echo Start\n; $startTime microtime(true); // 分割文件直接把块描述符投喂到 Channel。 // send() 在所有 worker 忙碌时阻塞 → 自然的背压。 // joinAll() 作为取消令牌如果所有 worker 死亡停止投喂。 $totalChunks feed_chunks($logFile, CHUNK_BYTES, $taskQueue, $group); $taskQueue-close(); printf(\n Split into %d chunks of ~%d MB each\n, $totalChunks, CHUNK_BYTES / 1024 / 1024); try { $group-joinAll()-await(); } catch (\Exception $e) { echo Exception: {$e-getMessage()}\n; } } finally { $taskQueue-close(); $supervisor-cancel(); $group-seal(); $group-joinAll()-await(); $resultQueue-close(); // 从收集器获取合并后的结果 [$stats, $totalParsed, $totalErrors] \Async\await($collector); $elapsed microtime(true) - $startTime; print_report($stats, $totalParsed, $totalErrors, $elapsed); }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2486772.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…