在本书的第一部分中,你将会学习如何使用ZeroMQ。第一部分将覆盖ZeroMQ的基础内容,其中包括API,各种socket类型及其工作原理,可靠性以及可以在应用中使用的大量模式。从头至尾阅读本部分和示例代码,你会获得不少的收获。

第一章 基础

  • 修复世界
  • 这本书的读者
  • 获取实例
  • 使用字符串需要注意的小问题
  • 一看就会
  • 版本报告
  • 将消息分发出去
  • 分而治之
  • 使用ZeroMQ进行编程
    • 正确获取上下文
    • 干净的退出
  • 为什么我们需要ZeroMQ
  • 套接字扩展性
  • 从ZeroMQ2.2升级到3.2
  • 警告:不稳定范式

修复世界

修复世界

原文 Fixing the World

怎样解释ZeroMQ是什么呢? 我们中的一些人通过它完成的所有美好的事来解释它:它是增强版的套接字(socket),它像是具有路由功能的邮箱,它很快!还有一些人试着与人分享忽然领悟到新思维时的感觉,一切都变得那么显然:事情变得更简单了,复杂性不见了,思想更开放了。另一些人他人试着通过比较来解释:它更小,更简洁,而且看起来更加熟悉。我个人想要通过本书回顾为什么ZeroMQ会获得成功,这也许也是你想要知道的。

编程是把自己打扮成艺术的科学,因为大部分人都不了解软件的物理世界,也很少有人告诉过你。软件的物理世界不是算法,数据结构,语言和抽象。我们制作这些工具,使用它们,然后把它们丢掉。真实的软件物理世界就是人类的物理世界。

具体而言,当事情变得复杂时,我们希望将大型问题分解成小块来解决它。这就是编程的科学:制作可以理解并且很容易使用的构件,一起工作来解决很大的问题。

我们生活在一个相互连接的世界,现代软件需要为世界导航。所以对于未来大型解决方案的构件是相互连接和大规模并行的。代码只是“安静健壮”(strong and silent)还远远不够。代码还需要与其他代码交流。代码必须是健谈的,社会性的,连接良好的(well-connected)。代码需要像人脑一样运行,数万亿独立的神经元互相传送消息构成超大规模并行网络,没有中心控制,也没有单点故障,还能够解决非常困难的问题。毫无疑问未来的代码会像人类的大脑,每个网络的每个末端在某种程度上是人类的大脑。

如果你曾使用过线程,协议或是网络完成工作,你就会意识到这几乎是不可能的,这就是一个梦想。即使通过几个套接字连接几个程序,在真实场景中的情况也明显是痛苦的。数万亿?成本将是不可想象的。要将这些计算机连接起来非常困难,做这事的软件和服务可是价值数十亿美元的业务。

因此,当前网络布线已经超前了数年,已经超过了我们的使用能力。在20世纪80年代有一次软件危机,当时弗雷德·布鲁克斯(Fred Brooks)等权威的软件工程师相信没有银弹能够保证在生产力,可靠性,简单性上有一个数量级的提升。

布鲁克斯错过了自由和开源软件,自由和开源软件解决了这一危机。使我们能够有效的分享知识。今天,我们正面对另一个软件危机,我们对它谈论的并不多。只有最大,最富有的企业才能创建相互连接的应用程序。一种解决方案是云计算,不过云计算是专有的。数据和知识从个人电脑中消失转移到云端,这使得我们无法访问和参与计算。谁拥有我们的社交网络?这就像是大型机到个人电脑革命的反向过程。

我们可以离开另一本书中的政治哲学。问题的关键是,互联网提供了连接大量代码的潜在能力,而现实的情况是大多数人接触不到这些代码。所以,大型有趣的问题(健康,教育,经济,交通等)仍然没有得到解决,因为有没有办法把代码连接起来,所以就没有办法把大脑连接起来合作解决这些问题。

对于解决软件连接带来的挑战,已经有过许多尝试。有数千种IETF规范,每个解决难题的一部分。对于应用程序开发人员来说,HTTP也许是一个足够易用的解决方案,HTTP鼓励开发者和架构师们按照大型服务器加简单愚笨的客户端的方式进行思考,在这一点可以认为HTTP使问题变得更糟。

现在人们仍然用原始的UDP,TCP,私有协议,HTTP,Websocket来连接应用程序。它仍然是痛苦的,缓慢的,难以形成规模,本质上是集中式的。分布式P2P框架多数用于游戏而不是工作。有多少应用使用Skype或Bittorrent交换数据?

这把我们带回到编程的科学。为了修复这个世界,我们需要做两件事情。一,解决“如何把任意地点的任意代码连接起来”的一般问题。二,将解决方案包装成人们可以轻松地理解和使用的构建。

简单得让人觉得好笑。也许是吧。而这就是问题的关键。

把消息分发出去

第二个经典模式是单向的数据分发,即服务器将更新推送到一组客户端。让我们看一个例子,天气更新的推送服务,其中的数据包括邮政编码,温度和相对湿度。我们将使用随机生成的值,真实的气象站可能也是这么做的。

下面是天气更新服务器,应用程序使用5556端口。

//  Weather update server
//  Binds PUB socket to tcp://*:5556
//  Publishes random weather updates
//
#include "zhelpers.h"

int main (void)
{
    //  Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);
    rc = zmq_bind (publisher, "ipc://weather.ipc");
    assert (rc == 0);

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        //  Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        //  Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}

wuserver: Weather update server in C

天气的更新没有开始和结束,就像无穷无尽的广播节目。 Publish-Subscribe

下面是客户端应用,监听天气的更新并获取指定的邮政编码对应的更新数据。默认使用纽约的邮政编码10001,因为纽约是一切探险活动开始的好地方。

//
//  Weather update client
//  Connects SUB socket to tcp://localhost:5556
//  Collects weather updates and finds avg temp in zipcode
//
#include "zhelpers.h"

int main (int argc, char *argv [])
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to server
    printf ("Collecting updates from weather server…\n");
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    //  Subscribe to zipcode, default is NYC, 10001
    char *filter = (argc > 1)? argv [1]: "10001 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
    assert (rc == 0);

    //  Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);

        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
            &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
        filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

wuclient: Weather update client in C

注意当你使用SUB套接字时,必须像示例中那样通过zmq_setsockopt()设置SUBSCRIBE订阅选项。如果没有设置任何订阅,就不会收到消息。这是新手常犯的一个错误。订阅者可以设置多个订阅选项,它们都会生效。也就是说,订阅者会收到匹配任何订阅请求的更新。订阅者也可以取消指定的订阅。一个订阅通常是可打印的字符串,不过也可以是其他的东西。参阅zmq_setsockopt()来理解这句话。

PUB-SUB发布订阅套接字对是异步的。客户端循环调用(如果只需要一次就调用一次) zmq_msg_recv(),如果在SUB套接字上发送消息会产生错误。类似的,服务器可以按需要调用zmq_msg_send(),但是在不能在PUB套接字上调用zmq_msg_recv()

ZeroMQ的理论是不考虑连接的对端是什么,对端绑定的套接字是什么。然而在实际使用中,还是有一些文档中没有指出的不同指出,我们会在后面解释这些不同。现在除非网络使得绑定PUB然后连接到SUB不再可行,否则你就以这种方式使用吧。

关于PUB-SUB套接字还有一点需要注意:你不知道订阅者何时接收消息。即使先启动了订阅者,过一会再启动发布者,订阅者将总是无法获得发布者发送的第一个消息。这是因为当订阅者在向发布者建立连接时(需要一小段时间),发布者可能已经把消息发出去了。

这种"延迟加入"的现象困扰了很多人,让我们详细的说明一下。ZeroMQ在后台以异步I/O方式工作。假设你有两个节点按照顺序做下面的事:

  • 订阅者连接到一个端点,然后接受消息并计数
  • 发布者绑定到一个端点,立即发送1000条消息

那么订阅者很有可能什么也收不到。你可能会回过头来检查是否设置了正确的过滤器,重试后发现订阅者还是收不到任何东西。

建立TCP连接涉及三次握手,而三次握手需要几毫秒的时间,这取决于网络状况和节点之间下一跳的跳数。在这几毫秒的时间里,ZeroMQ可以发出很多消息。为了支持这一观点,假设建立连接需要5毫秒,而相同的链路可以每秒发送1兆数量的消息。在订阅者向发布者建立连接的5毫秒时间里,发布者只需要1毫秒把1千个消息发出去。

套接字与模式一章我们将会解释如何同步订阅者和发布者,使得订阅者建立连接就绪后,发布者再发送数据。有一种简单愚蠢的延迟发布的方法,即使用sleep休眠。不要在实际的应用中使用它,它太脆弱了,很不优雅,降低性能。先使用休眠证明发生了什么,然后到套接字与模式一章查看正确处理的方式。

同步的另一种替代方案是简单假设发布的数据流是无限的,没有开始也没有结束。也可以假设订阅者不关心启动之前发生的事情。这就是天气客户端的例子。

客户端订阅一个邮政编码然后收集此邮政编码的一千次更新。如果邮政编码是随机分布的,服务器大约需要更新一千万次。你可以先启动客户端,然后启动服务器,客户端仍会正常工作。只要你喜欢你可以停止和重启服务器多次,客户端仍会正常工作。当客户端收集到一千条更新后将计算并打印出平均值,然后退出程序。

下面是发布 - 订阅模式的一些要点:

  • 订阅者可以连接到不止一个发布者,每个连接使用一次connect调用。订阅者会从不同的连接轮流(“公平队列”)收取数据,不会有单个的发布者被遗漏在外。
  • 如果发布者没有与之相连的订阅者,发布者就会简单的把消息丢弃
  • 如果使用的是TCP并且订阅者处理得比较缓慢,消息会在发布者一端排队等待处理。在后面的章节中,我们会使用“高水位”来保护发布者队列。
  • 从ZeroMQ 3.x 开始,当使用连接协议时(tcp 或 ipc)过滤操作将在发布者一端执行。在ZeroMQ 2.x 所有的过滤操作都在订阅者一端执行。

下面是在我的笔记本(2011-era Intel i5)上接收并过滤10M条消息所需的时间,结果还不错。

$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F

real    0m4.470s
user    0m0.000s
sys     0m0.008s

一看就会

让我们从一点代码开始。第一份代码示例当然是Hello World。我们会创建客户端和服务器。客户端向服务器发送“Hello”,服务器回复“World”。下面是C语言服务器的实现,其中在端口5555打开了一个ZeroMQ套接字,在套接字上读取请求,每收到一个请求就回复“World”:

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_bind (responder, "tcp://*:5555");

    while (1) {
        //  Wait for next request from client
        zmq_msg_t request;
        zmq_msg_init (&request);
        zmq_msg_recv (&request, responder, 0);
        printf ("Received Hello\n");
        zmq_msg_close (&request);

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq_msg_t reply;
        zmq_msg_init_size (&reply, 5);
        memcpy (zmq_msg_data (&reply), "World", 5);
        zmq_msg_send (&reply, responder, 0);
        zmq_msg_close (&reply);
    }
    //  We never get here but if we did, this would be how we end
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}

hwserver.c: Hello World server

图2 请求和应答

enter image description here

REQ-REP 请求应答套接字对是同步的。客户端循环顺序发起(如果只需要一次请求就发起一次) zmq_msg_send()zmq_msg_recv()。其他的顺序(例如一行发送两条消息)会导致 sendrecv 的调用返回 -1 。类似的,服务器每次需要顺序发起 zmq_msg_recv()zmq_msg_send()

ZeroMQ使用C语言作为它的参考语言并且C语言也是书中例子使用的主要语言。如果你以在线的方式阅读本书,例子下面的连接包含有其他程序语言的实现版本。让我们看一下服务端的C++语言实现:

//
//  Hello World server in C++
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

int main () {
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        //  Wait for next request from client
        socket.recv (&request);
        std::cout << "Received Hello" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (5);
        memcpy ((void *) reply.data (), "World", 5);
        socket.send (reply);
    }
    return 0;
}

hwserver.cpp: Hello World server

你会发现ZeroMQ API的C和C++版本很相似。 在类似PHP的语言中,我们可以隐藏更多的细节,代码也更容易阅读:

//
//  Hello World server
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

<?php
/*
*  Hello World server
*  Binds REP socket to tcp://*:5555
*  Expects "Hello" from client, replies with "World"
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/

$context = new ZMQContext(1);

//  Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    //  Wait for next request from client
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    //  Do some 'work'
    sleep (1);

    //  Send reply back to client
    $responder->send("World");
}

hwserver.php: Hello World server

下面是客户端的代码

//
//  Hello World client
//  Connects REQ socket to tcp://localhost:5555
//  Sends "Hello" to server, expects "World" back
//
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to server
    printf ("Connecting to hello world server…\n");
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        zmq_msg_t request;
        zmq_msg_init_size (&request, 5);
        memcpy (zmq_msg_data (&request), "Hello", 5);
        printf ("Sending Hello %d…\n", request_nbr);
        zmq_msg_send (&request, requester, 0);
        zmq_msg_close (&request);

        zmq_msg_t reply;
        zmq_msg_init (&reply);
        zmq_msg_recv (&reply, requester, 0);
        printf ("Received World %d\n", request_nbr);
        zmq_msg_close (&reply);
    }
    sleep (2);
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

hwclient: Hello World client in C

实际用起来太简单了,如你所见,ZeroMQ套接字太强大了。这个服务器可以轻松快速的同时处理数千计的客户端。试着玩一下,先打开客户端然后再打开服务器,它仍然工作得很好,想想这意味着什么。

让我们简单解释一下这两个程序究竟做了什么。它们创建了一个用来工作的ZeroMQ上下文,还有一个套接字。不用关心这些词的意思,待会你会明白的。服务器绑定他的REP应答套接字到5555端口。服务器循环等待请求,每收到一个请求就做出回应发出应答。客户端发送请求并读取服务器返回的应答。

如果杀死服务器(Ctrl-C)然后重启,客户端无法正常恢复。从崩溃进程中恢复不是很容易。创建可靠地请求应答流过于复杂了,我们会在第4章可靠请求应答模式讨论。

程序了背后隐藏了很多事情,对于程序员关心的是怎样写出简明扼要,怎样控制代码在高负荷下崩溃的频率。这就是请求应答模式,这可能是使用ZeroMQ的最简单方式,它对应于RPC和经典的客户端服务器模型。

分而治之

你可能已经厌倦了内涵丰富的代码,想要深入探讨比较抽象的语言规范。做为最后一个实例,让我们做一个超级计算服务。超级计算服务使用典型的并行设计,其中包括:

  • 一个任务发生器,产生可以并行处理的任务
  • 一组处理任务的工作者
  • 一个收集工作进程处理结果的任务收集器(sink)

实际应用中,工作进程可以在非常快的硬件上运行,例如用GPU处理复杂的数学运算。下面是任务发生器的代码,它产生100个任务,每个任务通过消息发送给工作者,任务的内容就是让工作进程休眠几毫秒。

//
//  Task ventilator
//  Binds PUSH socket to tcp://localhost:5557
//  Sends batch of tasks to workers via that socket
//
#include "zhelpers.h"

int main (void) 
{
    void *context = zmq_ctx_new ();

    //  Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");

    //  Socket to send start of batch message on
    void *sink = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sink, "tcp://localhost:5558");

    printf ("Press Enter when the workers are ready: ");
    getchar ();
    printf ("Sending tasks to workers...\n");

    //  The first message is "0" and signals start of batch
    s_send (sink, "0");

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));

    //  Send 100 tasks
    int task_nbr;
    int total_msec = 0;     //  Total expected cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        //  Random workload from 1 to 100msecs
        workload = randof (100) + 1;
        total_msec += workload;
        char string [10];
        sprintf (string, "%d", workload);
        s_send (sender, string);
    }
    printf ("Total expected cost: %d msec\n", total_msec);
    sleep (1);              //  Give 0MQ time to deliver

    zmq_close (sink);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}

Parallel task ventilator

并行流水线

下面是工作者程序代码。工作者每接收一条消息,从中读出休眠时间并休眠,然后发出完成信号。

//
//  Task worker
//  Connects PULL socket to tcp://localhost:5557
//  Collects workloads from ventilator via that socket
//  Connects PUSH socket to tcp://localhost:5558
//  Sends results to sink via that socket
//
#include "zhelpers.h"

int main (void) 
{
    void *context = zmq_ctx_new ();

    //  Socket to receive messages on
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    //  Socket to send messages to
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");

    //  Process tasks forever
    while (1) {
        char *string = s_recv (receiver);
        //  Simple progress indicator for the viewer
        fflush (stdout);
        printf ("%s.", string);

        //  Do the work
        s_sleep (atoi (string));
        free (string);

        //  Send results to sink
        s_send (sender, "");
    }
    zmq_close (receiver);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}

下面是收集器程序代码。它收集100个任务的结果,然后计算总的处理时间。处理时间可以用来验证工作者是否是并行执行的。

//
//  Task sink
//  Binds PULL socket to tcp://localhost:5558
//  Collects results from workers via that socket
//
#include "zhelpers.h"

int main (void)
{
    //  Prepare our context and socket
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    //  Wait for start of batch
    char *string = s_recv (receiver);
    free (string);

    //  Start our clock now
    int64_t start_time = s_clock ();

    //  Process 100 confirmations
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
    char *string = s_recv (receiver);
    free (string);
    if ((task_nbr / 10) * 10 == task_nbr)
        printf (":");
    else
        printf (".");
    fflush (stdout);
    }
    //  Calculate and report duration of batch
    printf ("Total elapsed time: %d msec\n",
    (int) (s_clock () - start_time));

    zmq_close (receiver);
    zmq_ctx_destroy (context);
    return 0;
}

批处理的平均时间是5秒。分别启用1,2,4个工作者进程,收集器打印出的结果如下:

#   1 worker
Total elapsed time: 5034 msec
#   2 workers
Total elapsed time: 2421 msec
#   4 workers
Total elapsed time: 1018 msec

让我们更加仔细的看看代码的以下几个方面:

  • 工作者连接上游的任务发生器和下游的任务收集器。这意味着可以添加任意数量的工作者。如果将工作者绑定到自身指定的端点,那么每添加一个工作者就需要(a)添加额外的端点,(b)修改发生器或收集器。在整体架构中,可以认为生成器和收集器是稳定部分,而工作者是动态部分。

  • 我们需要在开始时同步批处理,等待工作者启动运行。这是ZeroMQ中常见的问题,没有简单的解决方案。connect方法需要一些时间。所以,当一组工作者连接到任务发生器时,第一个工作者会在短时间内获得大量的消息,同时其他的工作者还在建立连接。如果不再批处理任务开始时进行同步,系统几乎不会并行执行。试着移除等待步骤,看一下会发生什么。

  • 发生器的PUSH套接字把任务平均分发给工作者(假设在批处理任务开始前,工作者都已连接)。这称作负载均衡,在后面的章节中,我们会讨论负载均衡的更多细节。

  • 收集器的PULL套接字公平的从工作者收取结果,这被称为公平队列

公平队列

管道模式也有“慢连接”的情况,这会导致PUSH推送套接字负载不均衡 。在使用PUSH和PULL时,如果其中一个工作者的PULL套接字建立连接的速度比其他工作者快,那它就会获取更多的消息,因为在其他工作者建立连接时,它已经收取了一些消息。