php方案 时序对齐与水位线(Watermark)
核心问题 流处理有两个时钟 事件时间(Event Time)← 传感器/用户操作实际发生的时刻 处理时间(Processing Time)← 数据抵达处理器的时刻 传感器 t100ms 产生数据 → 网络延迟 → t350ms 才到达 水位线系统自己估算t 之前的数据应该都到齐了然后才敢关窗口触发计算。 时序对齐传感器A每100ms一条传感器B每150ms一条要做差值运算必须先插值到同一时间轴。---完整实现?php// ══════════════════════════════════════════════════════// 一、滚动窗口 水位线// ══════════════════════════════════════════════════════classTumblingWindow{privateint$wmPHP_INT_MIN;privatearray$wins[];publicfunction__construct(privateint$size,// 窗口大小 msprivateint$late,// 允许迟到 msprivate\Closure$onClose,){}publicfunctionadd(int$t,mixed$v):void{$wsintdiv($t,$this-size)*$this-size;// 该事件所属窗口的起点// 水位线之前的窗口已关迟到太久直接丢if($ws$this-size$this-wm){echo [丢弃] t{$t}ms水位线{$this-wm}ms 已过\n;return;}$this-wins[$ws][]$v;// 推进水位线当前事件时间 - 容忍迟到量$this-wmmax($this-wm,$t-$this-late);// 关闭所有结束时间 水位线的窗口foreach($this-winsas$start$_){if($start$this-size$this-wm){($this-onClose)($start,$start$this-size-1,$this-wins[$start]);unset($this-wins[$start]);}}}publicfunctionflush():void// 流结束强制关闭剩余窗口{ksort($this-wins);foreach($this-winsas$start$evts){($this-onClose)($start,$start$this-size-1,$evts);}$this-wins[];}publicfunctionwm():int{return$this-wm;}}// ══════════════════════════════════════════════════════// 二、时序对齐插值到统一时间轴// ══════════════════════════════════════════════════════classSeriesAligner{/** * 把稀疏序列插值到目标时间轴 * param int[] $axis 目标时间戳 * param array $series [[t, v], ...] * param string $method linear | ffill | nearest */publicstaticfunctionalign(array$axis,array$series,string$methodlinear):array{usort($series,fn($a,$b)$a[0]$b[0]);$out[];foreach($axisas$t){$out[$t]self::interp($t,$series,$method);}return$out;}/** 两个序列对齐后逐点作差 */publicstaticfunctiondiff(array$axis,array$sA,array$sB,string$mlinear):array{$aself::align($axis,$sA,$m);$bself::align($axis,$sB,$m);$r[];foreach($axisas$t){$r[$t]($a[$t]!null$b[$t]!null)?round($a[$t]-$b[$t],4):null;}return$r;}privatestaticfunctioninterp(int$t,array$s,string$method):?float{$lo$hinull;foreach($sas$i[$st]){if($st$t)$lo$i;elseif($hinull){$hi$i;break;}}if($lonull$hinull)returnnull;if($lonull)return$s[$hi][1];if($hinull)return$methodbfill?null:$s[$lo][1];if($s[$lo][0]$t)return$s[$lo][1];[$t0,$v0]$s[$lo];[$t1,$v1]$s[$hi];returnmatch($method){linear$v0($v1-$v0)*($t-$t0)/($t1-$t0),ffill$v0,// 上一个值填充nearest($t-$t0)($t1-$t)?$v0:$v1,// 最近邻defaultnull,};}}---Demo// ── Demo1水位线窗口10s容忍迟到3s─────────────────────────echo══ 水位线滚动窗口 ══\n;$winnewTumblingWindow(10_000,3_000,function(int$s,int$e,array$evts){printf( ✓ 窗口 [%ds-%ds] 关闭 → %d条 sum%d\n,$s/1000,$e/1000,count($evts),array_sum($evts));});foreach([[1_000,5],// t1s[3_000,8],// t3s[2_000,6],// t2s ← 迟到但在容忍内3s内[13_000,3],// t13s ← 推进水位线到10s触发[0-10s]窗口关闭[8_000,4],// t8s ← 属于[0-10s]但已关 → 丢弃[15_000,7],// t15s[14_000,2],// t14s ← 迟到但合法15s - 3s 12s 14s]as[$t,$v]){printf(到达 t%5dms v%d wm%dms\n,$t,$v,$win-wm());$win-add($t,$v);}$win-flush();// ── Demo2时序对齐 ───────────────────────────────────────────────echo\n══ 传感器对齐A100ms采样B150ms采样══\n;$sA[[0,10.0],[100,12.0],[200,11.5],[300,13.0],[400,14.0]];$sB[[0,5.0],[150,6.5],[300,7.0],[450,8.0]];$axisrange(0,400,100);$aASeriesAligner::align($axis,$sA);$aBSeriesAligner::align($axis,$sB,linear);$diffSeriesAligner::diff($axis,$sA,$sB);printf(%-6s %-6s %-10s %-6s\n,时间,A,B(插值),A-B);echostr_repeat(─,34).\n;foreach($axisas$t){printf(t%-4d %-6.2f %-10.4f %s\n,$t,$aA[$t],$aB[$t],$diff[$t]);}输出 ══ 水位线滚动窗口 ══ 到达 t1000ms v5wm-2000ms 到达 t3000ms v8wm0ms 到达 t2000ms v6wm0ms 到达 t13000ms v3wm10000ms ✓ 窗口[0s-9s]关闭 →3条 sum19到达 t8000ms v4wm10000ms[丢弃]t8000ms水位线10000ms 已过 到达 t15000ms v7wm12000ms 到达 t14000ms v2wm12000ms ✓ 窗口[10s-19s]关闭 →3条 sum12══ 传感器对齐A100ms采样B150ms采样══ 时间AB(插值)A-B────────────────────────────────── t010.005.00005.0000t10012.006.00006.0000←B在t100无数据线性插值 t20011.506.66674.8333t30013.007.00006.0000t40014.007.66676.3333---水位线推进机制 事件按乱序到达 t1t3t2t13t8(迟)t15t14|||||||↑ wm 推到10s[0-10s]窗口关闭 ↑ t8来了但窗口已关丢弃 wmmax(wm,当前事件时间-容忍迟到)窗口关闭条件窗口结束时间wm 三种插值选哪种 ┌──────────────────────┬───────────────────────────┐ │ 场景 │ 方法 │ ├──────────────────────┼───────────────────────────┤ │ 传感器温度、平滑信号 │ linear线性插值 │ ├──────────────────────┼───────────────────────────┤ │ 状态类数据开/关 │ ffill上一个值持续有效 │ ├──────────────────────┼───────────────────────────┤ │ 离散事件取最近 │ nearest │ └──────────────────────┴───────────────────────────┘
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2438129.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!