首先,感谢作者在本书中 3.9 节模式的分享。在此继续分享我对于本书中的一些 demo 的看法和建议。欢迎批评指正!谢谢。

  当我读完第三章 异步消息事件驱动模式 之后,对于本书,我忽然有了这个一个印象,那就是:

理想很丰满,现实很骨感

  作者在书的首页里面曾经提到,他这些东西,最开始是在博客上写的,不过我没有翻阅他的博客,接下来的我会按照书中 demo 的内容来叙述。

  下面,我会就第三章 3.9 节 异步消息事件驱动模式,来说下作者示例代码中的一些问题,以及我对于这篇实例的修改。就像作者在书的开头所说的那样,我也和他一样,我说的全都是干货!

  首先,当我们读完作者对于第 3.9 节的模式原理性方面的描述之后,我们会发现,其实异步消息事件驱动模式的机制的核心,实际上就是三个字“无等待”。无等待的提交事务、无等待的获取处理结果、无等待的批量处理。当然,做的好一点,实际上还是应该可以让这个模式身上看到钝化模式的身影,钝化模式的本质就是“对象方法和数据的持久化”。

无等待,加上持久化,可以“让擎天柱插上翅膀”。

  但是作者…………在本书的 demo 中并没有用到。

  异步消息事件驱动模式,其实很简单,即无等待地将消息全部推送到服务器,并异步接收反馈结果。这里,为了实现这个模式,很重要的一个特点就是“异步”。而且,很明显(也许是为了简化~~),作者在这个 demo 中没有用到异步!不过说句实话,这个 demo 应该是使用异步的,实际上,没有异步,这个 demo 根本没法做到无等待提交,不是吗?

  接下来,我们看一下这个 demo 的代码。

(1) lock 语句

  在本书第 150 页,作者使用了下面这样的代码,我觉得不是很合适,可能是由于 demo 缘故,作者敲了这么段“随手代码”。虽然对于理解设计思想来说并无大碍,这个问题可以忽略不计。

lock (this)
{
    if (this.Count > 0)
    ......
}

lock 语句中被 lock 的对象也是有讲究的,最安全合适的做法是私有变量的 lock 操作,这里我就不做具体讨论了,修改的代码如下:

private static object LockObject = new object();
lock (LockObject) 
{
    ......
}

(2) 自定义事件

  同页,定义了一个 MessageNotifyEvent 事件,在 remove 里面的代码写错了。应该是 -=,事实上,对于像书上 demo 的写法,你可以完全单单定义一个 event 变量就可以了,.NET 事件内部逻辑会帮你处理这些 add 和 remove 的操作,除非你有特别的需求,需要在 add 和 remove 的时候做其他的操作,否则我认为,就 demo 来说,从简即可,无需包装成属性进行操作。

public event MessageEventNotifyHandler MessageNotifyEvent
{
    add {......}
    remove
    {
        if (......)
        {
            this.messageNotifyEvent += value;
        }
    }
}

(3) 关于枚举和常量的用法

  在本书第 15 页,作者的原话是这样的:“…………总能看到常量和枚举这两个元素出现在对方的位置上,这是不是很奇怪?应该使用枚举的地方结果却使用了常量,而应该使用常量的地方又使用了枚举。…………”,然后,当我读到第 150~151 页的时候,我觉得这里有那么一点“自相矛盾”的味道。

[Serializable]
public struct OperationState
{
    public const string Finished = "Ok";
    public const string Exception = "Error";
}

其实,这里对于操作订单的状态,枚举 再适合不过了,但他却用了常量。

(4) 用同步的 demo 来解释异步的模式

  这个 demo 应该是使用 BeginInvoke 的异步操作,而作者只是简单的使用了后期绑定、但实为同步DynamicInvoke 来处理请求操作,乍一看,还是挺高大上。所以这个 demo 无法实现注释所描述的那样,即无等待发送订单数据。

(5) 用单一的控制台 demo 展现 server-client 模式原理

  这个就不详细描述了,这点会根据后面的代码说明来

修改后的 DEMO 代码

  修改后的 demo 代码有点长,逻辑设计有点复杂,不过还是基于作者的这个 demo 的。

  改动后的 demo 分为三块:客户端、服务器端、公共类库,因为我觉得就算 demo 再简单,也得符合和体现设计原理和原则。Demo 里面,server 和 client 之间的交互,他们的核心是数据,数据是公共的,所以这些应该被抽取并定义在共同引用的类库项目里面,这里定义在 Common 这个类库里。

/// <summary>
/// 订单状态
/// </summary>
[Serializable]
public enum OrderState
{
    /// <summary>
    /// 订单未处理
    /// </summary>
    Unprocessed,
    /// <summary>
    /// 正在处理中
    /// </summary>
    InProcessing,
    /// <summary>
    /// 成功提交订单
    /// </summary>
    Submitted,
    /// <summary>
    /// 提交失败,订单不合法
    /// </summary>
    Invalid,
    /// <summary>
    /// 由于服务器连接问题,需要重新提交
    /// </summary>
    NeedTryAgain
}

原先的订单操作状态是被抽取出来的,和订单是分离的,而且状态只有两个,OK 和 Error,实际上,根据操作,我设计了五个订单状态,根据注释和枚举名称,我觉得,应该很好理解。

[Serializable]
public class OrderItem
{
    public string Id { get; private set; }
    public string Name { get; private set; }
    public double Price { get; set; }
    public int Count { get; set; }
    public OrderItem(string id, string name, double price)
    {
        Id = id;
        Name = name;
        Price = price;
    }
}
[Serializable]
public class Order
{
    public string Id { get; set; }
    public OrderState State { get; set; }
    public List<OrderItem> Items { get; set; }

    public Order()
    {
        State = OrderState.Unprocessed;
    }
}

这是订单类,处理状态被设计成订单的一部分,其实这才是合理的,订单的状态应该属于订单本身。

  接下来,我们先看看 server 端。为了模拟 server,最简单的方式就是 webservice,这里我使用 WCF 新建一个 project 来模拟 server 端。server 端的协议接口看上去像这个样子。

[ServiceContract]
public interface IOrderSubmitService
{
    /// <summary>
    /// 提交订单
    /// </summary>
    /// <param name="order"></param>
    [OperationContract]
    OrderProcessMessage SubmitOrder(Order order);
}

就一个简单的 SubmitOrder,没什么花头,来看看该方法返回的 OrderProcessMessage 类的定义。

[Serializable]
[DataContract]
public partial class OrderProcessMessage
{
    [DataMember]
    public string MessageId { get; private set; }
    [DataMember]
    public OrderState MessageState { get; private set; }
    [DataMember]
    public string MessageException { get; set; }

    public OrderProcessMessage(string id)
    {
        MessageId = id;
        MessageState = OrderState.Submitted;
        MessageException = null;
    }
    public OrderProcessMessage(string id, string exceptionMessage)
    {
        MessageId = id;
        MessageState = OrderState.Invalid;
        MessageException = exceptionMessage;
    }
}

服务器处理订单之后会返回这个类的对象,这个对象指示是否处理成功,如果失败,那么失败的原因是什么。接下来看 WCF 服务类。

[ServiceBehavior]
public class OrderSubmitService : IOrderSubmitService
{
    [WebMethod]
    public OrderProcessMessage SubmitOrder(Order order)
    {
        var message = String.Format("订单 {0} 格式不正确", order.Id);
        return IsOrderValid(order) ? new OrderProcessMessage(order.Id) : 
                                     new OrderProcessMessage(order.Id, message);
    }

    private bool IsOrderValid(Order order)
    {
        return order.Id.StartsWith("P");
    }
}

有关于 WCF 方面的东西我就不解释了。这个 DEMO 中的 WCF Server 也是灰常简单的~~,高手估计也就扫一眼。

  接下来,主要的逻辑就在客户端,在客户端我们就一个类:OrderQueue。这个类负责和 Order 有关的操作。首先,我们定义一个委托,这个委托用来定义事件,事件用来触发提交操作,所以委托和服务器 SubmitOrder 具有相同的签名。

/// <summary>
/// 处理消息事件
/// </summary>
/// <param name="order"></param>
public delegate OrderSubmitSvc.OrderProcessMessage OrderQueueEventNotifyHandler(Order order);

OrderQueue 类的设计如下:

public class OrderQueue
{
    private static object LockObject = new object();
    public static OrderQueue GlobalQueue = new OrderQueue();
    public List<Order> Orders = new List<Order>();

    private System.Timers.Timer Timer = new System.Timers.Timer();
    public List<OrderProcessMessage> ProcessMessages { get; private set; }

    public event OrderQueueEventNotifyHandler OrderProcessNotifyEvent;
    private OrderQueue();
    private Order GetFirstUnprocessedOrder();
    private void PushProcessedMessage(OrderProcessMessage message);
    private void SetOrdersRetry(Order order);
    public bool HasUnprocessedOrders();
    public bool HasInProcessingOrders();
    public bool HasRetryOrders();
    public void PrepareNeedRetryOrders();
    public void BeignDetectOrders();
    public void StopDetectOrders();
}

书中原 demo 将此类继承 Queue,实际上对于异步操作,Queue 是不够的,后面会说到。原 demo 对于该类使用了单例模式,在这里,我不作设计改动。在类的内部定义 Orders 集合,用于存放动态添加的 Order 项。Timer 用来定时的扫描 Orders 集合,如果有未处理的订单,那么,订单处理逻辑将被触发。而另外一个 ProcessMessages 集合用于存放每一个订单的处理结果。

  下面,来说一下 OrderProcessNotifyEvent 事件,这个事件用于操作订单处理,当 Orders 中存在未处理的订单时,OrderProcessNotifyEvent 所绑定的方法将被触发,在这里,我们会将其绑定到 WCF 的 SubmitOrder 方法。

  重点是私有构造器里面用 lambda 方式定义的 Timer 事件方法:

private OrderQueue()
{
    ProcessMessages = new List<OrderProcessMessage>();

    Timer.Interval = 1000;
    Timer.Elapsed += (sender, e) =>
    {
        if (HasUnprocessedOrders())
        {
            if (OrderProcessNotifyEvent != null)
            {
                // 读取第一个未处理的订单
                Order order = GetFirstUnprocessedOrder();
                if (order == null) return;

                // 设为正在处理中
                order.State = OrderState.InProcessing;

                // 添加已处理信息
                OrderProcessNotifyEvent.BeginInvoke(order, ar =>
                {
                    var processedOrder = ar.AsyncState as Order;

                    try
                    {
                        OrderProcessMessage messageResult = OrderProcessNotifyEvent.EndInvoke(ar);
                        if (processedOrder != null)
                        {
                            processedOrder.State = messageResult.MessageState;
                        }

                        // 添加处理结果
                        PushProcessedMessage(messageResult);
                        // 将处理完的订单移走
                        Orders.Remove(processedOrder);
                    }
                    catch (EndpointNotFoundException ex)
                    {
                        SetOrdersRetry(processedOrder);
                    }
                }, order);
            }
        }
    };

    BeignDetectOrders();
}

根据代码逻辑,我们可以看到,每隔一定的时间,它会扫这个 Orders 集合,如果存在未处理的,那么首先将第一个出现的未处理的订单取出,接着马上将其状态置为 InProcessing,此处这一步很关键,因为是多线程操作,所以,这里如果不设置状态,那么后续的线程取数据会出现混乱,接着传入当前订单,异步调用提交订单,在提交的时候,会发生两种可能性:成功连接服务器、服务器连接失败。如果连接成功,那么他会在回调函数的 EndInvoke 被调用的时候触发 EndpointNotFoundException 异常,这时,我们就捕获该异常,将该订单状态设置为 NeedTryAgain。如果处理成功,订单状态将被改为 Submitted / Invalid 其中之一。将处理结果记录下来,然后将其从原先的订单集合中删除。

这里就要谈一谈为什么使用 List 而不是 Queue,Queue 是先进先出,它的进出永远是头尾的操作,没有办法从中间去抽调数据;为什么说 Queue 不满足当前的操作逻辑呢?因为,提交数据是异步的,也就是说,当我第一个线程 BeginInvoke 之后,第二个线程在下一个 token 到来之后,立马进入而这个时候,假设第一个订单还未处理完,第二个订单被设置成 InProcessing。接着,如果第二个订单在第一个订单之前处理完成,那么这个时候需要从原集合中删除,如何删?这个时候如果仅仅是纯粹的 Dequeue,那么就错了,所以 Queue 不适合,而万能的 List 才是合适的选择

该类中其它一些相关的方法实现如下:

private Order GetFirstUnprocessedOrder()
{
    for (int i = 0; i < Orders.Count; i++)
    {
        var order = Orders[i];
        if (order.State == OrderState.Unprocessed)
        {
            return order;
        }
    }
    return null;
}
private void PushProcessedMessage(OrderProcessMessage message)
{
    lock (LockObject)
    {
        foreach (var existedMessage in ProcessMessages)
        {
            if (existedMessage.MessageId == message.MessageId)
            {
                existedMessage.MessageState = message.MessageState;
                existedMessage.MessageException = message.MessageException;
                return;
            }
        }

        ProcessMessages.Add(message);
    }
}
private void SetOrdersRetry(Order order)
{
    // 添加未处理消息
    var message = new OrderProcessMessage { MessageId = order.Id, 
        MessageState = OrderState.NeedTryAgain, MessageException = "无法连接服务器" };
    PushProcessedMessage(message);

    order.State = OrderState.NeedTryAgain;
}

还有几个简单的辅助方法,瞅一眼就行:

public bool HasUnprocessedOrders()
{
    return Orders.Count(o => o.State == OrderState.Unprocessed) > 0;
}
public bool HasInProcessingOrders()
{
    return Orders.Count(o => o.State == OrderState.InProcessing) > 0;
}
public bool HasRetryOrders()
{
    return Orders.Count(o => o.State == OrderState.NeedTryAgain) > 0;
}
public void PrepareNeedRetryOrders()
{
    foreach (Order order in Orders)
    {
        if (order.State == OrderState.NeedTryAgain)
        {
            order.State = OrderState.Unprocessed;
        }
    }
}
public void BeignDetectOrders()
{
    Timer.Start();
}
public void StopDetectOrders()
{
    Timer.Stop();
}

最后,是客户端 DEMO 的业务处理逻辑:

static void Run()
{
    // 初始化远程订单服务
    OrderSubmitServiceClient orderService = new OrderSubmitServiceClient();

    // 绑定服务处理方法到订单队列通知当中
    OrderQueue.GlobalQueue.OrderProcessNotifyEvent += orderService.SubmitOrder;

    // 初始化订单
    var orders = CreateOrders();
    // 无等待发送订单数据
    orders.ForEach(order => OrderQueue.GlobalQueue.Orders.Add(order));

    // 等待订单处理完成
    WaitOrderQueueProcess();

    for (OutputOrdersProcessResult(OrderQueue.GlobalQueue); 
         OrderQueue.GlobalQueue.HasRetryOrders(); 
         OutputOrdersProcessResult(OrderQueue.GlobalQueue))
    {
        Console.Write("\r\n还有未处理完的订单,是否继续提交处理?(Y / N): ");
        string presskey = null;
        for (presskey = Console.ReadLine().ToUpper(); 
             presskey != "Y" && presskey != "N"; presskey = Console.ReadLine().ToUpper())
        {
            Console.Write("\r\n请正确输入(Y / N): ");
        }

        if (presskey == "Y")
        {
            Console.WriteLine("\r\n请等待,正在尝试再次提交订单......");
            OrderQueue.GlobalQueue.PrepareNeedRetryOrders();
            Wait();
        }
        else break;
    }

    OrderQueue.GlobalQueue.StopDetectOrders();
    Console.WriteLine("全部处理结束,按任意键退出...");
}

当我们无等待提交订单之后,我们就可以在主线程中用 WaitOrderQueueProcess 来等待异步提交的返回结果。

static void WaitOrderQueueProcess()
{
    const int DOT_COUNT = 6;

    for (int i = 0; 
         (OrderQueue.GlobalQueue.HasUnprocessedOrders() || 
          OrderQueue.GlobalQueue.HasInProcessingOrders()) && i < int.MaxValue; i++)
    {
        if (i % DOT_COUNT == 0)
        {
            Console.Clear();
            Console.Write("正在处理订单 ");
        }
        else
        {
            Console.Write(".");
        }
        Thread.Sleep(500);
    }
}

剩余代码如下:

static List<Order> CreateOrders()
{
    return new List<Order>
    {
        new Order { Id = "P001", Items = new List<OrderItem>
        {
            new OrderItem("I001", "液晶显示器", 2000),
            new OrderItem("I002", "Core I7 CPU", 3000),
            new OrderItem("I003", "金士顿 DDR2 内存", 500)
        }},
        new Order { Id = "P002", Items = new List<OrderItem>
        {
            new OrderItem("I004", "土豪显示器", 5000),
            new OrderItem("I005", "土豪 CPU", 6000),
            new OrderItem("I006", "火星人内存", 1000)
        }},
        new Order { Id = "X003", Items = new List<OrderItem>
        {
            new OrderItem("I007", "CRT 显示器", 300),
            new OrderItem("I008", "奔三 CPU", 1000),
            new OrderItem("I009", "杂牌内存", 200)
        }},
        new Order { Id = "P004", Items = new List<OrderItem>
        {
            new OrderItem("I001", "液晶显示器", 2000),
            new OrderItem("I002", "Core I7 CPU", 3000),
            new OrderItem("I003", "金士顿 DDR2 内存", 500)
        }},
        new Order { Id = "V005", Items = new List<OrderItem>
        {
            new OrderItem("I004", "土豪显示器", 5000),
            new OrderItem("I005", "土豪 CPU", 6000),
            new OrderItem("I006", "火星人内存", 1000)
        }},
    };
}
static void OutputOrdersProcessResult(OrderQueue orderQueue)
{
    Console.WriteLine();
    foreach (var message in orderQueue.ProcessMessages)
    {
        Console.WriteLine(String.Format("订单 {0} 单次处理结束,{1}{2}.", 
                message.MessageId, GetOrderStateString(message.MessageState), 
                message.MessageException == null ? "" : ("," + message.MessageException)));
    }
}
static string GetOrderStateString(OrderState state)
{
    switch (state)
    {
        case OrderState.Unprocessed: return "未处理";
        case OrderState.InProcessing: return "正在处理";
        case OrderState.Submitted: return "入库成功";
        case OrderState.Invalid: return "入库失败";
        case OrderState.NeedTryAgain: return "需要再次尝试";
        default: return "";
    }
}