Java Concurrency Programming: Thread Management (Advanced Thread Synchronization, Part 10)

3.4 Synchronizing tasks at a common point

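The Java concurrency API provides a synchronization utility that lets two or more threads synchronize at a predetermined point: the CyclicBarrier class. It is similar to the CountDownLatch class, but the two differ in some respects.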


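A CyclicBarrier is initialized with an integer: the number of threads that must synchronize at the predetermined point. When a thread reaches that point, it calls the await() method to wait for the others. The CyclicBarrier blocks the calling thread and puts it to sleep until all the other threads have arrived. When the last thread calls await(), the barrier wakes up all the waiting threads and every thread continues with its work.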
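The waiting behaviour described above can be sketched in a few lines. This is only a minimal illustration, not part of the original example: the class name AwaitSketch and the helper runParties are made up for the demonstration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class AwaitSketch {

    // Starts `parties` threads that all meet at one barrier and returns
    // how many of them got past await().
    static int runParties(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // Blocks until `parties` threads have called await().
                    barrier.await();
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or timed out while waiting.
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println("Threads released: " + runParties(3));
    }
}
```

No thread can increment the counter until the last one has reached the barrier, so in a run without interruptions all of them are released together.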


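CyclicBarrier and CountDownLatch have a lot in common, but they also differ. One of the most important differences is that a CyclicBarrier object can be reset to its initial state, which restores its internal counter to the value it was initialized with.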
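To illustrate the difference, the sketch below reuses one barrier for several rounds and contrasts it with a latch that has already reached zero. It is a rough demonstration under assumed names (ReuseSketch, countTrips and latchCountAfterUse are hypothetical). It relies on the barrier becoming usable again after each trip; the explicit reset() method is not shown.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class ReuseSketch {

    // Runs `rounds` rounds over ONE barrier shared by `parties` threads and
    // returns how many times the barrier tripped.
    static int countTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object in every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // The barrier was broken; stop taking part.
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    // A CountDownLatch, by contrast, stays at zero once it has been counted down.
    static long latchCountAfterUse() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown(); // no effect: the count is already zero
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("Barrier trips: " + countTrips(3, 4));
        System.out.println("Latch count after use: " + latchCountAfterUse());
    }
}
```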


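The following example shows how to use CyclicBarrier. Suppose we have a large matrix filled with numbers from 0 to 9 and want to count how many times the value 5 appears. To solve the problem efficiently, we split the rows among several threads and use a CyclicBarrier to combine their partial results once all of them have finished.

Define the matrix class MatrixMock: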

 

import java.util.Random;
 
/**
 * This class generates a random matrix of integer numbers between 0 and 9
 *
 */
public class MatrixMock {
   
    /**
     * Bi-dimensional array with the random numbers
     */
    private int[][] data;
   
    /**
     * Constructor of the class. Generates the bi-dimensional array of numbers.
     * While generating the array, it counts how many times the number we are going
     * to look for appears, so we can check that the CyclicBarrier-based search gives the right result
     * @param size Number of rows of the array
     * @param length Number of columns of the array
     * @param number Number we are going to look for
     */
    public MatrixMock(int size, int length, int number){
 
        int counter=0;
        data=new int[size][length];
        Random random=new Random();
        for (int i=0; i<size; i++) {
            for (int j=0; j<length; j++){
                data[i][j]=random.nextInt(10);
                if (data[i][j]==number){
                    counter++;
                }
            }
        }
        System.out.printf("Mock: There are %d ocurrences of number in generated data.\n",counter,number);
    }
   
    /**
     * This methods returns a row of the bi-dimensional array
     * @param row the number of the row to return
     * @return the selected row
     */
    public int[] getRow(int row){
        if ((row>=0)&&(row<data.length)){
            return data[row];
        }
        return null;
    }
 
}

 

定义并行执行结果类Results:

 

/**
 * This class is used to store the number of occurrences of the number
 * we are looking for in each row of the bi-dimensional array
 *
 */
public class Results {
   
    /**
     * Array to store the number of occurrences of the number in each row of the array
     */
    private int[] data;
 
    /**
     * Constructor of the class. Initializes its attributes
     * @param size Size of the array to store the results
     */
    public Results(int size){
        data=new int[size];
    }
 
    /**
     * Sets the value of one position in the array of results
     * @param position Position in the array
     * @param value Value to set in that position
     */
    public void setData(int position, int value){
        data[position]=value;
    }
   
    /**
     * Returns the array of results
     * @return the array of results
     */
    public int[] getData(){
        return data;
    }
}


Define the Grouper class, which adds up the search results:

 

/**
 * Groups the results of each Searcher. Sums the values stored in the Results object.
 * An object of this class is executed automatically by the CyclicBarrier when
 * all the Searchers have finished their job
 */
public class Grouper implements Runnable {
 
    /**
     * Results object with the occurrences of the number in each row
     */
    private Results results;
   
    /**
     * Constructor of the class. Initializes its attributes
     * @param results Results object with the occurrences of the number in each row
     */
    public Grouper(Results results){
        this.results=results;
    }
   
    /**
     * Main method of the Grouper. Sums the values stored in the Results object
     */
    @Override
    public void run() {
        int finalResult=0;
        System.out.printf("Grouper: Processing results...\n");
        int data[]=results.getData();
        for (int number : data){
            finalResult += number;
        }
        System.out.printf("Grouper: Total result: %d.\n",finalResult);
    }
 
}
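The Grouper is handed to the CyclicBarrier as its barrier action. The barrier runs that action once, after the last thread has arrived and before any waiting thread is released. The sketch below shows this ordering on a tiny scale. It is only an illustration: the class BarrierActionSketch and the method sumAtBarrier are hypothetical names, and plain arrays stand in for the Results object.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; names are illustrative and not part of the original example.
class BarrierActionSketch {

    // Each worker stores one value, then waits at the barrier. The barrier
    // action sums the values before any worker is released.
    static int sumAtBarrier(int[] parts) {
        int n = parts.length;
        int[] slots = new int[n]; // one slot per worker, similar to Results
        int[] total = new int[1]; // written only by the barrier action
        int[] seen = new int[n];  // total as observed by each worker afterwards

        CyclicBarrier barrier = new CyclicBarrier(n, () -> {
            int sum = 0;
            for (int v : slots) {
                sum += v;
            }
            total[0] = sum;
        });

        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                slots[id] = parts[id];
                try {
                    barrier.await();
                    // The barrier action has already run at this point.
                    seen[id] = total[0];
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Barrier broken: seen[id] keeps its default value.
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        for (int v : seen) {
            if (v != total[0]) {
                throw new IllegalStateException("a worker saw a stale total");
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println("Total: " + sumAtBarrier(new int[] {1, 2, 3, 4}));
    }
}
```

Because the action runs before the workers resume, each worker can safely read the combined result right after await() returns.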

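Define the Searcher class:

 

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Searcher implements Runnable {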
 
 
    /**
     * First row to search (inclusive)
     */
    private int firstRow;
   
    /**
     * Last row to search (exclusive)
     */
    private int lastRow;
   
    /**
     * Bi-dimensional array with the numbers
     */
    private MatrixMock mock;
   
    /**
     * Array to store the results
     */
    private Results results;
   
    /**
     * Number to look for
     */
    private int number;
   
    /**
     * CyclicBarrier to control the execution
     */
    private final CyclicBarrier barrier;
   
    /**
     * Constructor of the class. Initializes its attributes
     * @param firstRow First row to search (inclusive)
     * @param lastRow Last row to search (exclusive)
     * @param mock Object with the array of numbers
     * @param results Array to store the results
     * @param number Number to look for
     * @param barrier CyclicBarrier to control the execution
     */
    public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier){
        this.firstRow=firstRow;
        this.lastRow=lastRow;
        this.mock=mock;
        this.results=results;
        this.number=number;
        this.barrier=barrier;
    }
 
    /**
     * Main method of the searcher. Looks for the number in a subset of rows. For each row, saves the
     * number of occurrences of the number in the array of results
     */
    @Override
    public void run() {
        int counter;
        System.out.printf("%s: Processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow);
        for (int i=firstRow; i<lastRow; i++){
            int row[]=mock.getRow(i);
            counter=0;
            for (int j=0; j<row.length; j++){
                if (row[j]==number){
                    counter++;
                }
            }
            results.setData(i, counter);
        }
        System.out.printf("%s: Lines processed.\n",Thread.currentThread().getName());      
        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
   
    /**
     * Main method of the example
     * @param args
     */
    public static void main(String[] args) {
 
        /*
         * Initializes the bi-dimensional array of data
         *      10000 rows
         *      1000 numbers in each row
         *      Looking for number 5
         */
         */
        final int ROWS=10000;
        final int NUMBERS=1000;
        final int SEARCH=5;
        final int PARTICIPANTS=5;
        final int LINES_PARTICIPANT=2000;
        MatrixMock mock=new MatrixMock(ROWS,NUMBERS,SEARCH);
       
        // Initializes the object for the results
        Results results=new Results(ROWS);
       
        // Creates a Grouper object
        Grouper grouper=new Grouper(results);
       
        // Creates the CyclicBarrier object. It has 5 participants and, when
        // they finish, the CyclicBarrier will execute the grouper object
        CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS, grouper);
       
        // Creates, initializes and starts 5 Searcher objects
        Searcher searchers[]=new Searcher[PARTICIPANTS];
        for (int i=0; i < PARTICIPANTS; i++){
            searchers[i]=new Searcher(i*LINES_PARTICIPANT,
                                      (i*LINES_PARTICIPANT)+LINES_PARTICIPANT,
                                      mock,
                                      results,
                                      SEARCH,
                                      barrier);
            Thread thread=new Thread(searchers[i]);
            thread.start();
        }
        System.out.printf("Main: The main thread has finished.\n");
    }
   
}

Output:

Mock: There are 998891 ocurrences of number in generated data.
Thread-0: Processing lines from 0 to 2000.
Main: The main thread has finished.
Thread-3: Processing lines from 6000 to 8000.
Thread-4: Processing lines from 8000 to 10000.
Thread-1: Processing lines from 2000 to 4000.
Thread-2: Processing lines from 4000 to 6000.
Thread-3: Lines processed.
Thread-4: Lines processed.
Thread-0: Lines processed.
Thread-1: Lines processed.
Thread-2: Lines processed.
Grouper: Processing results...
Grouper: Total result: 998891.


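Processing the matrix the conventional way, one row after another in a single thread, would be considerably less efficient. CyclicBarrier makes it easy to split the work across threads and to combine the partial results once every thread has finished.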
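The example hard-codes 2000 rows per searcher, which works because 10000 rows divide evenly among 5 participants. As a possible generalization, the sketch below computes the row ranges for any row and participant count. The class RowRanges and its split method are hypothetical helpers and do not appear in the original code.

```java
import java.util.Arrays;

// Hypothetical helper; not part of the original example.
class RowRanges {

    // Splits `rows` rows into `participants` half-open ranges [first, last).
    // When the division is not exact, the first ranges get one extra row.
    static int[][] split(int rows, int participants) {
        int[][] ranges = new int[participants][2];
        int base = rows / participants;
        int extra = rows % participants;
        int first = 0;
        for (int i = 0; i < participants; i++) {
            int size = base + (i < extra ? 1 : 0);
            ranges[i][0] = first;
            ranges[i][1] = first + size;
            first += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Same partitioning as the example: 10000 rows, 5 participants.
        System.out.println(Arrays.deepToString(split(10000, 5)));
    }
}
```

Each pair could then be passed to a Searcher as its firstRow and lastRow, since the search loop treats lastRow as exclusive.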