第 2 章 使用基本元素:Thread和Runnable

第 2 章 使用基本元素:ThreadRunnable

执行线程是并发应用程序的核心。实现并发应用程序时,无论采用何种编程语言,都必须创建不同的执行线程,并且这些线程以不确定的顺序并行运行,除非你使用同步元素,比如信号量。在Java中,创建执行线程有两种方法。

  • 扩展Thread类。
  • 实现Runnable接口。

本章将介绍在Java中使用这些元素实现并发应用程序的方法,主要内容如下。

  • Java中的线程:特征和状态。
  • Thread类和Runnable接口。
  • 第一个例子:矩阵乘法。
  • 第二个例子:文件搜索。

2.1 Java中的线程

如今,计算机用户(以及移动终端和平板电脑用户)使用电脑工作时要同时使用不同的应用程序。阅读新闻、在社交网络上发表文章或听音乐的同时,可以使用文字处理程序编写文档。之所以可以同时做以上所有事情,是因为现代操作系统支持多进程处理。

用户可以同时执行不同的任务。此外,在应用程序内部,你也可以同时做不同的事情。例如,如果你正在使用文字处理程序,在为文本添加粗体样式的同时便可保存文件。这是因为用于编写这些应用程序的现代编程语言允许程序员在应用程序中创建多个执行线程。每个执行线程执行不同的任务,这样你就可以同时做不同的事情。

Java使用Thread类实现执行线程。你可以使用以下机制在应用程序中创建执行线程。

  • 扩展Thread类并重载run()方法。
  • 实现Runnable接口,并将该类的对象传递给Thread对象的构造函数。

这两种情况下你都会得到一个Thread对象,但是相对于第一种方式来说,更推荐使用第二种。其主要优势如下。

  • Runnable是一个接口:你可以实现其他接口并扩展其他类。对于采用Thread类的方式,你只能扩展这一个类。
  • 可以通过线程来执行Runnable对象,但也可以通过其他类似执行器的Java并发对象来执行。这样可以更灵活地更改并发应用程序。
  • 可以通过不同线程使用同一Runnable对象。

一旦有了Thread对象,就必须使用start()方法创建新的执行线程并且执行Thread类的run()方法。如果直接调用run()方法,那么你将调用常规Java方法而不会创建新的执行线程。下面来看看Java编程语言中线程最重要的特征。

2.1.1 Java中的线程:特征和状态

关于Java的线程,首先要说明的是,所有的Java程序,不论并发与否,都有一个名为主线程的Thread对象。你可能知道,Java SE程序通过main()方法启动执行过程。执行该程序时,Java虚拟机(JVM)将创建一个新Thread并在该线程中执行main()方法。这是非并发应用程序中唯一的线程,也是并发应用程序中的第一个线程。

与其他编程语言相同,Java中的线程共享应用程序中的所有资源,包括内存和打开的文件。这是一个强大的工具,因为它们可以快速而简单地共享信息。但是,正如第1章所述,必须使用足够的同步元素避免数据竞争条件。

Java中的所有线程都有一个优先级,这个整数值介于Thread.MIN_PRIORITYThread.MAX_PRIORITY之间(实际上它们的值分别是1和10)。所有线程在创建时其默认优先级都是Thread.NORM_PRIORITY(实际上它的值是5)。可以使用setPriority()方法更改Thread对象的优先级(如果该操作不允许执行,它会抛出SecurityException异常)和getPriority()方法获得Thread对象的优先级。对于Java虚拟机和线程首选底层操作系统来说,这种优先级是一种提示,而非一种契约。线程的执行顺序并没有保证。通常,较高优先级的线程将在较低优先级的线程之前执行,但是,正如之前所述,这一点并不能保证。

在Java中,可以创建两种线程。

  • 守护线程。
  • 非守护线程。

二者之间的区别在于它们如何影响程序的结束。当有下列情形之一时,Java程序将结束其执行过程。

  • 程序执行Runtime类的exit()方法,而且用户有权执行该方法。
  • 应用程序的所有非守护线程均已结束执行,无论是否有正在运行的守护线程。

具有这些特征的守护线程通常用在作为垃圾收集器或缓存管理器的应用程序中,执行辅助任务。你可以使用isDaemon()方法检查线程是否为守护线程,也可以使用setDaemon()方法将某个线程确立为守护线程。要注意,必须在线程使用start()方法开始执行之前调用此方法。

最后,不同情况下线程的状态不同。所有可能的状态都在Thread.States类中定义。你可以使用getState()方法获取Thread对象的状态。显然,你还可以直接更改线程的状态。线程的可能状态如下。

  • NEWThread对象已经创建,但是还没有开始执行。
  • RUNNABLEThread对象正在Java虚拟机中运行。
  • BLOCKEDThread对象正在等待锁定。
  • WAITINGThread对象正在等待另一个线程的动作。
  • TIME_WAITINGThread对象正在等待另一个线程的操作,但是有时间限制。
  • THREADThread对象已经完成了执行。

在给定时间内,线程只能处于一个状态。这些状态不能映射到操作系统的线程状态,它们是JVM使用的状态。了解了Java编程语言中最重要的线程特性之后,让我们来看看Runnable接口和Thread类最重要的方法。

2.1.2 Thread类和Runnable接口

如前文所述,你可以使用以下任一机制创建新的执行线程。

  • 扩展Thread类并且重载其run()方法。
  • 实现Runnable接口,并将该对象的实例传递给Thread对象的构造函数。

在好的Java实践做法中,相对于第一种方法而言,更推荐使用第二种方法,这将是我们在本章以及整本书中都将采用的方法。

Runnable接口只定义了一种方法:run()方法。这是每个线程的主方法。当你执行start()方法来启动一个新线程时,它将调用run()方法(Thread类的run()方法或者在Thread类的构造函数中以参数形式传递的Runnable对象)。

相反,Thread类有很多不同的方法。它有一种run()方法,实现线程时必须重载该方法,扩展Thread类和你必须调用的start()方法创建新的执行线程。下面给出Thread类的其他常用方法。

  • 获取和设置Thread对象信息的方法。
    • getId():该方法返回Thread对象的标识符。该标识符是在线程创建时分配的一个正整数。在线程的整个生命周期中是唯一且无法改变的。
    • getName()/setName():这两种方法允许你获取或设置Thread对象的名称。这个名称是一个String对象,也可以在Thread类的构造函数中建立。
    • getPriority()/setPriority():你可以使用这两种方法来获取或设置Thread对象的优先级。在本章中,上文已经解释了Java如何管理线程的优先级。
    • isDaemon()/setDaemon():这两种方法允许你获取或建立Thread对象的守护条件。此前已经解释过该条件的原理。
    • getState():该方法返回Thread对象的状态。之前已经介绍过Thread对象的所有可能状态。
  • interrupt()/interrupted()/isInterrupted():第一种方法表明你正在请求结束执行某个Thread对象。另外两种方法可用于检查中断状态。这些方法的主要区别在于,调用interrupted()方法时将清除中断标志的值,而isInterrupted()方法不会。调用interrupt()方法不会结束Thread对象的执行。Thread对象负责检查标志的状态并做出相应的响应。
  • sleep():该方法允许你将线程的执行暂停一段时间。它将接收一个long型值作为参数,该值代表你想要Thread对象暂停执行的毫秒数。
  • join():这个方法将暂停调用线程的执行,直到调用该方法的线程执行结束为止。可以使用该方法等待另一个Thread对象结束。
  • setUncaughtExceptionHandler():当线程执行出现未校验异常时,该方法用于建立未校验异常的控制器。
  • currentThread():这是Thread类的静态方法,它返回实际执行该代码的Thread对象。

接下来,你将学习如何使用这些方法来实现如下两个示例。

  • 一个矩阵乘法应用程序。
  • 一个在操作系统中查找文件的应用程序。

2.2 第一个例子:矩阵乘法

矩阵乘法是针对矩阵做的基本运算之一,也是并发和并行编程课程中常采用的经典问题。如果你有一个mn 列的矩阵A,和另一个np 列的矩阵B,那么可以将两个矩阵相乘得到一个mp 列的矩阵C

本节将实现两个矩阵相乘的串行版本算法,以及三种不同的并发版本。然后,我们将比较四个解决方案,看看何时并发处理会带来更好的性能。

2.2.1 公共类

为了实现这个例子,我们用到了一个名为MatrixGenerator的类。使用它随机生成将进行乘法操作的矩阵。这个类有一种名为generate()的方法,它接收矩阵中所需的行数和列数作为参数,并基于这两个维数生成一个带有随机double值的矩阵。该类的源代码如下:

public class MatrixGenerator {

  public static double[][] generate (int rows, int columns) {
    double[][] ret=new double[rows][columns];
    Random random=new Random();
    for (int i=0; i<rows; i++) {
      for (int j=0; j<columns; j++) {
        ret[i][j]=random.nextDouble()*10;
      }
    }
    return ret;
  }
}

2.2.2 串行版本

我们在SerialMultiplier类中实现了该算法的串行版本。该类只有一种静态方法,名为multiply()。它接收三个double型矩阵作为参数:其中两个矩阵是将要相乘的矩阵,另一个矩阵用于存储结果。

我们并不检查矩阵的维数,只保证其正确性,并使用一个三重嵌套循环计算结果矩阵。SerialMultiplier类的源代码如下:

public class SerialMultiplier {

  public static void multiply (double[][] matrix1, double[][] matrix2,
                               double[][] result) {
    int rows1=matrix1.length;
    int columns1=matrix1[0].length;

    int columns2=matrix2[0].length;

    for (int i=0; i<rows1; i++) {
      for (int j=0; j<columns2; j++) {
        result[i][j]=0;
        for (int k=0; k<columns1; k++) {
          result[i][j]+=matrix1[i][k]*matrix2[k][j];
        }
      }
    }
  }
}

我们还实现了一个名为SerialMain的主类,用于测试串行版矩阵乘法算法。在main()方法中,生成两个2000行2000列的随机矩阵,并使用SerialMultiplier类进行两个矩阵的乘法运算。算法执行时间的单位是毫秒,如下所示:

public class SerialMain {

  public static void main(String[] args) {

    double matrix1[][] = MatrixGenerator.generate(2000, 2000);
    double matrix2[][] = MatrixGenerator.generate(2000, 2000);
    double resultSerial[][]= new double[matrix1.length]
                                       [matrix2[0].length];

    Date start=new Date();
    SerialMultiplier.multiply(matrix1, matrix2, resultSerial);
    Date end=new Date();
    System.out.printf("Serial: %d%n",end.getTime()-start.getTime());
  }
}

2.2.3 并行版本

我们已经实现了三种不同的并行算法,基于不同的粒度实现这些例子。

  • 结果矩阵中每个元素对应一个线程。
  • 结果矩阵中每行对应一个线程。
  • 采用与JVM中可用处理器数或核心数相同的线程。

让我们来看看这三个版本的源代码。

  1. 第一个并发版本:每个元素一个线程

    在这个版本中,我们将在结果矩阵中为每个元素创建一个新的执行线程。例如,将两个2000行2000列的矩阵相乘,得到的矩阵将有4 000 000个元素,因此我们将创建4 000 000个Thread对象。因为如果同时启动所有线程,可能会使系统超载,所以将以10个线程一组的形式启动线程。

    启动10个线程后,使用join()方法等待它们完成,而且一旦完成,就启动另外10个线程。我们一直遵循这个过程,直到启动所有必需线程。选择10作为批量处理线程数并没有特殊理由。你也可以更改这一数值,并查看更改后的数值对算法性能的影响。

    我们将实现IndividualMultiplierTask类和ParallelIndividualMultiplier类。IndividualMultiplierTask类将实现每个Thread。该类实现了Runnable接口,将使用五个内部属性:两个要相乘的矩阵、结果矩阵,以及要计算的元素的行和列。我们将使用该类的构造函数来初始化所有这些属性:

    public class IndividualMultiplierTask implements Runnable {
    
      private final double[][] result;
      private final double[][] matrix1;
      private final double[][] matrix2;
    
      private final int row;
      private final int column;
    
      public IndividualMultiplierTask(double[][] result, double[][]
                                      matrix1, double[][] matrix2,
                                      int i, int j) {
        this.result = result;
        this.matrix1 = matrix1;
        this.matrix2 = matrix2;
        this.row = i;
        this.column = j;
      }

    run()方法将计算由rowcolumn属性决定的元素值。下面的代码将展示如何实现该行为。

      @Override
      public void run() {
        result[row][column] = 0;
        for (int k = 0; k < matrix1[row].length; k++) {
          result[row][column] += matrix1[row][k] * matrix2[k][column];
        }
      }
    }

    ParallelIndividualMultiplier类将创建所有必要的执行线程计算结果矩阵。它有一种名为multiply()的方法,接收两个将要相乘的矩阵和第三个用于存储结果的矩阵作为参数。该类将处理结果矩阵的所有元素,并创建一个单独的IndividualMultiplierTask类计算每个元素。如前所述,我们按照10个一组的方式启动线程。启动10个线程后,可使用waitForThreads()辅助方法等待这10个线程最终完成,该方法调用了join()方法。下面的代码块展示了该类的实现:

    public class ParallelIndividualMultiplier {
    
      public static void multiply(double[][] matrix1, double[][] matrix2,
                                  double[][] result) {
    
        List<Thread> threads=new ArrayList<>();
    
        int rows1=matrix1.length;
    
        int rows2=matrix2.length;
    
        for (int i=0; i<rows1; i++) {
          for (int j=0; j<columns2; j++) {
            IndividualMultiplierTask task=new IndividualMultiplierTask
                                     (result, matrix1, matrix2, i, j);
            Thread thread=new Thread(task);
            thread.start();
            threads.add(thread);
    
            if (threads.size() % 10 == 0) {
              waitForThreads(threads);
            }
          }
        }
    
      }
    
      private static void waitForThreads(List<Thread> threads){
        for (Thread thread: threads) {
          try {
            thread.join();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
    
        threads.clear();
      }
    
    }

    与其他示例相同,我们创建了一个主类用以测试该示例。它与SerialMain类非常相似,但在本例中,我们将它称为ParallelIndividualMain类。此处不再给出该类的源代码。

  2. 第二个并发版本:每行一个线程

    在这一版本中,我们将在结果矩阵中为每一行创建一个新的执行线程。例如,如果将两个2000行和2000列的矩阵相乘,就要创建4 000 000个线程。正如前面的示例中所做的那样,我们将以10个线程为一组启动线程,然后等待它们终结,再启动新线程。

    我们将实现RowMultiplierTask类和ParallelRowMultiplier类以实现该版本。RowMultiplierTask类将实现每个Thread。它实现了Runnable接口,并且将使用五个内部属性:两个要相乘的矩阵、结果矩阵,以及要计算的结果矩阵的行。我们将使用该类的构造函数来初始化所有这些属性,如下所示。

    public class RowMultiplierTask implements Runnable {
    
      private final double[][] result;
      private final double[][] matrix1;
      private final double[][] matrix2;
    
      private final int row;
    
      public RowMultiplierTask(double[][] result, double[][] matrix1,
                               double[][] matrix2, int i) {
        this.result = result;
        this.matrix1 = matrix1;
        this.matrix2 = matrix2;
        this.row = i;
      }

    run()方法有两个循环。第一个循环将处理待计算结果矩阵row中的所有元素,而第二个循环将计算每个元素的结果值。

    @Override
    public void run() {
      for (int j = 0; j < matrix2[0].length; j++) {
        result[row][j] = 0;
          for (int k = 0; k < matrix1[row].length; k++) {
            result[row][j] += matrix1[row][k] * matrix2[k][j];
          }
        }
      }
    }

    ParallelRowMultiplier类将创建计算结果矩阵所需的所有执行线程。它有一种名为multiply()的方法,该方法接收两个待乘矩阵和第三个用于存储结果的矩阵作为参数。它将处理结果矩阵的所有行,并创建一个RowMultiplierTask处理每一行。如前所述,我们以10个为一组的方式启动线程。启动10个线程后,使用waitForThreads()辅助方法等待这10个线程最终完成,它将调用join()方法。下面的代码块展示了如何实现这个类:

    public class ParallelRowMultiplier {
    
      public static void multiply(double[][] matrix1, double[][]
                                  matrix2, double[][] result) {
    
        List<Thread> threads = new ArrayList<>();
    
        int rows1 = matrix1.length;
    
        for (int i = 0; i < rows1; i++) {
          RowMultiplierTask task = new RowMultiplierTask(result,
                                             matrix1, matrix2, i);
          Thread thread = new Thread(task);
          thread.start();
          threads.add(thread);
    
          if (threads.size() % 10 == 0) {
            waitForThreads(threads);
          }
        }
      }
    
      private static void waitForThreads(List<Thread> threads){
        for (Thread thread : threads) {
          try {
            thread.join();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        threads.clear();
      }
    
    }

    与其他示例相同,我们创建了一个主类用以测试这个例子。它与SerialMain类非常相似,但在本例中,我们将它称为ParallelRowMain类。此处不再给出该类的源代码。

  3. 第三个并发版本:线程的数量由处理器决定

    在最后一个版本中,只创建与JVM可用核或处理器数量相同的线程。我们使用Runtime类的availableProcessors()方法计算这一数值。

    GroupMultiplierTask类和ParallelGroupMultiplier类中实现了此版本。GroupMultiplierTask类实现了我们将要创建的线程。它实现了Runnable接口,并且使用了五个内部属性:两个要相乘的矩阵、结果矩阵,以及该任务将要计算的结果矩阵的初始行和最终行。我们将使用该类的构造函数初始化所有这些属性。下面的代码块展示了如何实现类的第一部分:

    public class GroupMultiplierTask implements Runnable {
    
      private final double[][] result;
      private final double[][] matrix1;
      private final double[][] matrix2;
    
      private final int startIndex;
      private final int endIndex;
    
      public GroupMultiplierTask(double[][] result, double[][]
                                 matrix1, double[][] matrix2,
                                 int startIndex, int endIndex) {
        this.result = result;
        this.matrix1 = matrix1;
        this.matrix2 = matrix2;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
      }

    run()方法将使用三个循环实现其计算。第一个循环将检查该任务将要计算的结果矩阵的行,第二个循环将处理每一行的所有元素,最后一个循环将计算每个元素的值。

      @Override
      public void run() {
        for (int i = startIndex; i < endIndex; i++) {
          for (int j = 0; j < matrix2[0].length; j++) {
            result[i][j] = 0;
            for (int k = 0; k < matrix1[i].length; k++) {
              result[i][j] += matrix1[i][k] * matrix2[k][j];
            }
          }
        }
      }
    }

    ParallelGroupMutiplier类将创建线程计算结果矩阵。它有一种名为multiply()的方法,接收要相乘的两个矩阵和第三个用于存放结果的矩阵作为参数。首先,通过使用Runtime类的availableProcessors()方法获取可用处理器的数量。然后,计算每个任务必须处理的行,以及创建并启动这些线程。最后,使用join()方法等待线程结束。

    public class ParallelGroupMultiplier {
    
      public static void multiply(double[][] matrix1, double[][] matrix2,
                                  double[][] result) {
        List<Thread> threads=new ArrayList<>();
    
        int rows1=matrix1.length;
    
        int numThreads=Runtime.getRuntime().availableProcessors();
        int startIndex, endIndex, step;
        step=rows1 / numThreads;
        startIndex=0;
        endIndex=step;
    
        for (int i=0; i<numThreads; i++) {
          GroupMultiplierTask task=new GroupMultiplierTask
                       (result, matrix1, matrix2, startIndex, endIndex);
          Thread thread=new Thread(task);
          thread.start();
          threads.add(thread);
          startIndex=endIndex;
          endIndex= i==numThreads-2?rows1:endIndex+step;
        }
    
        for (Thread thread: threads) {
          try {
            thread.join();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
    
      }
    
    }

    与其他示例相同,我们创建了一个主类测试这个例子。它与SerialMain类非常相似,但在本例中,我们将它称为ParallelGroupMain类。此处不再给出该类的源代码。

  4. 比较方案

    比较一下本节中实现的乘法器算法四个版本的解决方案(包括串行版和并发版)。为了测试该算法,我们已经使用JMH框架执行这些示例,该框架可支持在Java中实现微基准测试。使用基准测试框架是一种很好的解决方案,可以直接使用currentTimeMillis()nanoTime()等方法度量时间。在两种不同架构中,执行这些例子各10次。

    • 一台计算机配置有Intel Core i5-5300i处理器、Windows 7操作平台和16GB内存。该处理器有两个核,每个核可以执行两个线程,所以将有四个并行线程。
    • 另一台计算机配置有AMD A8-640处理器、Windows 10操作系统和8GB内存,此处理器有四个核。

    我们已用三种不同大小的随机矩阵测试了算法:

    • 500×500
    • 1000×1000
    • 2000×2000

    下表给出了平均执行时间以及标准偏差(单位:毫秒)。

    算法规模AMDIntel
    串行版5001821.729±366.885447.920±49.864
    100027 661.481±796.6705474.942±164.447
    2000315 457.940±32 961.16570 968.563±4056.883
    按个体处理的并行版50043 512.382±813.13117 152.883±170.408
    1000164 968.834±1034.45372 858.419±381.258
    2000774 681.287±17 380.02316 466.479±5033.577
    按行处理的并行版500685.465±72.474229.228±61.497
    10008565±437.6113710.613±411.490
    200092 923.685±11 595.43342 655.081±1370.940
    按分组处理的并行版500515.743±51.106133.530±12.271
    10007466.880±409.1363862.635±368.427
    200086 639.811±2834.143 353.603±1857.568

    由上表可以得出以下结论。

    • 这两种架构有很大不同,但是你必须考虑到两台电脑处理器、操作系统、内存和硬盘等的配置不同。
    • 在两种架构上得到的结果相同。按分组处理的并行版和按行处理的并行版得到了最佳结果,而按个体处理的并行版得到的结果最差。

    这个例子告诉我们,开发一个并发应用程序时必须非常小心。如果没有选择良好的解决方案,那么性能表现会很糟糕。

    针对500×500矩阵,我们用性能最佳的并发版本和串行版本求取加速比,以此来考查并发处理对算法性能的改进情况。

    \begin{aligned}&S_{{\rm AMD}}=\frac{T_{\rm serial}}{T_{\rm concurrent}}=\frac{1821.729}{515.743}=3.53\\&S_{{\rm Intel}}=\frac{T_{\rm serial}}{T_{\rm concurrent}}=\frac{447.920}{133.530}=3.35\end{aligned}

2.3 第二个例子:文件搜索

所有操作系统都提供了一种功能,即在文件系统中搜索符合某种条件的文件。(例如,按照名称或部分名称、修改日期等进行搜索。)在我们的示例中将实现一个算法,用于查找具有预定名称的文件。该算法将采用启动搜索的初始路径和要查找的文件作为输入。JDK提供了遍历目录树结构的功能,因此不需要再次在实际应用中自己实现它。

2.3.1 公共类

这两个版本的算法将共享一个公共类用以存储搜索结果。我们将其称为Result类,而它有两个属性:一个名为foundBoolean值,用于判定是否找到了正在查找的文件;一个名为pathString值。如果找到了该文件,就将其完整路径存放在该属性中。

这个类的代码非常简单,所以此处不再给出源代码。

2.3.2 串行版本

这个算法的串行版本非常简单。搜索初始路径,获取文件和目录内容,并对其进行处理。对于文件来说,会将其名称与正在寻找的名称进行比较。如果相同,则将其填入Result对象并完成算法执行。对于各目录来说,我们对本操作进行递归调用,以便在这些目录中搜索文件。

我们将在SerialFileSearch类的searchFiles()方法中实现这个操作。SerialFileSearch类的源代码如下:

  public class SerialFileSearch {

    public static void searchFiles(File file, String fileName,
                                   Result result) {

    File[] contents;
    contents=file.listFiles();

    if ((contents==null) || (contents.length==0)) {
      return;
    }

    for (File content : contents) {
      if (content.isDirectory()) {
        searchFiles(content,fileName, result);
      } else {
        if (content.getName().equals(fileName)) {
          result.setPath(content.getAbsolutePath());
          result.setFound(true);
          System.out.printf("Serial Search: Path: %s%n",
                            result.getPath());
          return;
        }
      }
      if (result.isFound()) {
        return;
      }
    }
  }
}

2.3.3 并发版本

并行化该算法有多种方法(如下所示)。

  • 你可以为我们要处理的每个目录创建一个执行线程。
  • 你可以将目录树分组,并为每个组创建执行线程。你创建的组数将决定应用程序使用的执行线程数。
  • 你可以使用与JVM的可用核数相同的线程数。

在这种情况下,我们必须考虑到算法将集中使用I/O操作。因为一次只有一个线程可以读取磁盘,所以不是所有解决方案都会提高算法串行版本的性能。

我们将按照最后一种供选方案实现并发版本。将在一个ConcurrentLinkedQueue(一个可以在并发应用程序中使用的队列Queue接口实现)中存储初始路径所包含的目录,并创建与JVM可用处理器数量相同的线程。每个线程将从队列中获取一条路径,并处理该目录及其所有子目录和其中的文件。线程处理完毕该目录中的所有文件和目录时,将从队列中提取另一个目录。

如果其中一个线程找到了正在查找的文件,该线程会立即终止执行。在这种情况下,我们使用interrupt()方法结束其他线程的执行。

我们在ParallelGroupFileTask类和ParallelGroupFileSearch类中实现了该版本的算法。ParallelGroupFileTask类实现了所有将用于查找文件的线程。它实现了Runnable接口并且使用了四个内部属性:一个名为fileNameString属性,用于存储待查找文件的名称;一个名为directoriesFile对象的ConcurrentLinkedQueue,用于存放将要处理的目录列表;一个名为parallelResultResult对象,用于存储搜索结果;一个名为foundBoolean属性,用于标记是否发现了正在寻找的文件。我们将使用该类的构造函数初始化所有属性:

public class ParallelGroupFileTask implements Runnable {

  private final String fileName;
  private final ConcurrentLinkedQueue<File> directories;
  private final Result parallelResult;
  private boolean found;

  public ParallelGroupFileTask(String fileName, Result parallelResult,
                          ConcurrentLinkedQueue<File>directories) {
    this.fileName = fileName;
    this.parallelResult = parallelResult;
    this.directories = directories;
    this.found = false;
  }

run()方法有一个循环,在队列中有元素并且没有找到该文件时会被执行。它使用ConcurrentLinkedQueue类的poll()方法处理下一个目录,并调用辅助方法processDirectory()。如果找到了这个文件(found属性为true),那么使用return语句结束线程。

@Override
public void run() {
  while (directories.size() > 0) {
    File file = directories.poll();
    try {
      processDirectory(file, fileName, parallelResult);
      if (found) {
        System.out.printf("%s has found the file%n",
                          Thread.currentThread().getName());
        System.out.printf("Parallel Search: Path: %s%n",
                          parallelResult.getPath());
        return;
      }
    } catch (InterruptedException e) {
      System.out.printf("%s has been interrupted%n",
                        Thread.currentThread().getName());
    }
  }
}

如果找到了作为参数的文件,processDirectory()方法将接收存放待处理目录的File对象、正在查找的文件名和存放结果的Result对象。它使用listFiles()方法获取File对象的内容。listFiles()方法可返回File对象数组并对其进行处理。对于目录来说,这将建立一个新对象对该方法进行递归调用。对于文件来说,该方法将调用辅助的processFile()方法:

private void processDirectory(File file, String fileName,
                              Result parallelResult) throws
             InterruptedException {
  File[] contents;
  contents = file.listFiles();

  if ((contents == null) || (contents.length == 0)) {
    return;
  }

  for (File content : contents) {
    if (content.isDirectory()) {
      processDirectory(content, fileName, parallelResult);
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedException();
      }
      if (found) {
        return;
      }
    } else {
      processFile(content, fileName, parallelResult);
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedException();
      }
      if (found) {
        return;
      }
    }
  }
}

在处理完每个目录和文件之后,还要检查线程是否被中断。我们使用Thread类的currentThread()方法获取执行该任务的Thread对象,然后使用isInterrupted()方法来验证线程是否被中断。如果线程被中断,我们将抛出一个新的InterruptedExeption异常,在run()方法中捕捉该异常以结束线程的执行。这种机制使我们能够在找到文件后完成搜索。

我们还要检查found属性是否为true。如果为true,我们将立即返回以完成线程的执行。

如果找到了作为参数的文件,processFile()方法接收存储待处理文件的File对象、待查找文件的名称、存放操作结果的Result对象。我们将当前处理File的名称与正在查找的文件名称进行比较。如果两个名称相同,那么填入Result对象并且将found属性设置为true,如下所示:

  private void processFile(File content, String fileName,
                           Result parallelResult) {
    if (content.getName().equals(fileName)) {
      parallelResult.setPath(content.getAbsolutePath());
      this.found = true;
    }
  }

  public boolean getFound() {
    return found;
  }
}

ParallelGroupFileSearch类使用辅助任务实现了整个算法。它将实现静态的searchFiles()方法,接收一个指向搜索基本路径的File对象、一个存储当前查找文件名称的fileNameString、存放操作结果的Result对象作为参数。

首先,创建ConcurrentLinkedQueue对象,并且将基本路径所包含的所有目录存放在其中,如下所示:

public class ParallelGroupFileSearch {

  public static void searchFiles(File file, String fileName,
                                 Result parallelResult) {

    ConcurrentLinkedQueue<File> directories = new
                                     ConcurrentLinkedQueue<>();
    File[] contents = file.listFiles();

    for (File content : contents) {
      if (content.isDirectory()) {
        directories.add(content);
      }
    }

然后,我们使用Runtime类的availableProcessors()方法获得JVM可用线程的数量,创建一个ParallelFileGroupTask对象,并且为每个处理器创建一个Thread

int numThreads = Runtime.getRuntime().availableProcessors();
Thread[] threads = new Thread[numThreads];
ParallelGroupFileTask[] tasks = new ParallelGroupFileTask
                                              [numThreads];

for (int i = 0; i < numThreads; i++) {
  tasks[i] = new ParallelGroupFileTask(fileName, parallelResult,
                                       directories);
  threads[i] = new Thread(tasks[i]);
  threads[i].start();
}

最后,等待某个线程找到文件或者所有线程都完成执行为止。对于第一种情况,使用interrupt()方法和前面提到的机制取消其他线程的执行。使用Thread类的getState()方法检查各个线程是否已完成执行,如下所示:

  boolean finish = false;
  int numFinished = 0;

  while (!finish) {
    numFinished = 0;
    for (int i = 0; i < threads.length; i++) {
      if (threads[i].getState() == State.TERMINATED) {
        numFinished++;
        if (tasks[i].getFound()) {
          finish = true;
        }
      }
    }
    if (numFinished == threads.length) {
      finish = true;
    }
  }
  if (numFinished != threads.length) {
    for (Thread thread : threads) {
      thread.interrupt();
    }
  }
}

2.3.4 对比解决方案

比较一下本节中实现的乘法器算法四个版本的解决方案(串行版和并发版)。为了测试该算法,我们使用JMH框架执行这些示例,该框架可支持用Java语言实现微基准测试。使用基准测试框架是一种很好的解决方案,它可以直接使用currentTimeMillis()nanoTime()等方法来计算时间。我们在两种不同架构中执行这些例子各10次。

  • 一台计算机配置有Intel Core i5-5300i处理器、Windows 7操作系统和16GB内存。该处理器有两个核,每个核可以执行两个线程,所以将有四个并行线程。
  • 另一台计算机配置有AMD A8-640处理器、Windows 10操作系统和8GB内存。该处理器有四个核。

在Windows目录下用两个不同的文件名测试算法:

  • hosts
  • yyy.yyy

我们已经在Windows操作系统上测试了算法。第一个文件存在而第二个文件不存在。如果你使用了其他操作系统,要相应地更改文件的名称。以下表格给出了以毫秒为单位的平均执行时间及其标准偏差。

算法范围AMDIntel
串行版hosts5869.019±124.5482955.535±69.252
yyy.yyy26 474.179±785.68014 508.276±195.725
并行版hosts2792.313±100.8851972.248±193.386
yyy.yyy21 337.288±954.34412 742.856±361.681

我们可以得出以下结论。

  • 这两种架构的性能有所区别,但是你必须考虑到它们的处理器、操作系统、内存和硬盘不同。
  • 在两种架构上得到的结果相同。并行算法的性能优于串行算法。对hosts文件的搜索来说,这种性能差异要比查找不存在的文件更大。

我们可以用搜索hosts文件性能最好的并发版本和串行版本求取加速比,以此来观察采用并发处理如何提高算法的性能。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{\rm serial}}{T_{\rm concurrent}}=\frac{5869.019}{2792.313}=2.10\\&S_{{\rm Intel}}=\frac{T_{\rm serial}}{T_{\rm concurrent}}=\frac{2955.535}{1972.248}=1.5\end{aligned}

2.4 小结

本章介绍了在Java中创建执行线程的最基本元素:Runnable接口和Thread类。在Java中,创建线程的方式有两种。

  • 扩展Thread类并且重载run()方法。
  • 实现Runnable接口,并且将该类的对象传递给Thread类的构造函数。

第二种机制比第一种更受欢迎,因为它带来了更大的灵活性。

我们还了解了Thread类中有许多不同的方法。用这些方法可以获取线程信息,更改线程的优先级,或者等待线程结束。我们在两个例子中使用了所有这些方法,其中一个例子是矩阵乘法,另一个例子是在目录中搜索文件。在这两种情况下,并发处理呈现的性能更好,但是我们也明白了,实现算法的并发版本时必须小心。若使用并发处理的方式不合适,那么性能也会糟糕。

下一章将介绍执行器框架,在该框架下创建并发应用程序时不必担心线程的创建和管理。

目录

  • 版权声明
  • 译者序
  • 前言
  • 第 1 章 第一步:并发设计原理
  • 第 2 章 使用基本元素:Thread和Runnable
  • 第 3 章 管理大量线程:执行器
  • 第 4 章 充分利用执行器
  • 第 5 章 从任务获取数据:Callable接口与Future接口
  • 第 6 章 运行分为多阶段的任务:Phaser类
  • 第 7 章 优化分治解决方案:Fork/Join框架
  • 第 8 章 使用并行流处理大规模数据集:MapReduce模型
  • 第 9 章 使用并行流处理大规模数据集:MapCollect模型
  • 第 10 章 异步流处理:反应流
  • 第 11 章 探究并发数据结构和同步工具
  • 第 12 章 测试与监视并发应用程序
  • 第 13 章 JVM中的并发处理:Clojure、带有GPars库的Groovy以及Scala