老夫聊发少年狂,也来凑个写代码的热闹。
搞大数据的干啥都必须用上mapreduce,所以下面是画蛇添足版的多线程实现。但对于排序问题,下面的代码过于复杂,切勿模仿。记住,把复杂的事情变简单,是一种本事,把简单的事情变复杂,是一种病,得治!

顺便做个广告,-> _ -> // -> _ -> // -> _ -> // -> _ -> // -> _ ->

设计思路

  • 牌的初始模型:
    • 元素数量为 4*13的集合,按顺序随机生成;
    • 根据每个数的取模结果区分花色。
  • 期望结果:
    • 集合中的元素分为4组,每组模4的结果分别为1,2,3,0;
    • 组内按由小到大的顺序排列;
    • 每组第一个元素按由小到大的顺序排列;
  • 排序的实现方式:
    • 由一个工头actor把大数组分成四个小数组,四个工人actor对分给自己的小数组进行排序
    • 子线程二分法递归排序
    • 等待子线程完成,主线程拼接各子线程排序结果

测试类

此处省略N行代码....

实现类

先定义actor之间通讯用的消息类:

 sealed trait MapReduceMessage
 case class SortPorker(pokers: Iterator[Int]) extends MapReduceMessage
 case class SortSuit(suitList:List[Int]) extends MapReduceMessage
 case class Result(sortedSuit:Array[Int]) extends MapReduceMessage

SortPorker消息是由测试类发给工头actor的消息,里面是杂乱无序的52张扑克牌(无序的1~52)。SortSuit为工头actor发给工人actor的消息,根据取模结果发给对应的工人。每个工人actor在排序完成后都会返回一个Result。

下面是工头actor:

  // 工头Actor, 创建工人 Actors, 分配任务,收集结果
  class Master(nrOfWorkers: Int, latch:CountDownLatch) extends Actor {

    val workers = Vector.fill(nrOfWorkers)(actorOf[SortPokerWorker].start());
    val router = Routing.loadBalancerActor(CyclicIterator(workers)).start();

    val resultArray = new Array[Int](52);

    val suitSize = 13;

    var clubsList = List[Int]();
    var heartsList =List[Int]();
    var spadesList = List[Int]();
    var diamondsList = List[Int](); 

    var start : Long = _
    var count : Long = 0
    //对拿到的数值取模,判断花色
    def splitSuit(poker:Int) = {
             poker % 4 match {
                     case 1 =>
                            clubsList = poker :: clubsList
                            sendSuitMessage(clubsList) 
                     case 2 => 
                            heartsList = poker :: heartsList
                            sendSuitMessage(heartsList )
                     case 3 =>
                            spadesList = poker :: spadesList 
                            sendSuitMessage(spadesList )

                     case 0 => 
                            diamondsList = poker :: diamondsList 
                            sendSuitMessage(diamondsList )
             } 
    }
    //判断该花色的牌是否已全部拿到,全拿到则发给工人actor排序
    def sendSuitMessage(suitList:List[Int]) = {
           if(suitList.size == suitSize) {
                                 router ! SortSuit(suitList);
                                 count = count + 1;
           }
    }

    def receive = {
      case SortSuit(pokers: Iterator[Int]) =>
        //交给splitSuit取模,放入相应的列表
        pokers.foreach(splitSuit)

        /**
         * 不需要另外定义方法的简易写法
         * clubsList = pokers.filter(poker => poker % 4 == 1)
         * router ! sortSuit(clubsList);
         */

        //shutdown actors
        router ! Broadcast(PoisonPill)
        router ! PoisonPill

      case Result(values: Iterator[Int]) =>
        for (value <- values) {
          resultArray:+ value
        }
        count = count - 1;
        if (count <= 0) self.stop()
    }
  }

actor 类要实现Actor接口,其中有个receive方法。Routing.LoadbalancingActor是内置的actor,用于分发消息,这里用的是轮询机制。 在收到SortPoker消息后,工头会对每个值取模,放到对应的列表中,放够数的列表立马发给工人actor进行排序。得到返回结果后,顺序加到结果Array中。

工人actor如下所示:

class SortPoker extends Actor {

    def receive = {
       case  SortSuit(suitList) =>
         self reply Result(sort(suitList))
    }

    /*排序*/
    def sort(xs: Array[Int]):Array[Int] = {  
      if(xs.length <= 1)  
        xs;  
      else {  
        val pivot = xs(xs.length /2);  
        Array.concat(  
          sort(xs filter (pivot >)),  
             xs filter (pivot ==),  
          sort(xs filter (pivot < ))  
         )  
      }  
    }
  }

以上代码未经测试,可能无法运行,仅理论上 可行。

参考资料

1.用Actor实现MapReduce
2.Scala数组排序的快速实现
3.Scala Tutorial – iteration, for expressions, yield, map, filter, count