Rx.NET的使用
分类:
IT文章
•
2025-01-16 08:20:19
Rx的核心接口


相关链接
教程:https://www.cnblogs.com/sheng-jie/p/10399049.html
https://rehansaeed.com/reactive-extensions-part2-wrapping-events/
代码:http://rxwiki.wikidot.com/101samples
官网:http://reactivex.io/
用一个例子认识这2个接口
实际中不可这样使用
/// <summary>
/// Console host for the custom observable / observer demo.
/// </summary>
class Program
{
    /// <summary>Entry point; simply runs the demo.</summary>
    static void Main(string[] args) => Test();

    /// <summary>
    /// Subscribes a console observer to the number sequence and then waits for
    /// a line of input so the console window stays open.
    /// </summary>
    private static void Test()
    {
        MySequenceOfNumbers source = new MySequenceOfNumbers();
        MyConsoleObserver<int> sink = new MyConsoleObserver<int>();

        source.Subscribe(sink);

        Console.ReadLine();
    }
}
/// <summary>
/// Custom observable sequence of integers.
/// </summary>
/// <remarks>
/// Demo only: all notifications are delivered inside <see cref="Subscribe"/>
/// before it returns.
/// </remarks>
public class MySequenceOfNumbers : IObservable<int>
{
    /// <summary>
    /// Pushes 1, 2 and 3 to <paramref name="observer"/> and then signals completion.
    /// </summary>
    /// <param name="observer">The observer that receives the notifications.</param>
    /// <returns><c>Disposable.Empty</c>.</returns>
    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var number in new[] { 1, 2, 3 })
        {
            observer.OnNext(number);
        }

        observer.OnCompleted();
        return Disposable.Empty;
    }
}
/// <summary>
/// Custom observer that writes every notification it receives to the console.
/// </summary>
/// <typeparam name="T">Type of the observed values.</typeparam>
public class MyConsoleObserver<T> : IObserver<T>
{
    // Console output templates, one per notification kind.
    private const string ValueFormat = "接收到 value {0}";
    private const string ErrorFormat = "出现异常! {0}";
    private const string CompletedMessage = "关闭观察行为";

    /// <summary>Writes the completion message.</summary>
    public void OnCompleted() => Console.WriteLine(CompletedMessage);

    /// <summary>Writes the reported exception.</summary>
    /// <param name="error">The exception passed by the source.</param>
    public void OnError(Exception error) => Console.WriteLine(ErrorFormat, error);

    /// <summary>Writes the received value.</summary>
    /// <param name="value">The value pushed by the source.</param>
    public void OnNext(T value) => Console.WriteLine(ValueFormat, value);
}
View Code
认识Subject类,同时实现了上面2个接口

入门例子:取代event
先看event的例子
/// <summary>
/// Event-based demo: reports temperatures 1..100 to two handlers through a
/// private event.
/// </summary>
class Heater
{
    private delegate void TemperatureChanged(int temperature);
    private event TemperatureChanged TemperatureChangedEvent;

    /// <summary>
    /// Attaches both handlers exactly once. Doing this here rather than inside
    /// <see cref="BoilWater"/> keeps repeated calls from registering duplicate
    /// handlers, which would print every notification several times.
    /// </summary>
    public Heater()
    {
        TemperatureChangedEvent += ShowTemperature;
        TemperatureChangedEvent += MakeAlarm;
    }

    /// <summary>
    /// Starts a task via <c>Task.Run</c> that raises the temperature event for
    /// each value from 1 to 100. The method returns without waiting for it.
    /// </summary>
    public void BoilWater()
    {
        Task.Run(() =>
        {
            foreach (var temperature in Enumerable.Range(1, 100))
            {
                // Null-safe raise: does nothing if no handler is attached.
                TemperatureChangedEvent?.Invoke(temperature);
            }
        });
    }

    /// <summary>Prints the current temperature.</summary>
    private void ShowTemperature(int temperature)
    {
        Console.WriteLine($"当前温度:{temperature}");
    }

    /// <summary>Prints the alarm line for the current temperature.</summary>
    private void MakeAlarm(int temperature)
    {
        Console.WriteLine($"嘟嘟嘟,当前水温{temperature}");
    }
}
/// <summary>
/// Console host for the event-based <c>Heater</c> demo.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the heater and then waits for a line of input.
    /// </summary>
    static void Main(string[] args)
    {
        Heater heater = new Heater();
        heater.BoilWater();

        // BoilWater only starts its work with Task.Run and returns immediately.
        // Without this wait, Main could return (ending the process) before the
        // temperature lines are printed.
        Console.ReadLine();
    }
}
View Code
实现同样效果可以使用如下Rx代码
// Declare the observable sequence 1..100, scheduled on NewThreadScheduler.
// (Fixed: the scheduler type is NewThreadScheduler, not "NewTheadScheduler".)
var observable = Enumerable.Range(1, 100).ToObservable(NewThreadScheduler.Default);
// Declare the subject that fans notifications out to its subscribers.
Subject<int> subject = new Subject<int>();
// First subscriber of the subject: prints the current temperature.
subject.Subscribe((temperature) => Console.WriteLine($"当前温度:{temperature}"));
// Second subscriber of the subject: prints the alarm line.
subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,当前水温:{temperature}"));
// Subscribe the subject to the observable so the values flow through it.
observable.Subscribe(subject);
Observable的创建方式
// Build an observable that pushes 0..4 and completes, then print each value.
var numbers = Observable.Create<int>(observer =>
{
    int value = 0;
    while (value < 5)
    {
        observer.OnNext(value);
        value++;
    }

    observer.OnCompleted();
    return Disposable.Empty;
});
numbers.Subscribe(item => Console.WriteLine("Here is the data " + item));
创建可以取消的Observable
/// <summary>
/// Creates an observable whose producer loop is scheduled on
/// NewThreadScheduler and stops once the subscription has been disposed.
/// </summary>
public static void TestCreate()
{
    IObservable<int> counter = Observable.Create<int>(observer =>
    {
        // Internally creates a new CancellationTokenSource; returned below as
        // the subscription handle.
        var cancellation = new CancellationDisposable();

        NewThreadScheduler.Default.Schedule(() =>
        {
            int next = 0;
            while (true)
            {
                Thread.Sleep(200); // stands in for the long lasting background operation

                // Check the cancel token periodically.
                if (cancellation.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Aborting because cancel event was signaled!");
                    observer.OnCompleted(); // will not make it to the subscriber
                    return;
                }

                observer.OnNext(next++);
            }
        });

        return cancellation;
    });

    IDisposable subscription = counter.Subscribe(value => Console.WriteLine(value));

    Console.WriteLine("Press any key to cancel");
    Console.ReadKey();
    subscription.Dispose();

    Console.WriteLine("Press any key to quit");
    Console.ReadKey(); // give background thread chance to write the cancel acknowledge message
}
/// <summary>
/// Prints the doubled values of the range 0..9.
/// </summary>
private static void TestRange()
{
    IObservable<int> doubled =
        from i in Observable.Range(0, 10)
        select i * 2;

    doubled.Subscribe(value => Console.WriteLine("Data is " + value));
}
/// <summary>
/// Uses Observable.Generate with state 0..9 and prints each state doubled.
/// </summary>
private static void TestGenerate()
{
    Func<int, bool> hasMore = state => state < 10;   // continue condition
    Func<int, int> advance = state => state + 1;     // next state
    Func<int, int> project = state => state * 2;     // emitted value

    var sequence = Observable.Generate(0, hasMore, advance, project);
    sequence.Subscribe(value => Console.WriteLine("Data is " + value));
}
启动后台工作
/// <summary>
/// Starts an action with Observable.Start, prints from the calling thread
/// while it runs, then waits for it to complete.
/// </summary>
public static void StartBackgroundWork()
{
    Console.WriteLine("Shows use of Start to start on a background thread:");
    var o = Observable.Start(() =>
    {
        //This starts on a background thread.
        Console.WriteLine("From background thread. Does not block main thread.");
        PrintThreadID();
        Console.WriteLine("Calculating...");
        Thread.Sleep(3000);
        Console.WriteLine("Background work completed.");
    }).Finally(() =>
    {
        Console.WriteLine("Main thread completed.");
        PrintThreadID();
    });

    // Fixed: the original literal was split across three physical lines, which
    // a regular string literal does not allow. The line breaks are now escapes.
    Console.WriteLine("\nIn Main Thread...\n");
    PrintThreadID();
    o.Wait(); // Wait for completion of background operation.
}
View Code
封装事件
封装Timer 的Elapsed事件
// Wrap the timer's Elapsed event as an observable and print each signal time.
var timer = new System.Timers.Timer(interval: 1000);
timer.Enabled = true;

var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
    // Converts the handler supplied by Rx into the delegate type of the event.
    handler => (sender, args) => handler(sender, args),
    handler => timer.Elapsed += handler,
    handler => timer.Elapsed -= handler);

ticks.Subscribe(tick => Console.WriteLine("On Next: " + tick.EventArgs.SignalTime));
还可以写成这样
// Same wrapping, using the overload that looks the event up by name.
var timer = new System.Timers.Timer(interval: 1000);
timer.Enabled = true;

var ticks = Observable.FromEventPattern(timer, "Elapsed");

ticks.Subscribe(tick =>
{
    // This overload is not typed on the event args, so cast before reading SignalTime.
    var elapsed = (ElapsedEventArgs)tick.EventArgs;
    Console.WriteLine("OnNext: " + elapsed.SignalTime);
});
/// <summary>Plain .NET event that the demo below wraps as an observable.</summary>
public static event EventHandler SimpleEvent;

/// <summary>
/// Wraps <see cref="SimpleEvent"/> with FromEventPattern and prints whether the
/// event has handlers before subscribing, after subscribing and after
/// disposing both subscriptions.
/// </summary>
private static void TestFromEvent()
{
    // Reports whether SimpleEvent currently has any handler attached.
    Action reportHandlerState = () =>
        Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");

    var eventStream = Observable.FromEventPattern(
        handler => SimpleEvent += handler,
        handler => SimpleEvent -= handler);

    // SimpleEvent is null until we subscribe
    reportHandlerState();

    Console.WriteLine("Subscribe");
    // Create two event subscribers
    var first = eventStream.Subscribe(args => Console.WriteLine("Received event for s subscriber"));
    var second = eventStream.Subscribe(args => Console.WriteLine("Received event for t subscriber"));

    // After subscribing the event handler has been added
    reportHandlerState();

    Console.WriteLine("Raise event");
    SimpleEvent?.Invoke(null, EventArgs.Empty);

    // Allow some time before unsubscribing or event may not happen
    Thread.Sleep(100);

    Console.WriteLine("Unsubscribe");
    first.Dispose();
    second.Dispose();

    // After unsubscribing the event handler has been removed
    reportHandlerState();
}
封装Button 的Click事件
/// <summary>
/// Wraps button1's Click event as an observable and shows a message box
/// naming the sender for every click.
/// </summary>
private void init()
{
    var clicks = Observable.FromEventPattern(
        handler => button1.Click += handler,
        handler => button1.Click -= handler);

    clicks.Subscribe(click =>
    {
        var message = "You clicked " + click.Sender.ToString();
        MessageBox.Show(message);
    });
}
基于时间、序列等创建Observable
/// <summary>
/// Filters a one-per-second interval sequence down to the values below 5 and
/// prints them.
/// </summary>
private static void TestWhere()
{
    var everySecond = Observable.Interval(TimeSpan.FromSeconds(1));

    // Query-syntax form of the Where filter.
    var lowNums =
        from n in everySecond
        where n < 5
        select n;

    lowNums.Subscribe(value => Console.WriteLine("This is " + value));
}
/// <summary>
/// Shows two ways to obtain an observable: converting an enumerable and
/// creating a timed interval. Neither sequence is subscribed to here.
/// </summary>
private static void TestToObAndInterval()
{
    // To convert a generic IEnumerable into an IObservable, use the ToObservable extension method.
    IEnumerable<int> someInts = new List<int> { 1, 2, 3, 4, 5 };
    var fromEnumerable = someInts.ToObservable();

    // 0 after 1s, 1 after 2s, 2 after 3s, etc.
    var period = TimeSpan.FromSeconds(1);
    var oneNumberPerSecond = Observable.Interval(period);
}
调度
对于基于Interval的消息,通知会在线程池线程中引发
/// <summary>
/// Click handler: starts a one-second interval sequence and, observing on the
/// synchronization context captured here, writes each tick and the current
/// managed thread id into the form's Text.
/// </summary>
private void button1_Click(object sender, EventArgs e)
{
    SynchronizationContext uiContext = SynchronizationContext.Current;
    var everySecond = Observable.Interval(TimeSpan.FromSeconds(1));

    everySecond
        .ObserveOn(uiContext)
        .Subscribe(tick =>
        {
            this.Text = "Interval " + tick + " on thread " + Environment.CurrentManagedThreadId;
        });
}
NotifyCollectionChangedEventHandler的原本使用以及基于Rx的封装
原本的使用方式
/// <summary>
/// Plain (non-Rx) use of CollectionChanged: prints the change type and the
/// added / removed items for every modification of the collection.
/// </summary>
private static void TestNormalNotifyCollectionChanged()
{
    // Prints the header followed by each item; prints nothing for a null list.
    Action<string, System.Collections.IList> printItems = (header, items) =>
    {
        if (items == null)
        {
            return;
        }

        Console.WriteLine(header);
        foreach (var item in items)
        {
            Console.WriteLine(item);
        }
    };

    NotifyCollectionChangedEventHandler onChanged = (sender, e) =>
    {
        Console.WriteLine("Change type: " + e.Action);
        printItems("Items added: ", e.NewItems);
        printItems("Items removed: ", e.OldItems);
    };

    var names = new ObservableCollection<string>();
    names.CollectionChanged += onChanged;

    names.Add("Adam");
    names.Add("Eve");
    names.Remove("Adam");
    names.Add("John");
    names.Add("Peter");
    names.Clear();
}
/// <summary>
/// Rx version of the CollectionChanged demo: wraps the event with
/// FromEventPattern, then queries it for newly added customers whose Region
/// is "WA" and prints each such customer with its orders.
/// </summary>
private static void TestNotifyCollectionChanged()
{
    var customers = new ObservableCollection<Customer>();

    // The first lambda converts the handler supplied by Rx into the delegate
    // type that CollectionChanged expects.
    var customerChanges = Observable.FromEventPattern(
        (EventHandler<NotifyCollectionChangedEventArgs> ev)
            => new NotifyCollectionChangedEventHandler(ev),
        ev => customers.CollectionChanged += ev,
        ev => customers.CollectionChanged -= ev);

    // Only Add notifications are considered; their NewItems are flattened into
    // the result sequence and filtered by region.
    var watchForNewCustomersFromWashington =
        from c in customerChanges
        where c.EventArgs.Action == NotifyCollectionChangedAction.Add
        from cus in c.EventArgs.NewItems.Cast<Customer>().ToObservable()
        where cus.Region == "WA"
        select cus;

    Console.WriteLine("New customers from Washington and their orders:");
    watchForNewCustomersFromWashington.Subscribe(cus =>
    {
        Console.WriteLine("Customer {0}:", cus.CustomerName);
        foreach (var order in cus.Orders)
        {
            Console.WriteLine("Order {0}: {1}", order.OrderId, order.OrderDate);
        }
    });

    customers.Add(new Customer
    {
        CustomerName = "Lazy K Kountry Store",
        Region = "WA",
        Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 1 } }
    });
    Thread.Sleep(1000);
    customers.Add(new Customer
    {
        CustomerName = "Joe's Food Shop",
        Region = "NY",
        Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 2 } }
    });
    Thread.Sleep(1000);
    customers.Add(new Customer
    {
        CustomerName = "Trail's Head Gourmet Provisioners",
        Region = "WA",
        Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 3 } }
    });
} // Fixed: this closing brace of the method was missing in the original listing.
其中相关类
/// <summary>
/// Customer used by the collection-changed demo.
/// </summary>
class Customer
{
    /// <summary>Display name of the customer.</summary>
    public string CustomerName { get; set; }

    /// <summary>Region code of the customer.</summary>
    public string Region { get; set; }

    /// <summary>
    /// Orders of this customer. Starts as an empty collection and cannot be
    /// replaced from outside the class.
    /// </summary>
    public ObservableCollection<Order> Orders { get; private set; } = new ObservableCollection<Order>();
}

/// <summary>
/// A single order of a <see cref="Customer"/>.
/// </summary>
class Order
{
    /// <summary>Identifier of the order.</summary>
    public int OrderId { get; set; }

    /// <summary>Date of the order.</summary>
    public DateTimeOffset OrderDate { get; set; }
}
View Code