观察程序设计模式 观察程序设计模式 观察程序设计模式最佳做法 如何:实现提供程序 如何:实现观察程序

泛型类型参数表示提供通知信息的类型。

应用模式

每当提供程序使用回调向其客户端提供当前信息时,即可实现该模式。

实现该模式需要您提供以下内容:

  • 提供程序必须实现一个方法 (IObservable<T>.Subscribe),希望接收提供程序通知的观察者会调用该方法。

  • 观察者必须实现三个方法,提供程序将调用所有这些方法:

  • 不定义观察者接收通知的顺序;提供程序可以*选择任何方法来确定该顺序。

  • 观察者从 Subscribe 方法接收对 IDisposable 实现的引用,所以它们还可以调用 IDisposable.Dispose 方法以在提供程序完成通知发送之前取消订阅。

  • 尽管该对象可以与 IObservable<T> 实现相同,但在通常情况下,该对象为不同的类型。

观察程序设计模式
观察程序设计模式
观察程序设计模式最佳做法
如何:实现提供程序
如何:实现观察程序注意

例如,Reactive Extensions for .NET (Rx) 包含一组支持异步编程的扩展方法和 LINQ 标准序列运算符。

实现模式

如以下示例中所示。

using System;
using System.Collections.Generic;

public class BaggageInfo
{
   private int flightNo;
   private string origin;
   private int location;

   internal BaggageInfo(int flight, string from, int carousel)
   {
      this.flightNo = flight;
      this.origin = from;
      this.location = carousel;
   }

   public int FlightNumber {
      get { return this.flightNo; }
   }

   public string From {
      get { return this.origin; }
   }

   public int Carousel {
      get { return this.location; }
   }
}

在内部,它包含两个集合:

  • observers - 接收更新信息的客户端集合。

  • flights - 航班及其指派的转盘的集合。

BaggageHandler 类的源代码。

public class BaggageHandler : IObservable<BaggageInfo>
{
   private List<IObserver<BaggageInfo>> observers;
   private List<BaggageInfo> flights;

   public BaggageHandler()
   {
      observers = new List<IObserver<BaggageInfo>>();
      flights = new List<BaggageInfo>();
   }

   public IDisposable Subscribe(IObserver<BaggageInfo> observer)
   {
      // Check whether observer is already registered. If not, add it
      if (! observers.Contains(observer)) {
         observers.Add(observer);
         // Provide observer with existing data.
         foreach (var item in flights)
            observer.OnNext(item);
      }
      return new Unsubscriber<BaggageInfo>(observers, observer);
   }

   // Called to indicate all baggage is now unloaded.
   public void BaggageStatus(int flightNo)
   {
      BaggageStatus(flightNo, String.Empty, 0);
   }

   public void BaggageStatus(int flightNo, string from, int carousel)
   {
      var info = new BaggageInfo(flightNo, from, carousel);

      // Carousel is assigned, so add new info object to list.
      if (carousel > 0 && ! flights.Contains(info)) {
         flights.Add(info);
         foreach (var observer in observers)
            observer.OnNext(info);
      }
      else if (carousel == 0) {
         // Baggage claim for flight is done
         var flightsToRemove = new List<BaggageInfo>();
         foreach (var flight in flights) {
            if (info.FlightNumber == flight.FlightNumber) {
               flightsToRemove.Add(flight);
               foreach (var observer in observers)
                  observer.OnNext(info);
            }
         }
         foreach (var flightToRemove in flightsToRemove)
            flights.Remove(flightToRemove);

         flightsToRemove.Clear();
      }
   }

   public void LastBaggageClaimed()
   {
      foreach (var observer in observers)
         observer.OnCompleted();

      observers.Clear();
   }
}

observers 集合中。

BaggageInfo 对象。

observers 集合。

observers 集合中,如果在,则移除观察者。

internal class Unsubscriber<BaggageInfo> : IDisposable
{
   private List<IObserver<BaggageInfo>> _observers;
   private IObserver<BaggageInfo> _observer;

   internal Unsubscriber(List<IObserver<BaggageInfo>> observers, IObserver<BaggageInfo> observer)
   {
      this._observers = observers;
      this._observer = observer;
   }

   public void Dispose() 
   {
      if (_observers.Contains(_observer))
         _observers.Remove(_observer);
   }
}

virtual (C#),所以它们可全部由派生类重写。

using System;
using System.Collections.Generic;

public class ArrivalsMonitor : IObserver<BaggageInfo>
{
   private string name;
   private List<string> flightInfos = new List<string>();
   private IDisposable cancellation;
   private string fmt = "{0,-20} {1,5}  {2, 3}";

   public ArrivalsMonitor(string name)
   {
      if (String.IsNullOrEmpty(name))
         throw new ArgumentNullException("The observer must be assigned a name.");

      this.name = name;
   }

   public virtual void Subscribe(BaggageHandler provider)
   {
      cancellation = provider.Subscribe(this);
   }

   public virtual void Unsubscribe()
   {
      cancellation.Dispose();
      flightInfos.Clear();
   }

   public virtual void OnCompleted() 
   {
      flightInfos.Clear();
   }

   // No implementation needed: Method is not called by the BaggageHandler class.
   public virtual void OnError(Exception e)
   {
      // No implementation.
   }

   // Update information.
   public virtual void OnNext(BaggageInfo info) 
   {
      bool updated = false;

      // Flight has unloaded its baggage; remove from the monitor.
      if (info.Carousel == 0) {
         var flightsToRemove = new List<string>();
         string flightNo = String.Format("{0,5}", info.FlightNumber);

         foreach (var flightInfo in flightInfos) {
            if (flightInfo.Substring(21, 5).Equals(flightNo)) {
               flightsToRemove.Add(flightInfo);
               updated = true;
            }
         }
         foreach (var flightToRemove in flightsToRemove)
            flightInfos.Remove(flightToRemove);

         flightsToRemove.Clear();
      }
      else {
         // Add flight if it does not exist in the collection.
         string flightInfo = String.Format(fmt, info.From, info.FlightNumber, info.Carousel);
         if (! flightInfos.Contains(flightInfo)) {
            flightInfos.Add(flightInfo);
            updated = true;
         }
      }
      if (updated) {
         flightInfos.Sort();
         Console.WriteLine("Arrivals information from {0}", this.name);
         foreach (var flightInfo in flightInfos)
            Console.WriteLine(flightInfo);

         Console.WriteLine();
      }
   }
}

无论何时进行更改,都会对列表进行排序,并将其显示在控制台上。

在每种情况下,观察者都接收更新并正确显示行李提取信息。

using System;
using System.Collections.Generic;

public class Example
{
   public static void Main()
   {
      BaggageHandler provider = new BaggageHandler();
      ArrivalsMonitor observer1 = new ArrivalsMonitor("BaggageClaimMonitor1");
      ArrivalsMonitor observer2 = new ArrivalsMonitor("SecurityExit");

      provider.BaggageStatus(712, "Detroit", 3);
      observer1.Subscribe(provider);
      provider.BaggageStatus(712, "Kalamazoo", 3);
      provider.BaggageStatus(400, "New York-Kennedy", 1);
      provider.BaggageStatus(712, "Detroit", 3);
      observer2.Subscribe(provider);
      provider.BaggageStatus(511, "San Francisco", 2);
      provider.BaggageStatus(712);
      observer2.Unsubscribe();
      provider.BaggageStatus(400);
      provider.LastBaggageClaimed();
   }
}
// The example displays the following output:
//      Arrivals information from BaggageClaimMonitor1
//      Detroit                712    3
//
//      Arrivals information from BaggageClaimMonitor1
//      Detroit                712    3
//      Kalamazoo              712    3
//
//      Arrivals information from BaggageClaimMonitor1
//      Detroit                712    3
//      Kalamazoo              712    3
//      New York-Kennedy       400    1
//
//      Arrivals information from SecurityExit
//      Detroit                712    3
//
//      Arrivals information from SecurityExit
//      Detroit                712    3
//      Kalamazoo              712    3
//
//      Arrivals information from SecurityExit
//      Detroit                712    3
//      Kalamazoo              712    3
//      New York-Kennedy       400    1
//
//      Arrivals information from BaggageClaimMonitor1
//      Detroit                712    3
//      Kalamazoo              712    3
//      New York-Kennedy       400    1
//      San Francisco          511    2
//
//      Arrivals information from SecurityExit
//      Detroit                712    3
//      Kalamazoo              712    3
//      New York-Kennedy       400    1
//      San Francisco          511    2
//
//      Arrivals information from BaggageClaimMonitor1
//      New York-Kennedy       400    1
//      San Francisco          511    2
//
//      Arrivals information from SecurityExit
//      New York-Kennedy       400    1
//      San Francisco          511    2
//
//      Arrivals information from BaggageClaimMonitor1
//      San Francisco          511    2
 

观察程序设计模式最佳做法

本主题介绍开发人员使用这些接口实现观察者设计模式时应遵循的最佳做法。

线程处理

非线程安全的实现应显式注明它们不是线程安全的。

当实施者强加其他要求时应进行明确说明,以避免用户对观察者协定产生混淆。

处理异常

这会影响提供程序和观察者在观察者设计模式中处理异常的方式。

提供程序 -- 调用 OnError 方法

但是,OnNext 方法旨在向观察者提供当前数据或更新的数据,而 OnError 方法旨在指示提供程序无法提供有效数据。

提供程序在处理异常和调用 OnError 方法时应遵循以下最佳做法:

  • 如果提供程序具有任何特定的要求,则必须处理自己的异常。

  • 提供程序不应期待或要求观察者以任何特殊的方式处理异常。

  • 在其他情况下,则无需向观察者通知异常。

对于多线程应用程序,应使用线程安全的集合对象,如 System.Collections.Concurrent.BlockingCollection<T> 对象。

观察者 -- 实现 OnError 方法

当观察者收到来自提供程序的错误通知时,观察者应将异常作为信息处理,并且不应被要求执行任何特殊的操作。

观察者在响应来自提供程序的 OnError 方法调用时应遵循以下最佳做法:

  • 但是,如果观察者确实引发了异常,则应期望这些异常处于未处理状态。

  • 为此,应使用标准异常对象。

其他最佳做法

因此,建议您避免使用这种做法。

尽管可以将一个观察者附加到多个提供程序,但建议的模式是将一个 IObserver<T> 实例仅附加到一个 IObservable<T> 实例。

 

如何:实现提供程序

相关主题如何:实现观察程序讨论如何创建观察者。

创建提供程序

  1. TemperatureMonitor 类表示)监控的数据和观察者订阅的数据。

    using System;
    
    public struct Temperature
    {
       private decimal temp;
       private DateTime tempDate;
    
       public Temperature(decimal temperature, DateTime dateAndTime)
       {
          this.temp = temperature;
          this.tempDate = dateAndTime;
       }
    
       public decimal Degrees
       { get { return this.temp; } }
    
       public DateTime Date
       { get { return this.tempDate; } }
    }
    
  2. Temperature 泛型类型参数的构造 System.IObservable<T> 实现。

    using System;
    using System.Collections.Generic;
    
    public class TemperatureMonitor : IObservable<Temperature>
    { ... }
    
  3. TemperatureMonitor 类构造函数中实例化。

    using System;
    using System.Collections.Generic;
    
    public class TemperatureMonitor : IObservable<Temperature>
    {
       List<IObserver<Temperature>> observers;
    
       public TemperatureMonitor()
       {
          observers = new List<IObserver<Temperature>>();
       }
       ...
    }
  4. 此代码允许订阅者调用对象的 IDisposable.Dispose 实现,以将其自身从订阅者集合中移除。

    private class Unsubscriber : IDisposable
    {
       private List<IObserver<Temperature>> _observers;
       private IObserver<Temperature> _observer;
    
       public Unsubscriber(List<IObserver<Temperature>> observers, IObserver<Temperature> observer)
       {
          this._observers = observers;
          this._observer = observer;
       }
    
       public void Dispose() 
       {
          if (! (_observer == null)) _observers.Remove(_observer);
       }
    }
    
  5. TemperatureMonitor 类中 Subscribe 方法的实现。

    public IDisposable Subscribe(IObserver<Temperature> observer)
    {
       if (! observers.Contains(observer))
          observers.Add(observer);
    
       return new Unsubscriber(observers, observer);
    }
    
  6. 在这种情况下,提供程序不调用其观察者的 OnError 方法。

    public void GetTemperature()
    {
       // Create an array of sample data to mimic a temperature device.
       Nullable<Decimal>[] temps = {14.6m, 14.65m, 14.7m, 14.9m, 14.9m, 15.2m, 15.25m, 15.2m,
                                    15.4m, 15.45m, null };
       // Store the previous temperature, so notification is only sent after at least .1 change.
       Nullable<Decimal> previous = null;
       bool start = true;
    
       foreach (var temp in temps) {
          System.Threading.Thread.Sleep(2500);
          if (temp.HasValue) {
             if (start || (Math.Abs(temp.Value - previous.Value) >= 0.1m )) {
                Temperature tempData = new Temperature(temp.Value, DateTime.Now);
                foreach (var observer in observers)
                   observer.OnNext(tempData);
                previous = temp;
                if (start) start = false;
             }
          }
          else {
             foreach (var observer in observers.ToArray())
                if (observer != null) observer.OnCompleted();
    
             observers.Clear();
             break;
          }
       }
    }
    

示例

TemperatureMonitor 类(即 IObservable<T> 实现)。

using System.Threading;
using System;
using System.Collections.Generic;

public class TemperatureMonitor : IObservable<Temperature>
{
   List<IObserver<Temperature>> observers;

   public TemperatureMonitor()
   {
      observers = new List<IObserver<Temperature>>();
   }

   private class Unsubscriber : IDisposable
   {
      private List<IObserver<Temperature>> _observers;
      private IObserver<Temperature> _observer;

      public Unsubscriber(List<IObserver<Temperature>> observers, IObserver<Temperature> observer)
      {
         this._observers = observers;
         this._observer = observer;
      }

      public void Dispose() 
      {
         if (! (_observer == null)) _observers.Remove(_observer);
      }
   }

   public IDisposable Subscribe(IObserver<Temperature> observer)
   {
      if (! observers.Contains(observer))
         observers.Add(observer);

      return new Unsubscriber(observers, observer);
   }

   public void GetTemperature()
   {
      // Create an array of sample data to mimic a temperature device.
      Nullable<Decimal>[] temps = {14.6m, 14.65m, 14.7m, 14.9m, 14.9m, 15.2m, 15.25m, 15.2m,
                                   15.4m, 15.45m, null };
      // Store the previous temperature, so notification is only sent after at least .1 change.
      Nullable<Decimal> previous = null;
      bool start = true;

      foreach (var temp in temps) {
         System.Threading.Thread.Sleep(2500);
         if (temp.HasValue) {
            if (start || (Math.Abs(temp.Value - previous.Value) >= 0.1m )) {
               Temperature tempData = new Temperature(temp.Value, DateTime.Now);
               foreach (var observer in observers)
                  observer.OnNext(tempData);
               previous = temp;
               if (start) start = false;
            }
         }
         else {
            foreach (var observer in observers.ToArray())
               if (observer != null) observer.OnCompleted();

            observers.Clear();
            break;
         }
      }
   }
}
 

如何:实现观察程序

相关主题如何:实现提供程序讨论如何创建提供程序。

创建观察者

  1. Temperature 泛型类型参数构造的 System.IObserver<T> 实现。

    public class TemperatureReporter : IObserver<Temperature>
    
  2. Subscribe 方法。

    public class TemperatureReporter : IObserver<Temperature>
    {
       private IDisposable unsubscriber;
       private bool first = true;
       private Temperature last;
    
       public virtual void Subscribe(IObservable<Temperature> provider)
       {
          unsubscriber = provider.Subscribe(this);
       }
       ...
    }
  3. Unsubscribe 方法。

    public virtual void Unsubscribe()
    {
       unsubscriber.Dispose();
    }
    
  4. TemperatureReporter 类的 IObserver<T> 实现。

    public virtual void OnCompleted() 
    {
       Console.WriteLine("Additional temperature data will not be transmitted.");
    }
    
    public virtual void OnError(Exception error)
    {
       // Do nothing.
    }
    
    public virtual void OnNext(Temperature value)
    {
       Console.WriteLine("The temperature is {0}°C at {1:g}", value.Degrees, value.Date);
       if (first)
       {
          last = value;
          first = false;
       }
       else
       {
          Console.WriteLine("   Change: {0}° in {1:g}", value.Degrees - last.Degrees,
                                                        value.Date.ToUniversalTime() - last.Date.ToUniversalTime());
       }
    }
    

示例

TemperatureReporter 类(提供温度监控应用程序的 IObserver<T> 实现)的完整源代码。

public class TemperatureReporter : IObserver<Temperature>
{
   private IDisposable unsubscriber;
   private bool first = true;
   private Temperature last;

   public virtual void Subscribe(IObservable<Temperature> provider)
   {
      unsubscriber = provider.Subscribe(this);
   }

   public virtual void Unsubscribe()
   {
      unsubscriber.Dispose();
   }

   public virtual void OnCompleted() 
   {
      Console.WriteLine("Additional temperature data will not be transmitted.");
   }

   public virtual void OnError(Exception error)
   {
      // Do nothing.
   }

   public virtual void OnNext(Temperature value)
   {
      Console.WriteLine("The temperature is {0}°C at {1:g}", value.Degrees, value.Date);
      if (first)
      {
         last = value;
         first = false;
      }
      else
      {
         Console.WriteLine("   Change: {0}° in {1:g}", value.Degrees - last.Degrees,
                                                       value.Date.ToUniversalTime() - last.Date.ToUniversalTime());
      }
   }
}