核心概念
-
IObservable<T>
-
表示 可观察的数据源(如事件流、实时数据)。
-
关键方法:
Subscribe(IObserver<T> observer)
,用于注册观察者。
-
-
IObserver<T>
-
表示 数据的接收者,响应数据变化。
-
三个核心方法:
-
OnNext(T value)
:接收新数据。 -
OnError(Exception error)
:处理错误。 -
OnCompleted()
:响应数据流结束。
-
-
观察者实现:
public class 观察者 : IObserver<float>
{
public void OnNext(float price)
{
Console.WriteLine($"最新股价: {price}$");
}
public void OnError(Exception ex)
{
Console.WriteLine($"错误: {ex.Message}");
}
public void OnCompleted()
{
Console.WriteLine("股票数据流结束");
}
}
被观察者实现:
public class 被观察者 : IObservable<float>
{
private List<IObserver<float>> _observers = new List<IObserver<float>>();
public IDisposable Subscribe(IObserver<float> observer)
{
_observers.Add(observer);
return new Unsubscriber(_observers, observer);
}
//这段代码定义了一个 Unsubscriber 内部类,用于管理观察者(IObserver<T>)的订阅和取消订阅逻辑。
//它的核心作用是 安全地从观察者列表中移除某个观察者,避免内存泄漏或无效通知。
private class Unsubscriber : IDisposable
{
private List<IObserver<float>> _observers;
private IObserver<float> _observer;
public Unsubscriber(List<IObserver<float>> observers, IObserver<float> observer)
{
_observers = observers;
_observer = observer;
}
public void Dispose()
{
if (_observer != null && _observers.Contains(_observer))
_observers.Remove(_observer);
}
}
// 模拟股价变动
public void UpdatePrice(float price)
{
foreach (var observer in _observers)
{
observer.OnNext(price);
}
}
public void MarketClosed()
{
foreach (var observer in _observers)
{
observer.OnCompleted();
}
}
}
WInform调用案例:
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
观察者 A = new 观察者();
被观察者 B = new 被观察者();
IDisposable A回执;
private void btn_观察_Click(object sender, EventArgs e)
{
A回执 = B.Subscribe(A);
}
private void btn_取消观察_Click(object sender, EventArgs e)
{
A回执.Dispose();
}
private void btn_价格一_Click(object sender, EventArgs e)
{
B.UpdatePrice(1);
}
private void btn_价格二_Click(object sender, EventArgs e)
{
B.UpdatePrice(2);
}
private void btn_股市关闭_Click(object sender, EventArgs e)
{
B.MarketClosed();
}
}