原文:Iteratees in Big Data at Klout

Klout以几个社交网络为源,在有限的时间内将用户相关的数据采集过来,并进行整合,以此计算出TA的社会影响力,再将这些指标以可视化的方式呈现给用户。

enter image description here

为了能及时准确地呈现这些数据,Klout所面临的技术压力很大。这篇文章要介绍Klout在重新设计的数据采集管道中如何应用Play! Iteratees 达成目标,但并不会讲解Iteratees的基本概念,因为这方面优秀的文章已经很多了,比如James Roper的Josh Suereth的。本文的重点是Iteratees 在大规模数据采集情景中的实际应用,以及Klout选择它解决这个问题的理由。在后续的文章中还会介绍Klout基于Akka的分布式消息基础设施,使得Klout可以将基于Iteratee 的数据采集分布到集群的机器上去,实现良好的扩展性。

三句话讲清楚Iteratees

第一句,Iteratees 是用函数式的方法对producers 和 consumers数据流进行建模。Enumerator不断产生一块块的数据,可以用(也可以不用)Enumeratee 对这些数据进行map/adapte,然后由Iteratee处理掉。各个阶段可以拼接到一起工作,像个管道一样。

Enumerator (produce data) → Enumeratee (map data) → Iteratee (consume data)

Play! Iteratee还有别的组合方式,比如Enumerator 互相交织(多个并发的enumerators),Enumeratee 链接,Iteratee 折叠(将enumerated 数据分组到更大的数据块中)。

原来的数据采集

Klout原来的数据采集框架是用Java写的,建立在java.util.concurrent类库之上。因此数据采集是用彼此孤立的节点完成的,它们用阻塞线程实现错综复杂的成功或重试语义,顺序获取用户数据。随着用户群的扩大,数据集也迅速增长,系统低下的效率问题日益凸显。

数据获取和写到磁盘上的方式跟Apache web服务器的服务请求方式几乎一样;对于一个用户数据采集的请求,只有一个线程负责代码路径中所有可能的IO操作。这一IO操作大致由下面三个阶段(对于每个用户/网络/数据类型的组合)构成:

  1. 派发恰当的api调用,解析json响应
  2. 将数据写到硬盘 / HDFS
  3. 更新指标

这三个阶段必须顺序执行,但同一阶段是高度并行的,可能更重要的是可以异步执行。比如说,在第一阶段我们发起多个并行的API调用来构建某个用户的活动,比如John Doe在Facebook上发布了“早上我喜欢吃油条!”,并收到了20条评论和200个赞。这个由状态消息和全部20条评论和200个赞组成的活动可以用异步和并行的方式构建。

基于Iteratee的新采集框架

意识到非阻塞并发实现的好处,Klout决定用绚烂的Play!+Scala+Akka技术组合重新做采集框架的架构。这个技术组合有很多优秀的特性集和类库,Klout最感兴趣的就是Iteratees的Typesafe实现。这一实现中有很多好东西,比如Iteratee.foreach和丰富的Enumeratee实现。Klout还大量使用了 Play! WebServices类库,它对Ning 异步http客户端做了轻量的scala封装,并且用Play Promises跟iteratee类库做了非常漂亮的整合(跟Akka promises的完全整合要到Play 2.1 发布)。

分页Enumerator

社交网络中的数据可以通过调用api获得,一般都会返回可分页的json数据。为了处理这种数据,我们需要一个通用并抽象的分页机制来处理我们获取的每种数据,比如帖子,赞或评论。Klout利用了每页数据中都有的next链接,用一个fromCallback Enumerator做了非常优雅的处理:

import play.api.libs.ws._
import play.api.libs.iteratee._

def pagingEnumerator(url:String):Enumerator[JsValue]={     
      var maybeNextUrl = Some(url) //Next url to fetch    
    Enumerator.fromCallback[JsValue] ( retriever = {
        val maybeResponsePromise = 
            maybeNextUrl map { nextUrl=>                                               
                WS.url(nextUrl).get.map { reponse =>
                    val json = response.json
                    maybeNextUrl = (json \ "next_url").asOpt[String] 
                    val code = response.status //Potential error handling here
                    json
                }                    
            }

        /* maybeResponsePromise will be an Option[Promise[JsValue]]. 
         * Need to 'flip' it, to make it a Promise[Option[JsValue]] to 
         * conform to the fromCallback constraints */
        maybeResponsePromise match {
            case Some(responsePromise) => responsePromise map Some.apply
            case None                  => PlayPromise pure None
        }
    })
}

上面的代码中没有错误处理、重试和回退逻辑,但可以让你形成很好的初始认识。对于给定的起始链接,enumerator 只要跟踪一个可变状态nextUrl,每次调用retriever函数时就更新它。用这个分页enumerator 可以交互式地获取数据,不会取得比所需更多的数据。比如可以把这个enumerator 应用到一个文件写入的Enumeratee 上,可以确保不会把硬盘压垮。或者用‘take’ Enumeratee 限制ws调用的次数。然后把状态更新iteratee附到这个处理链上,以确保数据库不会被压垮。如果你还不太明白,可以把Enumeratee当做一个适配器,可以把由Enumerator产生的数据类型转换成被Iteratee消费的数据类型。

Enumerator 之 Enumerators

分页enumerator 跟踪下一页的url很班,但每页中的json数据通常都是一个帖子列表,需要单独处理。每个帖子通常都关联一组喜欢跟评论,并有相应的获取url,也需要进行分页处理并加入到最初那个帖子的json数据中,从而构造一个完整的活动,可以做最终的处理。Klout想将每个活动都当做一个独立的json文档,包括与其相关的喜欢和评论元数据,同时能满足Klout不会把他们的系统压垮的需求,又能发起尽可能多的API并发调用。利用Iteratee类库高度可拼接的属性,可以在处理帖子流的同时获取相关的喜欢和评论,并用Enumeratee.interleaveIteratee.fold的组合并行构建每个活动:

type CommentsOrLikes = Either[Comments, Likes]

def buildActivity(post: Post): Promise[Activity] = {

    val likeUrl = LIKE_URL % (post.id, target.token)
    val commentsUrl = COMMENT_URL % (post.id, target.token)

    /*Construct paging enumerators, mapping each value to either Left or Right*/
    val comments = pagingEnumerator(commentsUrl).map(Left.apply) 
    val likes = pagingEnumerator(likeUrl).map(Right.apply) 

    //Enumerate likes and comments in parallel with the 'interleave' function
    val content:Enumerator[CommentsOrLikes] = likes interleave comments

    /*Initial value for fold*/
    val activity = Activity(Nil, post, Nil)

    /*Fold over each enumerated value, building the activity as we go*/
    val activityIterateePromise = 
        content |>> Iteratee.fold[CommentsOrLikes, Activity](activity) {        
            case Left(comments) => activity copy (comments = activity.comments ++ comments)
            case Right(likes)   => activity copy (likes = activity.likes ++ likes)        
        }

    /* Finally, activityIterateePromise will be a Promise[Iteratee[Activity]], 
     * which we need to turn into an Iteratee[Activity] and then run it to 
     * actually build our activity */
    Iteratee.flatten(activityIterateePromise).run    
}

现在就可以把buildActivity方法应用到每个帖子列表中的每个帖子上了:

val posts:Enumerator[List[Post]] = pagingEnumerator(postsUrl) map parseToPostList
/* parseToPostList does exactly that. Creates a list of Post objects from json*/

val activities:Enumerator[Enumerator[Activity]] = posts.map{ 
    postList =>    
        Enumerator.apply(postList:_*) map buildActivity
}

最后我们需要把Enumerators 中的Enumerator 展平来创建活动的Enumerator。可现在编写,展平Enumerators 还不是Play! Iteratee类库的标准操作,所以要自己写:

/*
* Flatten an enumerator of enumerators of some type into an enumerator of some type
*/
def flatten[T](enumerator: Enumerator[Enumerator[T]]): Enumerator[T] = new Enumerator[T] {
    def step[A](it: Iteratee[T, A])(in: Input[Enumerator[T]]): Iteratee[Enumerator[T], Iteratee[T, A]] = {
        in match {
            case Input.EOF   => Done(Iteratee.flatten(it.feed(Input.EOF)), Input.EOF)
            case Input.Empty => Cont(step(it))
            case Input.El(e) => {
                val promise = e |>> Cont(removeEof(it))
                val next = Iteratee.flatten(promise.flatMap(_.run))
                next.pureFlatFold(
                    (v, l) => Done(next, in),
                    (_) => Cont(step(next)),
                    (msg, input) => Error(msg, in))
            }
        }
    }

    def apply[A](it: Iteratee[T, A]): PlayPromise[Iteratee[T, A]] = {
        it.fold(
            (v, l) => PlayPromise pure it,
            (_) => enumerator |>> Cont(step(it)) flatMap (_.run),
            (msg, input) => PlayPromise pure it
        )
    }
}

    /*Wrap the iteratee with an outer feeding iteratee, which does not feed EOF*/
def removeEof[A, T](inner: Iteratee[T, A])(el: Input[T]): Iteratee[T, Iteratee[T, A]] = {
    el match {
        case Input.Empty | Input.El(_) =>
            inner.pureFlatFold (
                (n, i) => Done(inner, Input.Empty),
                k => Cont(removeEof(k(el))),
                (m, i) => Error(m, i))
        case Input.EOF => Done(inner, Input.Empty)
    }
}

文件写入Enumeratee

武装上这个交互式,可分页和并行的活动Enumerator之后,我们需要把它挂到文件写入逻辑上。为了简化问题,我们不再展开文件写入的内部逻辑,假定都是用下面这个函数完成的:

writeToFile(json: JsValue): Promise[Either[Failure, Success]]

writeToFile的类型签名来看,它以异步方式执行,最终会返回FailureSuccess 对象。我们可以用它构建一个Enumeratee,然后也挂到活动Enumerator之上(作为Iteratee 管道的一部分):

/*Enumeratee to manage writing to the file writer. Mapping any errors to Left*/
type ErrorOrActivity = Either[Error,Activity]

def fileWriting: Enumeratee[Activity, ErrorOrActivity] = {        

    /* writeToFile returns a Promise, but the Enumeratee type constraint 
     * does not expect a Promise. flatMap will return an 
     * Enumeratee[Activity,ErrorOrActivity] given a function from Activity 
     * to Promise[ErrorOrActivity].
     */
    KloutEnumeratee.flatMap[ErrorOrActivity] { activity=>            
        writeToFile(activity.json).map{
            case e @ Failure(_)     => Left(e)
            case _                  => Right(activity) 
        }            
    }
}

flatMap也不是Iteratee标准类库中的方法:

object KloutEnumeratee {
    def flatMapInput[From] = new {
        def apply[To](f: Input[From] => PlayPromise[Input[To]]) = 
            new Enumeratee.CheckDone[From, To] { //Checkdone is part of the Play Iteratee library
                def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
                    case in @ (Input.El(_) | Input.Empty) =>
                        val promiseOfInput = f(in)
                        Iteratee.flatten(promiseOfInput map { input =>
                            new CheckDone[From, To] {
                                def continue[A](k: K[To, A]) = Cont(step(k))
                            } &> k(input)
                        })

                    case Input.EOF => Done(k(Input.EOF), Input.EOF)
                }

                def continue[A](k: K[To, A]) = Cont(step(k))
            }
    }

    def flatMap[E] = new {
        def apply[NE](f: E => Promise[NE]): Enumeratee[E, NE] = flatMapInput[E]{
            case Input.El(e) => f(e) map (Input.El(_))
            case Input.Empty => Promise pure Input.Empty
            case Input.EOF   => Promise pure Input.EOF
        }

    }
}

文件写入Enumeratee 只是把Activity映射到Either上,如果writeToFile 失败,其中包含的就是Failure ,如果成功,就是需要进一步处理的Activity 。注意,尽管从概念上来看文件写入更像Iteratee任务,但因为我们不想“消耗”来自Enumerator的输入,只是要把输入做个映射以便后续处理,所以用Enumeratee结构更合适。现在3阶段管道中的第2阶段已经完成了。

状态更新Iteratee

在处理每个Activity时,需要循环采集和报告状态,游标信息,错误和其它元数据。既然这是最后阶段,就应该把它作为管道中的洗涤槽,即Iteratee。为了阐明问题,突出重点,下面这个是简化版的Iteratee,但足以说明问题了:

/* Status updating and reporting iteratee*/
def updatingStatus:Iteratee[ErrorOrActivity,Unit] = Iteratee.foreach[ErrorOrActivity] {
    case Left(error)        => 
        reportError(error)
        statsd("collector.error")
    case Right(activity)    => 
        reportSuccess(activity) 
        statsd("collector.success")
}

拼到一起

最后一步是把这几个家伙聚到一起干点有意义的事:

//The collect function below returns an Enumerator[Activity], given some target meta-data
val iterateePromise = collect(target) &> fileWriting |>> updatingCursor
iterateePromise.flatMap(_.run)

这个框架之美在于它的简洁,更在于它的组合拼接能力。只要在实现一个恰当类型的EnumerateeIteratee,就可以在管道上加上新的阶段,还能免费得到其它好处。

数据采集是Klout体验的基础,是整合、分析和跟踪社交生活影响力的必要条件。正是因为有优秀的数据采集框架,Klout才能突出我们最有影响力的时刻。

enter image description here