| ページ一覧 | ブログ | twitter |  書式 | 書式(表) |

MyMemoWiki

「Java 並行処理」の版間の差分

提供: MyMemoWiki
ナビゲーションに移動 検索に移動
 
1行目: 1行目:
==Java 並行処理==
+
==[[Java 並行処理]]==
[[Java]] | {{category 並行処理}}
+
[[Java]] | [[Category:並行処理]]
  
 
==非同期に grep を実施する例1==
 
==非同期に grep を実施する例1==
 
===内容===
 
===内容===
*Thread ではなく、Executor を使って、Runnable を非同期に実行する
+
*Thread ではなく、Executor を使って、[[R]]unnable を非同期に実行する
 
*共有データにスレッドセーフな単一の変数([http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent.atomic])を使用する
 
*共有データにスレッドセーフな単一の変数([http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent.atomic])を使用する
 
====[http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent.atomic]====
 
====[http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent.atomic]====
10行目: 10行目:
 
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/Executor.html Executor]====
 
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/Executor.html Executor]====
 
*[http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent]
 
*[http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent]
*送信された Runnable タスクを実行するオブジェクト
+
*送信された [[R]]unnable タスクを実行するオブジェクト
 
*通常、executor は、明示的にスレッドを作成する代わりに使用
 
*通常、executor は、明示的にスレッドを作成する代わりに使用
 
====ソース====
 
====ソース====
  import java.io.BufferedReader;
+
  import java.io.Buffered[[R]]eader;
 
  import java.io.BufferedWriter;
 
  import java.io.BufferedWriter;
 
  import java.io.File;
 
  import java.io.File;
19行目: 19行目:
 
  import java.io.FileOutputStream;
 
  import java.io.FileOutputStream;
 
  import java.io.FilenameFilter;
 
  import java.io.FilenameFilter;
  import java.io.InputStreamReader;
+
  import java.io.InputStream[[R]]eader;
 
  import java.io.OutputStreamWriter;
 
  import java.io.OutputStreamWriter;
 
  import java.util.ArrayList;
 
  import java.util.ArrayList;
35行目: 35行目:
 
     static AtomicInteger workingThreadCount = null;
 
     static AtomicInteger workingThreadCount = null;
 
      
 
      
     private static final int THREAD_SIZE = 10;
+
     private static final int TH[[R]]EAD_SIZE = 10;
 
     public static void main(String[] args) throws Exception {
 
     public static void main(String[] args) throws Exception {
 
      
 
      
71行目: 71行目:
 
          
 
          
 
         // スレッドで処理させる単位(バケツ)のサイズを決定
 
         // スレッドで処理させる単位(バケツ)のサイズを決定
         int buketSize = files.length / THREAD_SIZE;
+
         int buketSize = files.length / TH[[R]]EAD_SIZE;
 
         buketSize++;
 
         buketSize++;
 
          
 
          
99行目: 99行目:
 
         // 非同期処理を行う
 
         // 非同期処理を行う
 
         Executor executer = new Executor() {
 
         Executor executer = new Executor() {
             public void execute(Runnable command) {
+
             public void execute([[R]]unnable command) {
 
                 new Thread(command).start();
 
                 new Thread(command).start();
 
             }
 
             }
117行目: 117行目:
 
       * grep を実行する
 
       * grep を実行する
 
       */
 
       */
     public static class GrepCommand implements Runnable {
+
     public static class GrepCommand implements [[R]]unnable {
 
         private int threadId;
 
         private int threadId;
 
         private File[] files;
 
         private File[] files;
124行目: 124行目:
 
          
 
          
 
         /**
 
         /**
           * 対象ファイルから、正規表現に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する
+
           * 対象ファイルから、[[正規表現]]に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する
 
           * @param threadId スレッド識別用ID
 
           * @param threadId スレッド識別用ID
 
           * @param files grep 対象ファイル
 
           * @param files grep 対象ファイル
 
           * @param outDir 結果出力ディレクトリ
 
           * @param outDir 結果出力ディレクトリ
           * @param expr 正規表現(Java)
+
           * @param expr [[正規表現]](Java)
 
           */
 
           */
 
         public GrepCommand(int threadId, File[] files, File outDir, String expr) {
 
         public GrepCommand(int threadId, File[] files, File outDir, String expr) {
140行目: 140行目:
 
         }
 
         }
 
          
 
          
         /* (non-Javadoc)
+
         /* (non-[[Java]]doc)
           * @see java.lang.Runnable#run()
+
           * @see java.lang.[[R]]unnable#run()
 
           */
 
           */
 
         public void run() {
 
         public void run() {
153行目: 153行目:
 
                 for (File file : files) {
 
                 for (File file : files) {
 
                     System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName());
 
                     System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName());
                     BufferedReader reader = new BufferedReader(
+
                     Buffered[[R]]eader reader = new Buffered[[R]]eader(
                             new InputStreamReader(new FileInputStream(file)));
+
                             new InputStream[[R]]eader(new FileInputStream(file)));
 
                      
 
                      
 
                     long lineno = 0;
 
                     long lineno = 0;
192行目: 192行目:
 
=====実行例11=====
 
=====実行例11=====
  
  > java JGrep "C:\\work\\" ".*TEST.*" ".*\.txt$"
+
  > java JGrep "C:\\work\\" ".*[[TEST]].*" ".*\.txt$"
 
   
 
   
 
  [006]処理中 ... WORK16.txt
 
  [006]処理中 ... WORK16.txt
233行目: 233行目:
 
===内容===
 
===内容===
 
*上記例を書き換え
 
*上記例を書き換え
*ExecutorService を使って複数スレッドを管理
+
*ExecutorSer[[vi]]ce を使って複数スレッドを管理
*Runnable の代わりに、Callableを利用して、結果を受け取る
+
*[[R]]unnable の代わりに、Callableを利用して、結果を受け取る
 
*Future にて結果を表示
 
*Future にて結果を表示
 
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/ExecutorService.html ExecutorService]====
 
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/ExecutorService.html ExecutorService]====
243行目: 243行目:
 
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/Callable.html Callable]====
 
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/Callable.html Callable]====
 
*結果を返し、例外をスローすることがあるタスク
 
*結果を返し、例外をスローすることがあるタスク
*Runnable と似ていて、どちらもインスタンスが別のスレッドによって実行される可能性があるクラス用に設計
+
*[[R]]unnable と似ていて、どちらもインスタンスが別のスレッドによって実行される可能性があるクラス用に設計
*Runnable は結果を返さず、チェック例外をスローすることはできません
+
*[[R]]unnable は結果を返さず、チェック例外をスローすることはできません
  
  
 
====ソース====
 
====ソース====
  
  import java.io.BufferedReader;
+
  import java.io.Buffered[[R]]eader;
 
  import java.io.BufferedWriter;
 
  import java.io.BufferedWriter;
 
  import java.io.File;
 
  import java.io.File;
255行目: 255行目:
 
  import java.io.FileOutputStream;
 
  import java.io.FileOutputStream;
 
  import java.io.FilenameFilter;
 
  import java.io.FilenameFilter;
  import java.io.InputStreamReader;
+
  import java.io.InputStream[[R]]eader;
 
  import java.io.OutputStreamWriter;
 
  import java.io.OutputStreamWriter;
 
  import java.util.ArrayList;
 
  import java.util.ArrayList;
 
  import java.util.List;
 
  import java.util.List;
 
  import java.util.concurrent.Callable;
 
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutorService;
+
  import java.util.concurrent.ExecutorSer[[vi]]ce;
 
  import java.util.concurrent.Executors;
 
  import java.util.concurrent.Executors;
 
  import java.util.concurrent.Future;
 
  import java.util.concurrent.Future;
269行目: 269行目:
 
  public class JGrep {
 
  public class JGrep {
 
   
 
   
     private static final int THREAD_SIZE = 10;
+
     private static final int TH[[R]]EAD_SIZE = 10;
 
     // スレッドプール
 
     // スレッドプール
     private ExecutorService threadPool;
+
     private ExecutorSer[[vi]]ce threadPool;
 
      
 
      
 
     public static void main(String[] args) throws Exception {
 
     public static void main(String[] args) throws Exception {
314行目: 314行目:
 
          
 
          
 
         // スレッドで処理させる単位(バケツ)のサイズを決定
 
         // スレッドで処理させる単位(バケツ)のサイズを決定
         int buketSize = files.length / THREAD_SIZE;
+
         int buketSize = files.length / TH[[R]]EAD_SIZE;
 
         buketSize++;
 
         buketSize++;
 
          
 
          
337行目: 337行目:
 
         }
 
         }
 
          
 
          
         // Callable は、Runnabla と同じような役割を担うが、
+
         // Callable は、[[R]]unnabla と同じような役割を担うが、
         // Runnable と異なり、結果を返し、例外をスローすることができる
+
         // [[R]]unnable と異なり、結果を返し、例外をスローすることができる
 
         List<Callable<Long>> tasks = new ArrayList<Callable<Long>>();
 
         List<Callable<Long>> tasks = new ArrayList<Callable<Long>>();
         threadPool = Executors.newFixedThreadPool(THREAD_SIZE);
+
         threadPool = Executors.newFixedThreadPool(TH[[R]]EAD_SIZE);
 
   
 
   
 
         int threadId = 1;
 
         int threadId = 1;
370行目: 370行目:
 
          
 
          
 
         /**
 
         /**
           * 対象ファイルから、正規表現に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する
+
           * 対象ファイルから、[[正規表現]]に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する
 
           * @param threadId スレッド識別用ID
 
           * @param threadId スレッド識別用ID
 
           * @param files grep 対象ファイル
 
           * @param files grep 対象ファイル
 
           * @param outDir 結果出力ディレクトリ
 
           * @param outDir 結果出力ディレクトリ
           * @param expr 正規表現(Java)
+
           * @param expr [[正規表現]](Java)
 
           */
 
           */
 
         public GrepCommand(int threadId, File[] files, File outDir, String expr) {
 
         public GrepCommand(int threadId, File[] files, File outDir, String expr) {
386行目: 386行目:
 
         }
 
         }
 
   
 
   
         /* (non-Javadoc)
+
         /* (non-[[Java]]doc)
 
           * @see java.util.concurrent.Callable#call()
 
           * @see java.util.concurrent.Callable#call()
 
           */
 
           */
400行目: 400行目:
 
             for (File file : files) {
 
             for (File file : files) {
 
                 System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName());
 
                 System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName());
                 BufferedReader reader = new BufferedReader(
+
                 Buffered[[R]]eader reader = new Buffered[[R]]eader(
                         new InputStreamReader(new FileInputStream(file)));
+
                         new InputStream[[R]]eader(new FileInputStream(file)));
 
                  
 
                  
 
                 long lineno = 0;
 
                 long lineno = 0;

2020年2月16日 (日) 04:27時点における最新版

Java 並行処理

Java |

非同期に grep を実施する例1

内容

  • Thread ではなく、Executor を使って、Runnable を非同期に実行する
  • 共有データにスレッドセーフな単一の変数(java.util.concurrent.atomic)を使用する

java.util.concurrent.atomic

  • 単一の変数に対するロックフリーでスレッドセーフなプログラミングをサポートするクラスの小規模なツールキット

Executor

  • java.util.concurrent
  • 送信された Runnable タスクを実行するオブジェクト
  • 通常、executor は、明示的にスレッドを作成する代わりに使用

ソース

  1. import java.io.BufferedReader;
  2. import java.io.BufferedWriter;
  3. import java.io.File;
  4. import java.io.FileInputStream;
  5. import java.io.FileOutputStream;
  6. import java.io.FilenameFilter;
  7. import java.io.InputStreamReader;
  8. import java.io.OutputStreamWriter;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. import java.util.concurrent.Executor;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13.  
  14. /**
  15. *
  16. */
  17. public class JGrep {
  18.  
  19. // 同期された変数
  20. //volatile int workingThreadCount;
  21. static AtomicInteger workingThreadCount = null;
  22. private static final int THREAD_SIZE = 10;
  23. public static void main(String[] args) throws Exception {
  24. if (args.length <=2) {
  25. System.out.println("Usage: java JGrep 対象ディレクトリ 正規表現 [ファイルフィルタ]");
  26. System.exit(-1);
  27. }
  28.  
  29. String fileFilter = "";
  30. if (args.length > 2) {
  31. fileFilter = args[2];
  32. }
  33. (new JGrep()).grep(args[0], args[1], fileFilter);
  34. }
  35. public void grep(String dirName, String expr, String filter) throws Exception {
  36. // 対象ファイルを取得
  37. File dir = new File(dirName);
  38. final String fltr = filter;
  39. File[] files = dir.listFiles(
  40. new FilenameFilter(){
  41. public boolean accept(File dir, String name) {
  42. return name.matches(fltr);
  43. }
  44. });
  45. // 出力先ディレクトリ
  46. File outDir = new File(dir.getAbsolutePath() + File.separator + "out");
  47. if (!outDir.exists()) {
  48. if (!outDir.mkdirs()) {
  49. throw new IllegalStateException("出力ディレクトリが作成できません");
  50. }
  51. }
  52. // スレッドで処理させる単位(バケツ)のサイズを決定
  53. int buketSize = files.length / THREAD_SIZE;
  54. buketSize++;
  55. int filesInBuketCnt = 0;
  56. List<List<File>> buketBuket = new ArrayList<List<File>>();
  57. List<File> fileBuket = null;
  58. for (File file : files) {
  59. if (filesInBuketCnt == 0) {
  60. fileBuket = new ArrayList<File>();
  61. }
  62. fileBuket.add(file);
  63. filesInBuketCnt++;
  64. if (filesInBuketCnt > buketSize) {
  65. buketBuket.add(fileBuket);
  66. filesInBuketCnt = 0;
  67. fileBuket = null;
  68. }
  69. }
  70. if (files !=null && filesInBuketCnt > 0) {
  71. buketBuket.add(fileBuket);
  72. }
  73. // スレッド数を登録
  74. workingThreadCount = new AtomicInteger(buketBuket.size());
  75.  
  76. // 非同期処理を行う
  77. Executor executer = new Executor() {
  78. public void execute(Runnable command) {
  79. new Thread(command).start();
  80. }
  81. };
  82. int threadId = 1;
  83. for (List<File> lstFiles : buketBuket) {
  84. executer.execute(
  85. new GrepCommand(threadId++,
  86. (File[])lstFiles.toArray(new File[0]),
  87. outDir,
  88. expr));
  89. }
  90. }
  91. /**
  92. * grep を実行する
  93. */
  94. public static class GrepCommand implements Runnable {
  95. private int threadId;
  96. private File[] files;
  97. private File outDir;
  98. private String expr;
  99. /**
  100. * 対象ファイルから、正規表現に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する
  101. * @param threadId スレッド識別用ID
  102. * @param files grep 対象ファイル
  103. * @param outDir 結果出力ディレクトリ
  104. * @param expr 正規表現(Java)
  105. */
  106. public GrepCommand(int threadId, File[] files, File outDir, String expr) {
  107. if (outDir == null || !outDir.isDirectory()) {
  108. throw new IllegalArgumentException("不正な出力先");
  109. }
  110. this.threadId = threadId;
  111. this.files = files;
  112. this.outDir = outDir;
  113. this.expr = expr;
  114. }
  115. /* (non-Javadoc)
  116. * @see java.lang.Runnable#run()
  117. */
  118. public void run() {
  119. try {
  120. File outFile = new File(this.outDir.getAbsolutePath()
  121. + File.separator
  122. + String.format("%03d.txt", this.threadId));
  123. BufferedWriter writer = new BufferedWriter(
  124. new OutputStreamWriter(new FileOutputStream(outFile)));
  125. for (File file : files) {
  126. System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName());
  127. BufferedReader reader = new BufferedReader(
  128. new InputStreamReader(new FileInputStream(file)));
  129. long lineno = 0;
  130. boolean isFirstMatch = false;
  131. String line = null;
  132. while((line = reader.readLine()) != null) {
  133. lineno++;
  134. if (line.matches(this.expr)) {
  135. if (!isFirstMatch) {
  136. writer.write("\n--------------------------------\n");
  137. writer.write(file.getAbsolutePath());
  138. writer.write("\n--------------------------------\n");
  139. isFirstMatch = true;
  140. }
  141. writer.write(line + "\n");
  142. }
  143. }
  144. reader.close();
  145. }
  146. writer.close();
  147. } catch (Exception e) {
  148. System.out.println(e);
  149. System.exit(-1);
  150. } finally {
  151. // 動いているスレッドが無くなったら、終了メッセージを出力
  152. int remain = workingThreadCount.addAndGet(-1);
  153. System.out.println("残りのスレッド数... " + remain);
  154. if ( remain <= 0) {
  155. System.out.println("終了しました。");
  156. }
  157. }
  158. }
  159. }
  160. }
実行例11
  1. > java JGrep "C:\\work\\" ".*TEST.*" ".*\.txt$"
  2.  
  3. [006]処理中 ... WORK16.txt
  4. [005]処理中 ... WORK11.txt
  5. [007]処理中 ... WORK21.txt
  6. [001]処理中 ... TEST1028.txt
  7. [002]処理中 ... TEST2085.txt
  8. [002]処理中 ... TEST2089.txt
  9. [002]処理中 ... TEST2096.txt
  10. [001]処理中 ... TEST1135.txt
  11. [006]処理中 ... WORK17.txt
  12. [007]処理中 ... WORK23.txt
  13. [008]処理中 ... WORK25_M25.txt
  14. 残りのスレッド数... 7
  15. [006]処理中 ... WORK17.h.txt
  16. [001]処理中 ... TEST2076.txt
  17. [007]処理中 ... WORK24.txt
  18. [006]処理中 ... WORK19.txt
  19. [002]処理中 ... TEST2097Z.txt
  20. 残りのスレッド数... 6
  21. [007]処理中 ... WORK25#.txt
  22. 残りのスレッド数... 5
  23. [005]処理中 ... WORK11_M24.txt
  24. 残りのスレッド数... 4
  25. 残りのスレッド数... 3
  26. [008]処理中 ... WORK27.txt
  27. [005]処理中 ... WORK11_NEW.txt
  28. [008]処理中 ... コピー TEST2085.txt
  29. [005]処理中 ... WORK11_W.txt
  30. [007]処理中 ... WORK25.20090630.txt
  31. 残りのスレッド数... 2
  32. [007]処理中 ... WORK25.txt
  33. [005]処理中 ... WORK11_WW.txt
  34. [005]処理中 ... WORK12.txt
  35. 残りのスレッド数... 1
  36. 残りのスレッド数... 0
  37. 終了しました。

非同期に grep を実施する例2

内容

  • 上記例を書き換え
  • ExecutorService を使って複数スレッドを管理
  • Runnable の代わりに、Callableを利用して、結果を受け取る
  • Future にて結果を表示

ExecutorService

  • 終了を管理するメソッド、および 1 つ以上の非同期タスクの進行状況を追跡する Future を生成できるメソッドを提供

Future

  • 非同期計算の結果を表す
  • 計算が完了したかどうかのチェック、完了までの待機、計算結果の取得などを行うためのメソッドを用意

Callable

  • 結果を返し、例外をスローすることがあるタスク
  • Runnable と似ていて、どちらもインスタンスが別のスレッドによって実行される可能性があるクラス用に設計
  • Runnable は結果を返さず、チェック例外をスローすることはできません


ソース

  1. import java.io.BufferedReader;
  2. import java.io.BufferedWriter;
  3. import java.io.File;
  4. import java.io.FileInputStream;
  5. import java.io.FileOutputStream;
  6. import java.io.FilenameFilter;
  7. import java.io.InputStreamReader;
  8. import java.io.OutputStreamWriter;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. import java.util.concurrent.Callable;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.Executors;
  14. import java.util.concurrent.Future;
  15.  
  16. /**
  17. *
  18. */
  19. public class JGrep {
  20.  
  21. private static final int THREAD_SIZE = 10;
  22. // スレッドプール
  23. private ExecutorService threadPool;
  24. public static void main(String[] args) throws Exception {
  25. if (args.length <=2) {
  26. System.out.println("Usage: java JGrep 対象ディレクトリ 正規表現 [ファイルフィルタ]");
  27. System.exit(-1);
  28. }
  29.  
  30. String fileFilter = "";
  31. if (args.length > 2) {
  32. fileFilter = args[2];
  33. }
  34. (new JGrep()).grep(args[0], args[1], fileFilter);
  35. }
  36. /**
  37. * @param dirName
  38. * @param expr
  39. * @param filter
  40. * @throws Exception
  41. */
  42. public void grep(String dirName, String expr, String filter) throws Exception {
  43. // 対象ファイルを取得
  44. File dir = new File(dirName);
  45. final String fltr = filter;
  46. File[] files = dir.listFiles(
  47. new FilenameFilter(){
  48. public boolean accept(File dir, String name) {
  49. return name.matches(fltr);
  50. }
  51. });
  52. // 出力先ディレクトリ
  53. File outDir = new File(dir.getAbsolutePath() + File.separator + "out");
  54. if (!outDir.exists()) {
  55. if (!outDir.mkdirs()) {
  56. throw new IllegalStateException("出力ディレクトリが作成できません");
  57. }
  58. }
  59. // スレッドで処理させる単位(バケツ)のサイズを決定
  60. int buketSize = files.length / THREAD_SIZE;
  61. buketSize++;
  62. int filesInBuketCnt = 0;
  63. List<List<File>> buketBuket = new ArrayList<List<File>>();
  64. List<File> fileBuket = null;
  65. for (File file : files) {
  66. if (filesInBuketCnt == 0) {
  67. fileBuket = new ArrayList<File>();
  68. }
  69. fileBuket.add(file);
  70. filesInBuketCnt++;
  71. if (filesInBuketCnt > buketSize) {
  72. buketBuket.add(fileBuket);
  73. filesInBuketCnt = 0;
  74. fileBuket = null;
  75. }
  76. }
  77. if (files !=null && filesInBuketCnt > 0) {
  78. buketBuket.add(fileBuket);
  79. }
  80. // Callable は、Runnabla と同じような役割を担うが、
  81. // Runnable と異なり、結果を返し、例外をスローすることができる
  82. List<Callable<Long>> tasks = new ArrayList<Callable<Long>>();
  83. threadPool = Executors.newFixedThreadPool(THREAD_SIZE);
  84.  
  85. int threadId = 1;
  86. for (List<File> lstFiles : buketBuket) {
  87. tasks.add(
  88. new GrepCommand(threadId++,
  89. (File[])lstFiles.toArray(new File[0]),
  90. outDir,
  91. expr));
  92. }
  93. // 指定されたタスクを実行し、すべて完了すると、ステータスと結果を含む Future のリストを返す
  94. List<Future<Long>> results = threadPool.invokeAll(tasks);
  95. for (Future<Long>: results) {
  96. System.out.printf("一致件数 %d 件\n", f.get().longValue());
  97. }
  98. System.out.println("終了しました。");
  99. }
  100. /**
  101. * grep を実行する
  102. */
  103. public static class GrepCommand implements Callable<Long> {
  104. private int threadId;
  105. private File[] files;
  106. private File outDir;
  107. private String expr;
  108. /**
  109. * 対象ファイルから、正規表現に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する
  110. * @param threadId スレッド識別用ID
  111. * @param files grep 対象ファイル
  112. * @param outDir 結果出力ディレクトリ
  113. * @param expr 正規表現(Java)
  114. */
  115. public GrepCommand(int threadId, File[] files, File outDir, String expr) {
  116. if (outDir == null || !outDir.isDirectory()) {
  117. throw new IllegalArgumentException("不正な出力先");
  118. }
  119. this.threadId = threadId;
  120. this.files = files;
  121. this.outDir = outDir;
  122. this.expr = expr;
  123. }
  124.  
  125. /* (non-Javadoc)
  126. * @see java.util.concurrent.Callable#call()
  127. */
  128. public Long call() throws Exception {
  129. File outFile = new File(this.outDir.getAbsolutePath()
  130. + File.separator
  131. + String.format("%03d.txt", this.threadId));
  132. BufferedWriter writer = new BufferedWriter(
  133. new OutputStreamWriter(new FileOutputStream(outFile)));
  134. long match = 0L;
  135. for (File file : files) {
  136. System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName());
  137. BufferedReader reader = new BufferedReader(
  138. new InputStreamReader(new FileInputStream(file)));
  139. long lineno = 0;
  140. boolean isFirstMatch = false;
  141. String line = null;
  142. while((line = reader.readLine()) != null) {
  143. lineno++;
  144. if (line.matches(this.expr)) {
  145. if (!isFirstMatch) {
  146. writer.write("\n--------------------------------\n");
  147. writer.write(file.getAbsolutePath());
  148. writer.write("\n--------------------------------\n");
  149. isFirstMatch = true;
  150. }
  151. writer.write(line + "\n");
  152. match++;
  153. }
  154. }
  155. reader.close();
  156. }
  157. writer.close();
  158. // 結果を返す
  159. return new Long(match);
  160. }
  161. }
  162. }