基于Java阻塞隊(duì)列的搜索實(shí)例
隊(duì)列以一種先進(jìn)先出的方式管理數(shù)據(jù)。如果你試圖向一個(gè)已經(jīng)滿了的阻塞隊(duì)列中添加一個(gè)元素,或是從一個(gè)空的阻塞隊(duì)列中移除一個(gè)元素,將導(dǎo)致線程阻塞。在多線程進(jìn)行合作時(shí),阻塞隊(duì)列是很有用的工具。工作者線程可以定期的把中間結(jié)果存到阻塞隊(duì)列中。而其他工作者線程把中間結(jié)果取出并在將來修改它們。隊(duì)列會(huì)自動(dòng)平衡負(fù)載。如果***個(gè)線程集運(yùn)行的比第二個(gè)慢,則第二個(gè)線程集在等待結(jié)果時(shí)就會(huì)阻塞。如果***個(gè)線程集運(yùn)行的快,那么它將等待第二個(gè)線程集趕上來。
下面的程序展示了如何使用阻塞隊(duì)列來控制線程集。程序在一個(gè)目錄及它的所有子目錄下搜索所有文件,打印出包含指定關(guān)鍵字的文件列表。
java.util.concurrent包提供了阻塞隊(duì)列的4個(gè)變種:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我們用的是ArrayBlockingQueue。ArrayBlockingQueue在構(gòu)造時(shí)需要給定容量,并可以選擇是否需要公平性。如果公平參數(shù)被設(shè)置了,等待時(shí)間最長的線程會(huì)優(yōu)先得到處理。通常,公平性會(huì)使你在性能上付出代價(jià),只有在的確非常需要的時(shí)候再使用它。
生產(chǎn)者線程枚舉在所有子目錄下的所有文件并把它們放到一個(gè)阻塞隊(duì)列中。這個(gè)操作很快,如果隊(duì)列沒有設(shè)上限的話,很快它就包含了沒有找到的文件。
我們同時(shí)還啟動(dòng)了大量的搜索線程。每個(gè)搜索線程從隊(duì)列中取出一個(gè)文件,打開它,打印出包含關(guān)鍵字的所有行,然后取出下一個(gè)文件。我們使用了一個(gè)小技巧來在工作結(jié)束后終止線程。為了發(fā)出完成信號(hào),枚舉線程把一個(gè)虛擬對(duì)象放入隊(duì)列。(這類似于在行李輸送帶上放一個(gè)寫著“***一個(gè)包”的虛擬包。)當(dāng)搜索線程取到這個(gè)虛擬對(duì)象時(shí),就將其放回并終止。
注意,這里不需要人任何顯示的線程同步。在這個(gè)程序中,我們使用隊(duì)列數(shù)據(jù)結(jié)構(gòu)作為一種同步機(jī)制。
- import java.io.*;
- import java.util.*;
- import java.util.concurrent.*;
- public class BlockingQueueTest
- {
- public static void main(String[] args)
- {
- Scanner in = new Scanner(System.in);
- System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
- String directory = in.nextLine();
- System.out.print("Enter keyword (e.g. volatile): ");
- String keyword = in.nextLine();
- final int FILE_QUEUE_SIZE = 10;
- final int SEARCH_THREADS = 100;
- BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
- FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
- new Thread(enumerator).start();
- for (int i = 1; i <= SEARCH_THREADS; i++)
- new Thread(new SearchTask(queue, keyword)).start();
- }
- }
- /**
- * This task enumerates all files in a directory and its subdirectories.
- */
- class FileEnumerationTask implements Runnable
- {
- /**
- * Constructs a FileEnumerationTask.
- * @param queue the blocking queue to which the enumerated files are added
- * @param startingDirectory the directory in which to start the enumeration
- */
- public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
- {
- this.queue = queue;
- this.startingDirectory = startingDirectory;
- }
- public void run()
- {
- try
- {
- enumerate(startingDirectory);
- queue.put(DUMMY);
- }
- catch (InterruptedException e)
- {
- }
- }
- /**
- * Recursively enumerates all files in a given directory and its subdirectories
- * @param directory the directory in which to start
- */
- public void enumerate(File directory) throws InterruptedException
- {
- File[] files = directory.listFiles();
- for (File file : files)
- {
- if (file.isDirectory()) enumerate(file);
- else queue.put(file);
- }
- }
- public static File DUMMY = new File("");
- private BlockingQueue<File> queue;
- private File startingDirectory;
- }
- /**
- * This task searches files for a given keyword.
- */
- class SearchTask implements Runnable
- {
- /**
- * Constructs a SearchTask.
- * @param queue the queue from which to take files
- * @param keyword the keyword to look for
- */
- public SearchTask(BlockingQueue<File> queue, String keyword)
- {
- this.queue = queue;
- this.keyword = keyword;
- }
- public void run()
- {
- try
- {
- boolean done = false;
- while (!done)
- {
- File file = queue.take();
- if (file == FileEnumerationTask.DUMMY)
- {
- queue.put(file);
- done = true;
- }
- else search(file);
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- catch (InterruptedException e)
- {
- }
- }
- /**
- * Searches a file for a given keyword and prints all matching lines.
- * @param file the file to search
- */
- public void search(File file) throws IOException
- {
- Scanner in = new Scanner(new FileInputStream(file));
- int lineNumber = 0;
- while (in.hasNextLine())
- {
- lineNumber++;
- String line = in.nextLine().trim();
- if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
- }
- in.close();
- }
- private BlockingQueue<File> queue;
- private String keyword;
- }
原文鏈接:http://www.cnblogs.com/XL-Liang/archive/2012/06/13/2547929.html