| ページ一覧 | ブログ | twitter |  書式 | 書式(表) |

MyMemoWiki

Java 並行処理

提供: MyMemoWiki
2020年2月16日 (日) 04:27時点におけるPiroto (トーク | 投稿記録)による版
(差分) ← 古い版 | 最新版 (差分) | 新しい版 → (差分)
ナビゲーションに移動 検索に移動

Java 並行処理

Java |

非同期に grep を実施する例1

内容

  • Thread ではなく、Executor を使って、Runnable を非同期に実行する
  • 共有データにスレッドセーフな単一の変数(java.util.concurrent.atomic)を使用する

java.util.concurrent.atomic

  • 単一の変数に対するロックフリーでスレッドセーフなプログラミングをサポートするクラスの小規模なツールキット

Executor

  • java.util.concurrent
  • 送信された Runnable タスクを実行するオブジェクト
  • 通常、executor は、明示的にスレッドを作成する代わりに使用

ソース

  1. import java.io.BufferedReader;
  2. import java.io.BufferedWriter;
  3. import java.io.File;
  4. import java.io.FileInputStream;
  5. import java.io.FileOutputStream;
  6. import java.io.FilenameFilter;
  7. import java.io.InputStreamReader;
  8. import java.io.OutputStreamWriter;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. import java.util.concurrent.Executor;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13.  
  14. /**
  15. *
  16. */
  17. public class JGrep {
  18.  
  19. // 同期された変数
  20. //volatile int workingThreadCount;
  21. static AtomicInteger workingThreadCount = null;
  22. private static final int THREAD_SIZE = 10;
  23. public static void main(String[] args) throws Exception {
  24. if (args.length <=2) {
  25. System.out.println("Usage: java JGrep 対象ディレクトリ 正規表現 [ファイルフィルタ]");
  26. System.exit(-1);
  27. }
  28.  
  29. String fileFilter = "";
  30. if (args.length > 2) {
  31. fileFilter = args[2];
  32. }
  33. (new JGrep()).grep(args[0], args[1], fileFilter);
  34. }
  35. public void grep(String dirName, String expr, String filter) throws Exception {
  36. // 対象ファイルを取得
  37. File dir = new File(dirName);
  38. final String fltr = filter;
  39. File[] files = dir.listFiles(
  40. new FilenameFilter(){
  41. public boolean accept(File dir, String name) {
  42. return name.matches(fltr);
  43. }
  44. });
  45. // 出力先ディレクトリ
  46. File outDir = new File(dir.getAbsolutePath() + File.separator + "out");
  47. if (!outDir.exists()) {
  48. if (!outDir.mkdirs()) {
  49. throw new IllegalStateException("出力ディレクトリが作成できません");
  50. }
  51. }
  52. // スレッドで処理させる単位(バケツ)のサイズを決定
  53. int buketSize = files.length / THREAD_SIZE;
  54. buketSize++;
  55. int filesInBuketCnt = 0;
  56. List<List<File>> buketBuket = new ArrayList<List<File>>();
  57. List<File> fileBuket = null;
  58. for (File file : files) {
  59. if (filesInBuketCnt == 0) {
  60. fileBuket = new ArrayList<File>();
  61. }
  62. fileBuket.add(file);
  63. filesInBuketCnt++;
  64. if (filesInBuketCnt > buketSize) {
  65. buketBuket.add(fileBuket);
  66. filesInBuketCnt = 0;
  67. fileBuket = null;
  68. }
  69. }
  70. if (files !=null && filesInBuketCnt > 0) {
  71. buketBuket.add(fileBuket);
  72. }
  73. // スレッド数を登録
  74. workingThreadCount = new AtomicInteger(buketBuket.size());
  75.  
  76. // 非同期処理を行う
  77. Executor executer = new Executor() {
  78. public void execute(Runnable command) {
  79. new Thread(command).start();
  80. }
  81. };
  82. int threadId = 1;
  83. for (List<File> lstFiles : buketBuket) {
  84. executer.execute(
  85. new GrepCommand(threadId++,
  86. (File[])lstFiles.toArray(new File[0]),
  87. outDir,
  88. expr));
  89. }
  90. }
  91. /**
  92. * grep を実行する
  93. */
  94. public static class GrepCommand implements Runnable {
  95. private int threadId;
  96. private File[] files;
  97. private File outDir;
  98. private String expr;
  99. /**
  100. * 対象ファイルから、正規表現に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する
  101. * @param threadId スレッド識別用ID
  102. * @param files grep 対象ファイル
  103. * @param outDir 結果出力ディレクトリ
  104. * @param expr 正規表現(Java)
  105. */
  106. public GrepCommand(int threadId, File[] files, File outDir, String expr) {
  107. if (outDir == null || !outDir.isDirectory()) {
  108. throw new IllegalArgumentException("不正な出力先");
  109. }
  110. this.threadId = threadId;
  111. this.files = files;
  112. this.outDir = outDir;
  113. this.expr = expr;
  114. }
  115. /* (non-Javadoc)
  116. * @see java.lang.Runnable#run()
  117. */
  118. public void run() {
  119. try {
  120. File outFile = new File(this.outDir.getAbsolutePath()
  121. + File.separator
  122. + String.format("%03d.txt", this.threadId));
  123. BufferedWriter writer = new BufferedWriter(
  124. new OutputStreamWriter(new FileOutputStream(outFile)));
  125. for (File file : files) {
  126. System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName());
  127. BufferedReader reader = new BufferedReader(
  128. new InputStreamReader(new FileInputStream(file)));
  129. long lineno = 0;
  130. boolean isFirstMatch = false;
  131. String line = null;
  132. while((line = reader.readLine()) != null) {
  133. lineno++;
  134. if (line.matches(this.expr)) {
  135. if (!isFirstMatch) {
  136. writer.write("\n--------------------------------\n");
  137. writer.write(file.getAbsolutePath());
  138. writer.write("\n--------------------------------\n");
  139. isFirstMatch = true;
  140. }
  141. writer.write(line + "\n");
  142. }
  143. }
  144. reader.close();
  145. }
  146. writer.close();
  147. } catch (Exception e) {
  148. System.out.println(e);
  149. System.exit(-1);
  150. } finally {
  151. // 動いているスレッドが無くなったら、終了メッセージを出力
  152. int remain = workingThreadCount.addAndGet(-1);
  153. System.out.println("残りのスレッド数... " + remain);
  154. if ( remain <= 0) {
  155. System.out.println("終了しました。");
  156. }
  157. }
  158. }
  159. }
  160. }
実行例11
  1. > java JGrep "C:\\work\\" ".*TEST.*" ".*\.txt$"
  2.  
  3. [006]処理中 ... WORK16.txt
  4. [005]処理中 ... WORK11.txt
  5. [007]処理中 ... WORK21.txt
  6. [001]処理中 ... TEST1028.txt
  7. [002]処理中 ... TEST2085.txt
  8. [002]処理中 ... TEST2089.txt
  9. [002]処理中 ... TEST2096.txt
  10. [001]処理中 ... TEST1135.txt
  11. [006]処理中 ... WORK17.txt
  12. [007]処理中 ... WORK23.txt
  13. [008]処理中 ... WORK25_M25.txt
  14. 残りのスレッド数... 7
  15. [006]処理中 ... WORK17.h.txt
  16. [001]処理中 ... TEST2076.txt
  17. [007]処理中 ... WORK24.txt
  18. [006]処理中 ... WORK19.txt
  19. [002]処理中 ... TEST2097Z.txt
  20. 残りのスレッド数... 6
  21. [007]処理中 ... WORK25#.txt
  22. 残りのスレッド数... 5
  23. [005]処理中 ... WORK11_M24.txt
  24. 残りのスレッド数... 4
  25. 残りのスレッド数... 3
  26. [008]処理中 ... WORK27.txt
  27. [005]処理中 ... WORK11_NEW.txt
  28. [008]処理中 ... コピー TEST2085.txt
  29. [005]処理中 ... WORK11_W.txt
  30. [007]処理中 ... WORK25.20090630.txt
  31. 残りのスレッド数... 2
  32. [007]処理中 ... WORK25.txt
  33. [005]処理中 ... WORK11_WW.txt
  34. [005]処理中 ... WORK12.txt
  35. 残りのスレッド数... 1
  36. 残りのスレッド数... 0
  37. 終了しました。

非同期に grep を実施する例2

内容

  • 上記例を書き換え
  • ExecutorService を使って複数スレッドを管理
  • Runnable の代わりに、Callableを利用して、結果を受け取る
  • Future にて結果を表示

ExecutorService

  • 終了を管理するメソッド、および 1 つ以上の非同期タスクの進行状況を追跡する Future を生成できるメソッドを提供

Future

  • 非同期計算の結果を表す
  • 計算が完了したかどうかのチェック、完了までの待機、計算結果の取得などを行うためのメソッドを用意

Callable

  • 結果を返し、例外をスローすることがあるタスク
  • Runnable と似ていて、どちらもインスタンスが別のスレッドによって実行される可能性があるクラス用に設計
  • Runnable は結果を返さず、チェック例外をスローすることはできません


ソース

  1. import java.io.BufferedReader;
  2. import java.io.BufferedWriter;
  3. import java.io.File;
  4. import java.io.FileInputStream;
  5. import java.io.FileOutputStream;
  6. import java.io.FilenameFilter;
  7. import java.io.InputStreamReader;
  8. import java.io.OutputStreamWriter;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. import java.util.concurrent.Callable;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.Executors;
  14. import java.util.concurrent.Future;
  15.  
  16. /**
  17. *
  18. */
  19. public class JGrep {
  20.  
  21. private static final int THREAD_SIZE = 10;
  22. // スレッドプール
  23. private ExecutorService threadPool;
  24. public static void main(String[] args) throws Exception {
  25. if (args.length <=2) {
  26. System.out.println("Usage: java JGrep 対象ディレクトリ 正規表現 [ファイルフィルタ]");
  27. System.exit(-1);
  28. }
  29.  
  30. String fileFilter = "";
  31. if (args.length > 2) {
  32. fileFilter = args[2];
  33. }
  34. (new JGrep()).grep(args[0], args[1], fileFilter);
  35. }
  36. /**
  37. * @param dirName
  38. * @param expr
  39. * @param filter
  40. * @throws Exception
  41. */
  42. public void grep(String dirName, String expr, String filter) throws Exception {
  43. // 対象ファイルを取得
  44. File dir = new File(dirName);
  45. final String fltr = filter;
  46. File[] files = dir.listFiles(
  47. new FilenameFilter(){
  48. public boolean accept(File dir, String name) {
  49. return name.matches(fltr);
  50. }
  51. });
  52. // 出力先ディレクトリ
  53. File outDir = new File(dir.getAbsolutePath() + File.separator + "out");
  54. if (!outDir.exists()) {
  55. if (!outDir.mkdirs()) {
  56. throw new IllegalStateException("出力ディレクトリが作成できません");
  57. }
  58. }
  59. // スレッドで処理させる単位(バケツ)のサイズを決定
  60. int buketSize = files.length / THREAD_SIZE;
  61. buketSize++;
  62. int filesInBuketCnt = 0;
  63. List<List<File>> buketBuket = new ArrayList<List<File>>();
  64. List<File> fileBuket = null;
  65. for (File file : files) {
  66. if (filesInBuketCnt == 0) {
  67. fileBuket = new ArrayList<File>();
  68. }
  69. fileBuket.add(file);
  70. filesInBuketCnt++;
  71. if (filesInBuketCnt > buketSize) {
  72. buketBuket.add(fileBuket);
  73. filesInBuketCnt = 0;
  74. fileBuket = null;
  75. }
  76. }
  77. if (files !=null && filesInBuketCnt > 0) {
  78. buketBuket.add(fileBuket);
  79. }
  80. // Callable は、Runnabla と同じような役割を担うが、
  81. // Runnable と異なり、結果を返し、例外をスローすることができる
  82. List<Callable<Long>> tasks = new ArrayList<Callable<Long>>();
  83. threadPool = Executors.newFixedThreadPool(THREAD_SIZE);
  84.  
  85. int threadId = 1;
  86. for (List<File> lstFiles : buketBuket) {
  87. tasks.add(
  88. new GrepCommand(threadId++,
  89. (File[])lstFiles.toArray(new File[0]),
  90. outDir,
  91. expr));
  92. }
  93. // 指定されたタスクを実行し、すべて完了すると、ステータスと結果を含む Future のリストを返す
  94. List<Future<Long>> results = threadPool.invokeAll(tasks);
  95. for (Future<Long>: results) {
  96. System.out.printf("一致件数 %d 件\n", f.get().longValue());
  97. }
  98. System.out.println("終了しました。");
  99. }
  100. /**
  101. * grep を実行する
  102. */
  103. public static class GrepCommand implements Callable<Long> {
  104. private int threadId;
  105. private File[] files;
  106. private File outDir;
  107. private String expr;
  108. /**
  109. * 対象ファイルから、正規表現に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する
  110. * @param threadId スレッド識別用ID
  111. * @param files grep 対象ファイル
  112. * @param outDir 結果出力ディレクトリ
  113. * @param expr 正規表現(Java)
  114. */
  115. public GrepCommand(int threadId, File[] files, File outDir, String expr) {
  116. if (outDir == null || !outDir.isDirectory()) {
  117. throw new IllegalArgumentException("不正な出力先");
  118. }
  119. this.threadId = threadId;
  120. this.files = files;
  121. this.outDir = outDir;
  122. this.expr = expr;
  123. }
  124.  
  125. /* (non-Javadoc)
  126. * @see java.util.concurrent.Callable#call()
  127. */
  128. public Long call() throws Exception {
  129. File outFile = new File(this.outDir.getAbsolutePath()
  130. + File.separator
  131. + String.format("%03d.txt", this.threadId));
  132. BufferedWriter writer = new BufferedWriter(
  133. new OutputStreamWriter(new FileOutputStream(outFile)));
  134. long match = 0L;
  135. for (File file : files) {
  136. System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName());
  137. BufferedReader reader = new BufferedReader(
  138. new InputStreamReader(new FileInputStream(file)));
  139. long lineno = 0;
  140. boolean isFirstMatch = false;
  141. String line = null;
  142. while((line = reader.readLine()) != null) {
  143. lineno++;
  144. if (line.matches(this.expr)) {
  145. if (!isFirstMatch) {
  146. writer.write("\n--------------------------------\n");
  147. writer.write(file.getAbsolutePath());
  148. writer.write("\n--------------------------------\n");
  149. isFirstMatch = true;
  150. }
  151. writer.write(line + "\n");
  152. match++;
  153. }
  154. }
  155. reader.close();
  156. }
  157. writer.close();
  158. // 結果を返す
  159. return new Long(match);
  160. }
  161. }
  162. }