《用C#实现工业现场数据的实时采集与存储》的完整、工业级、可落地的实现方案
以下是针对《用C#实现工业现场数据的实时采集与存储》的完整、工业级、可落地的实现方案。内容基于 .NET 8 / .NET 92025–2026 年主流工业实践重点解决高频采集、断线重连、批量写入、数据丢失最小化等问题。1. 工业现场数据采集架构推荐分层数据源层 ↓ (Modbus TCP / RTU / S7 / OPC UA / 串口 / MQTT ...) 采集层BackgroundService 多协议驱动 ↓ (Channel 或 ConcurrentQueue 缓冲) 缓存层内存队列 批量打包 ↓ (每 1–5 秒或达到容量阈值触发写入) 存储层时序数据库优先InfluxDB v2 / TimescaleDB / DolphinDB ↳ 可选SQLite 本地缓存断网场景 → 后续同步 监控层Serilog Prometheus / App.Metrics核心原则采集与存储解耦采集永不阻塞批量写入减少数据库压力断线自动重连 心跳检测异常数据不丢失内存 本地文件双保险2. 设备接口及数据读取多种协议示例2.1 Modbus TCP最常见使用FluentModbus.NET 8 原生 Span 支持性能最佳usingFluentModbus;usingSystem.Net.Sockets;// 推荐封装类publicclassModbusTcpCollector:IAsyncDisposable{privatereadonlystring_ip;privatereadonlyint_port;privateModbusTcpClient?_client;privatebool_isConnected;publicModbusTcpCollector(stringip,intport502){_ipip;_portport;}publicasyncTaskboolConnectAsync(){try{vartcpClientnewTcpClient();awaittcpClient.ConnectAsync(_ip,_port);_clientnewModbusTcpClient(tcpClient);_isConnectedtrue;returntrue;}catch{_isConnectedfalse;returnfalse;}}publicasyncTaskMemoryushortReadHoldingRegistersAsync(byteunitId,ushortstart,ushortcount){if(!_isConnected||_clientisnull)thrownewInvalidOperationException(Not connected);try{returnawait_client.ReadHoldingRegistersAsync(unitId,start,count);}catch{_isConnectedfalse;throw;}}publicasyncValueTaskDisposeAsync(){_client?.Dispose();_clientnull;}}2.2 串口 Modbus RTU高吞吐版推荐使用RJCP.IO.PortsFluentModbusLinux/Windows 通用usingRJCP.IO.Ports;usingFluentModbus;// 类似上面只是 Client 换成 ModbusRtuClientpublicclassModbusRtuCollector:IAsyncDisposable{privatereadonlystring_portName;privateSerialPortStream?_port;privateModbusRtuClient?_client;publicasyncTaskConnectAsync(){_portnewSerialPortStream(_portName,19200,8,Parity.None,StopBits.One);await_port.OpenAsync();_clientnewModbusRtuClient(_port);}// Read 方法同上}2.3 西门子 S7S7-1200/1500/200 SMART使用S7.NetPlus开源、稳定usingS7.Net;publicclassS7Collector{privatePlc?_plc;publicasyncTaskConnectAsync(stringip,CpuTypecpuTypeCpuType.S71200,shortrack0,shortslot1){_plcnewPlc(cpuType,ip,rack,slot);await_plc.OpenAsync();}publicasyncTaskobjectReadAsync(DataTypedataType,intdb,intstart,VarTypevarType,intcount1){returnawait_plc.ReadAsync(dataType,db,start,varType,count);}}3. 采集调度 缓存BackgroundService ChannelpublicclassIndustrialDataCollector:BackgroundService{privatereadonlyModbusTcpCollector_modbus;privatereadonlyChannelCollectedDataPoint_channel;privatereadonlyILogger_logger;publicIndustrialDataCollector(ModbusTcpCollectormodbus,ILoggerFactoryloggerFactory){_modbusmodbus;_channelChannel.CreateBoundedCollectedDataPoint(newBoundedChannelOptions(5000){FullModeBoundedChannelFullMode.DropOldest});_loggerloggerFactory.CreateLoggerIndustrialDataCollector();}protectedoverrideasyncTaskExecuteAsync(CancellationTokenstoppingToken){// 重连循环while(!stoppingToken.IsCancellationRequested){if(!awaitTryEnsureConnected(stoppingToken)){awaitTask.Delay(5000,stoppingToken);continue;}try{vartimernewPeriodicTimer(TimeSpan.FromMilliseconds(500));// 采集周期可配置while(awaittimer.WaitForNextTickAsync(stoppingToken)){vardataawaitCollectDataAsync();await_channel.Writer.WriteAsync(data,stoppingToken);}}catch(Exceptionex){_logger.LogError(ex,采集异常);awaitTask.Delay(3000,stoppingToken);}}}privateasyncTaskCollectedDataPointCollectDataAsync(){varregistersawait_modbus.ReadHoldingRegistersAsync(1,100,20);vartimestampDateTimeOffset.UtcNow;returnnewCollectedDataPoint{Timestamptimestamp,TagsnewDictionarystring,object{[Machine1_Temp]ModbusUtility.ConvertToFloat(registers.Span.Slice(0,2)),[Machine1_Pressure]registers.Span[4],// ... 更多点}};}privateasyncTaskboolTryEnsureConnected(CancellationTokenct){if(await_modbus.ConnectAsync())returntrue;_logger.LogWarning(Modbus 连接失败重试中...);returnfalse;}}CollectedDataPoint 示例结构化便于存储publicrecordCollectedDataPoint{publicDateTimeOffsetTimestamp{get;init;}publicDictionarystring,objectTags{get;init;}new();publicDictionarystring,objectFields{get;init;}new();}4. 高效批量存储推荐 InfluxDB v2 或 TimescaleDB4.1 使用 InfluxDB.Client官方 .NET 客户端NuGetInfluxDB.ClientpublicclassInfluxStorageService{privatereadonlyInfluxDBClient_client;privatereadonlystring_bucket;privatereadonlystring_org;publicInfluxStorageService(stringurl,stringtoken,stringorg,stringbucket){_clientnewInfluxDBClient(url,token);_orgorg;_bucketbucket;}publicasyncTaskWriteBatchAsync(IReadOnlyListCollectedDataPointpoints){varwriteApi_client.GetWriteApi();foreach(varpointinpoints){varwpPointData.Measurement(production_line).Tag(line_id,L001).Field(temp,point.Tags.GetValueOrDefault(Machine1_Temp,0d)).Field(pressure,point.Tags.GetValueOrDefault(Machine1_Pressure,0)).Timestamp(point.Timestamp,WritePrecision.Ns);writeApi.WritePoint(wp,_bucket,_org);}awaitwriteApi.FlushAsync();}}批量消费 Channel 并写入publicclassDataStorageWorker:BackgroundService{privatereadonlyChannelReaderCollectedDataPoint_reader;privatereadonlyInfluxStorageService_storage;privatereadonlyListCollectedDataPoint_batchnew(500);publicDataStorageWorker(ChannelCollectedDataPointchannel,InfluxStorageServicestorage){_readerchannel.Reader;_storagestorage;}protectedoverrideasyncTaskExecuteAsync(CancellationTokenstoppingToken){vartimernewPeriodicTimer(TimeSpan.FromSeconds(3));// 每 3 秒或 500 条写一次while(!stoppingToken.IsCancellationRequested){while(await_reader.WaitToReadAsync(stoppingToken)){if(_reader.TryRead(outvarpoint)){_batch.Add(point);if(_batch.Count500)awaitFlushBatch();}}if(awaittimer.WaitForNextTickAsync(stoppingToken)){awaitFlushBatch();}}}privateasyncTaskFlushBatch(){if(_batch.Count0)return;try{await_storage.WriteBatchAsync(_batch);_batch.Clear();}catch(Exceptionex){// 可记录到本地文件稍后重试}}}5. 注册与启动Program.csbuilder.Services.AddSingletonModbusTcpCollector(spnewModbusTcpCollector(192.168.1.100));builder.Services.AddSingletonInfluxStorageService(spnewInfluxStorageService(http://influxdb:8086,your-token,my-org,industrial));builder.Services.AddHostedServiceIndustrialDataCollector();builder.Services.AddHostedServiceDataStorageWorker();6. 工业级优化建议2025–2026 必备采集周期动态调整低负载 1s高负载 200ms心跳寄存器检测每 10s 读一次固定地址本地缓存兜底写入失败 → 写入 SQLite 或本地文件 → 定时重试指标监控采集延迟、丢包率、写入 QPS、连接状态 → Prometheus GrafanaNative AOT 发布启动快、内存低、GC 压力小如果你想继续扩展例如添加 OPC UA、MQTT、S7 协议、数据压缩、异常告警、WPF 实时曲线展示等告诉我你的具体设备类型或下一个重点模块我可以继续给出更细化的代码和架构。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2433115.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!