「Java 並行処理」の版間の差分
ナビゲーションに移動
検索に移動
| 1行目: | 1行目: | ||
| − | ==Java 並行処理== | + | ==[[Java 並行処理]]== |
| − | [[Java]] | | + | [[Java]] | [[Category:並行処理]] |
==非同期に grep を実施する例1== | ==非同期に grep を実施する例1== | ||
===内容=== | ===内容=== | ||
| − | *Thread ではなく、Executor | + | *Thread ではなく、Executor を使って、[[R]]unnable を非同期に実行する |
*共有データにスレッドセーフな単一の変数([http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent.atomic])を使用する | *共有データにスレッドセーフな単一の変数([http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent.atomic])を使用する | ||
====[http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent.atomic]==== | ====[http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent.atomic]==== | ||
| 10行目: | 10行目: | ||
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/Executor.html Executor]==== | ====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/Executor.html Executor]==== | ||
*[http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent] | *[http://java.sun.com/javase/ja/6/docs/ja/api/ java.util.concurrent] | ||
| − | *送信された | + | *送信された [[R]]unnable タスクを実行するオブジェクト |
*通常、executor は、明示的にスレッドを作成する代わりに使用 | *通常、executor は、明示的にスレッドを作成する代わりに使用 | ||
====ソース==== | ====ソース==== | ||
| − | import java.io. | + | import java.io.Buffered[[R]]eader; |
import java.io.BufferedWriter; | import java.io.BufferedWriter; | ||
import java.io.File; | import java.io.File; | ||
| 19行目: | 19行目: | ||
import java.io.FileOutputStream; | import java.io.FileOutputStream; | ||
import java.io.FilenameFilter; | import java.io.FilenameFilter; | ||
| − | import java.io. | + | import java.io.InputStream[[R]]eader; |
import java.io.OutputStreamWriter; | import java.io.OutputStreamWriter; | ||
import java.util.ArrayList; | import java.util.ArrayList; | ||
| 35行目: | 35行目: | ||
static AtomicInteger workingThreadCount = null; | static AtomicInteger workingThreadCount = null; | ||
| − | private static final int | + | private static final int TH[[R]]EAD_SIZE = 10; |
public static void main(String[] args) throws Exception { | public static void main(String[] args) throws Exception { | ||
| 71行目: | 71行目: | ||
// スレッドで処理させる単位(バケツ)のサイズを決定 | // スレッドで処理させる単位(バケツ)のサイズを決定 | ||
| − | int buketSize = files.length / | + | int buketSize = files.length / TH[[R]]EAD_SIZE; |
buketSize++; | buketSize++; | ||
| 99行目: | 99行目: | ||
// 非同期処理を行う | // 非同期処理を行う | ||
Executor executer = new Executor() { | Executor executer = new Executor() { | ||
| − | public void execute( | + | public void execute([[R]]unnable command) { |
new Thread(command).start(); | new Thread(command).start(); | ||
} | } | ||
| 117行目: | 117行目: | ||
* grep を実行する | * grep を実行する | ||
*/ | */ | ||
| − | public static class GrepCommand implements | + | public static class GrepCommand implements [[R]]unnable { |
private int threadId; | private int threadId; | ||
private File[] files; | private File[] files; | ||
| 124行目: | 124行目: | ||
/** | /** | ||
| − | * | + | * 対象ファイルから、[[正規表現]]に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する |
* @param threadId スレッド識別用ID | * @param threadId スレッド識別用ID | ||
* @param files grep 対象ファイル | * @param files grep 対象ファイル | ||
* @param outDir 結果出力ディレクトリ | * @param outDir 結果出力ディレクトリ | ||
| − | * @param expr 正規表現(Java) | + | * @param expr [[正規表現]](Java) |
*/ | */ | ||
public GrepCommand(int threadId, File[] files, File outDir, String expr) { | public GrepCommand(int threadId, File[] files, File outDir, String expr) { | ||
| 140行目: | 140行目: | ||
} | } | ||
| − | /* (non- | + | /* (non-[[Java]]doc) |
| − | * @see java.lang. | + | * @see java.lang.[[R]]unnable#run() |
*/ | */ | ||
public void run() { | public void run() { | ||
| 153行目: | 153行目: | ||
for (File file : files) { | for (File file : files) { | ||
System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName()); | System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName()); | ||
| − | + | Buffered[[R]]eader reader = new Buffered[[R]]eader( | |
| − | new | + | new InputStream[[R]]eader(new FileInputStream(file))); |
long lineno = 0; | long lineno = 0; | ||
| 192行目: | 192行目: | ||
=====実行例11===== | =====実行例11===== | ||
| − | > java JGrep "C:\\work\\" ".*TEST.*" ".*\.txt$" | + | > java JGrep "C:\\work\\" ".*[[TEST]].*" ".*\.txt$" |
[006]処理中 ... WORK16.txt | [006]処理中 ... WORK16.txt | ||
| 233行目: | 233行目: | ||
===内容=== | ===内容=== | ||
*上記例を書き換え | *上記例を書き換え | ||
| − | * | + | *ExecutorSer[[vi]]ce を使って複数スレッドを管理 |
| − | * | + | *[[R]]unnable の代わりに、Callableを利用して、結果を受け取る |
*Future にて結果を表示 | *Future にて結果を表示 | ||
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/ExecutorService.html ExecutorService]==== | ====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/ExecutorService.html ExecutorService]==== | ||
| 243行目: | 243行目: | ||
====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/Callable.html Callable]==== | ====[http://java.sun.com/javase/ja/6/docs/ja/api/java/util/concurrent/Callable.html Callable]==== | ||
*結果を返し、例外をスローすることがあるタスク | *結果を返し、例外をスローすることがあるタスク | ||
| − | * | + | *[[R]]unnable と似ていて、どちらもインスタンスが別のスレッドによって実行される可能性があるクラス用に設計 |
| − | * | + | *[[R]]unnable は結果を返さず、チェック例外をスローすることはできません |
====ソース==== | ====ソース==== | ||
| − | import java.io. | + | import java.io.Buffered[[R]]eader; |
import java.io.BufferedWriter; | import java.io.BufferedWriter; | ||
import java.io.File; | import java.io.File; | ||
| 255行目: | 255行目: | ||
import java.io.FileOutputStream; | import java.io.FileOutputStream; | ||
import java.io.FilenameFilter; | import java.io.FilenameFilter; | ||
| − | import java.io. | + | import java.io.InputStream[[R]]eader; |
import java.io.OutputStreamWriter; | import java.io.OutputStreamWriter; | ||
import java.util.ArrayList; | import java.util.ArrayList; | ||
import java.util.List; | import java.util.List; | ||
import java.util.concurrent.Callable; | import java.util.concurrent.Callable; | ||
| − | import java.util.concurrent. | + | import java.util.concurrent.ExecutorSer[[vi]]ce; |
import java.util.concurrent.Executors; | import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | import java.util.concurrent.Future; | ||
| 269行目: | 269行目: | ||
public class JGrep { | public class JGrep { | ||
| − | private static final int | + | private static final int TH[[R]]EAD_SIZE = 10; |
// スレッドプール | // スレッドプール | ||
| − | private | + | private ExecutorSer[[vi]]ce threadPool; |
public static void main(String[] args) throws Exception { | public static void main(String[] args) throws Exception { | ||
| 314行目: | 314行目: | ||
// スレッドで処理させる単位(バケツ)のサイズを決定 | // スレッドで処理させる単位(バケツ)のサイズを決定 | ||
| − | int buketSize = files.length / | + | int buketSize = files.length / TH[[R]]EAD_SIZE; |
buketSize++; | buketSize++; | ||
| 337行目: | 337行目: | ||
} | } | ||
| − | // Callable | + | // Callable は、[[R]]unnabla と同じような役割を担うが、 |
| − | // | + | // [[R]]unnable と異なり、結果を返し、例外をスローすることができる |
List<Callable<Long>> tasks = new ArrayList<Callable<Long>>(); | List<Callable<Long>> tasks = new ArrayList<Callable<Long>>(); | ||
| − | threadPool = Executors.newFixedThreadPool( | + | threadPool = Executors.newFixedThreadPool(TH[[R]]EAD_SIZE); |
int threadId = 1; | int threadId = 1; | ||
| 370行目: | 370行目: | ||
/** | /** | ||
| − | * | + | * 対象ファイルから、[[正規表現]]に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する |
* @param threadId スレッド識別用ID | * @param threadId スレッド識別用ID | ||
* @param files grep 対象ファイル | * @param files grep 対象ファイル | ||
* @param outDir 結果出力ディレクトリ | * @param outDir 結果出力ディレクトリ | ||
| − | * @param expr 正規表現(Java) | + | * @param expr [[正規表現]](Java) |
*/ | */ | ||
public GrepCommand(int threadId, File[] files, File outDir, String expr) { | public GrepCommand(int threadId, File[] files, File outDir, String expr) { | ||
| 386行目: | 386行目: | ||
} | } | ||
| − | /* (non- | + | /* (non-[[Java]]doc) |
* @see java.util.concurrent.Callable#call() | * @see java.util.concurrent.Callable#call() | ||
*/ | */ | ||
| 400行目: | 400行目: | ||
for (File file : files) { | for (File file : files) { | ||
System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName()); | System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName()); | ||
| − | + | Buffered[[R]]eader reader = new Buffered[[R]]eader( | |
| − | new | + | new InputStream[[R]]eader(new FileInputStream(file))); |
long lineno = 0; | long lineno = 0; | ||
2020年2月16日 (日) 04:27時点における最新版
目次
Java 並行処理
Java |
非同期に grep を実施する例1
内容
- Thread ではなく、Executor を使って、Runnable を非同期に実行する
- 共有データにスレッドセーフな単一の変数(java.util.concurrent.atomic)を使用する
java.util.concurrent.atomic
- 単一の変数に対するロックフリーでスレッドセーフなプログラミングをサポートするクラスの小規模なツールキット
Executor
- java.util.concurrent
- 送信された Runnable タスクを実行するオブジェクト
- 通常、executor は、明示的にスレッドを作成する代わりに使用
ソース
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; /** * */ public class JGrep { // 同期された変数 //volatile int workingThreadCount; static AtomicInteger workingThreadCount = null; private static final int THREAD_SIZE = 10; public static void main(String[] args) throws Exception { if (args.length <=2) { System.out.println("Usage: java JGrep 対象ディレクトリ 正規表現 [ファイルフィルタ]"); System.exit(-1); } String fileFilter = ""; if (args.length > 2) { fileFilter = args[2]; } (new JGrep()).grep(args[0], args[1], fileFilter); } public void grep(String dirName, String expr, String filter) throws Exception { // 対象ファイルを取得 File dir = new File(dirName); final String fltr = filter; File[] files = dir.listFiles( new FilenameFilter(){ public boolean accept(File dir, String name) { return name.matches(fltr); } }); // 出力先ディレクトリ File outDir = new File(dir.getAbsolutePath() + File.separator + "out"); if (!outDir.exists()) { if (!outDir.mkdirs()) { throw new IllegalStateException("出力ディレクトリが作成できません"); } } // スレッドで処理させる単位(バケツ)のサイズを決定 int buketSize = files.length / THREAD_SIZE; buketSize++; int filesInBuketCnt = 0; List<List<File>> buketBuket = new ArrayList<List<File>>(); List<File> fileBuket = null; for (File file : files) { if (filesInBuketCnt == 0) { fileBuket = new ArrayList<File>(); } fileBuket.add(file); filesInBuketCnt++; if (filesInBuketCnt > buketSize) { buketBuket.add(fileBuket); filesInBuketCnt = 0; fileBuket = null; } } if (files !=null && filesInBuketCnt > 0) { buketBuket.add(fileBuket); } // スレッド数を登録 workingThreadCount = new AtomicInteger(buketBuket.size()); // 非同期処理を行う Executor executer = new Executor() { public void execute(Runnable command) { new Thread(command).start(); } }; int threadId = 1; for (List<File> lstFiles : buketBuket) { executer.execute( new GrepCommand(threadId++, (File[])lstFiles.toArray(new File[0]), outDir, expr)); } } /** * grep を実行する */ public static class GrepCommand implements Runnable { private int threadId; private File[] files; private File outDir; private String expr; /** * 対象ファイルから、正規表現に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する * @param threadId スレッド識別用ID * @param files grep 対象ファイル * @param outDir 結果出力ディレクトリ * @param expr 正規表現(Java) */ public GrepCommand(int threadId, File[] files, File outDir, String expr) { if (outDir == null || !outDir.isDirectory()) { throw new IllegalArgumentException("不正な出力先"); } this.threadId = threadId; this.files = files; this.outDir = outDir; this.expr = expr; } /* (non-Javadoc) * @see java.lang.Runnable#run() */ public void run() { try { File outFile = new File(this.outDir.getAbsolutePath() + File.separator + String.format("%03d.txt", this.threadId)); BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(new FileOutputStream(outFile))); for (File file : files) { System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName()); BufferedReader reader = new BufferedReader( new InputStreamReader(new FileInputStream(file))); long lineno = 0; boolean isFirstMatch = false; String line = null; while((line = reader.readLine()) != null) { lineno++; if (line.matches(this.expr)) { if (!isFirstMatch) { writer.write("\n--------------------------------\n"); writer.write(file.getAbsolutePath()); writer.write("\n--------------------------------\n"); isFirstMatch = true; } writer.write(line + "\n"); } } reader.close(); } writer.close(); } catch (Exception e) { System.out.println(e); System.exit(-1); } finally { // 動いているスレッドが無くなったら、終了メッセージを出力 int remain = workingThreadCount.addAndGet(-1); System.out.println("残りのスレッド数... " + remain); if ( remain <= 0) { System.out.println("終了しました。"); } } } } }
実行例11
> java JGrep "C:\\work\\" ".*TEST.*" ".*\.txt$" [006]処理中 ... WORK16.txt [005]処理中 ... WORK11.txt [007]処理中 ... WORK21.txt [001]処理中 ... TEST1028.txt [002]処理中 ... TEST2085.txt [002]処理中 ... TEST2089.txt [002]処理中 ... TEST2096.txt [001]処理中 ... TEST1135.txt [006]処理中 ... WORK17.txt [007]処理中 ... WORK23.txt [008]処理中 ... WORK25_M25.txt 残りのスレッド数... 7 [006]処理中 ... WORK17.h.txt [001]処理中 ... TEST2076.txt [007]処理中 ... WORK24.txt [006]処理中 ... WORK19.txt [002]処理中 ... TEST2097Z.txt 残りのスレッド数... 6 [007]処理中 ... WORK25#.txt 残りのスレッド数... 5 [005]処理中 ... WORK11_M24.txt 残りのスレッド数... 4 残りのスレッド数... 3 [008]処理中 ... WORK27.txt [005]処理中 ... WORK11_NEW.txt [008]処理中 ... コピー ~ TEST2085.txt [005]処理中 ... WORK11_W.txt [007]処理中 ... WORK25.20090630.txt 残りのスレッド数... 2 [007]処理中 ... WORK25.txt [005]処理中 ... WORK11_WW.txt [005]処理中 ... WORK12.txt 残りのスレッド数... 1 残りのスレッド数... 0 終了しました。
非同期に grep を実施する例2
内容
ExecutorService
- 終了を管理するメソッド、および 1 つ以上の非同期タスクの進行状況を追跡する Future を生成できるメソッドを提供
Future
- 非同期計算の結果を表す
- 計算が完了したかどうかのチェック、完了までの待機、計算結果の取得などを行うためのメソッドを用意
Callable
- 結果を返し、例外をスローすることがあるタスク
- Runnable と似ていて、どちらもインスタンスが別のスレッドによって実行される可能性があるクラス用に設計
- Runnable は結果を返さず、チェック例外をスローすることはできません
ソース
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * */ public class JGrep { private static final int THREAD_SIZE = 10; // スレッドプール private ExecutorService threadPool; public static void main(String[] args) throws Exception { if (args.length <=2) { System.out.println("Usage: java JGrep 対象ディレクトリ 正規表現 [ファイルフィルタ]"); System.exit(-1); } String fileFilter = ""; if (args.length > 2) { fileFilter = args[2]; } (new JGrep()).grep(args[0], args[1], fileFilter); } /** * @param dirName * @param expr * @param filter * @throws Exception */ public void grep(String dirName, String expr, String filter) throws Exception { // 対象ファイルを取得 File dir = new File(dirName); final String fltr = filter; File[] files = dir.listFiles( new FilenameFilter(){ public boolean accept(File dir, String name) { return name.matches(fltr); } }); // 出力先ディレクトリ File outDir = new File(dir.getAbsolutePath() + File.separator + "out"); if (!outDir.exists()) { if (!outDir.mkdirs()) { throw new IllegalStateException("出力ディレクトリが作成できません"); } } // スレッドで処理させる単位(バケツ)のサイズを決定 int buketSize = files.length / THREAD_SIZE; buketSize++; int filesInBuketCnt = 0; List<List<File>> buketBuket = new ArrayList<List<File>>(); List<File> fileBuket = null; for (File file : files) { if (filesInBuketCnt == 0) { fileBuket = new ArrayList<File>(); } fileBuket.add(file); filesInBuketCnt++; if (filesInBuketCnt > buketSize) { buketBuket.add(fileBuket); filesInBuketCnt = 0; fileBuket = null; } } if (files !=null && filesInBuketCnt > 0) { buketBuket.add(fileBuket); } // Callable は、Runnabla と同じような役割を担うが、 // Runnable と異なり、結果を返し、例外をスローすることができる List<Callable<Long>> tasks = new ArrayList<Callable<Long>>(); threadPool = Executors.newFixedThreadPool(THREAD_SIZE); int threadId = 1; for (List<File> lstFiles : buketBuket) { tasks.add( new GrepCommand(threadId++, (File[])lstFiles.toArray(new File[0]), outDir, expr)); } // 指定されたタスクを実行し、すべて完了すると、ステータスと結果を含む Future のリストを返す List<Future<Long>> results = threadPool.invokeAll(tasks); for (Future<Long> f : results) { System.out.printf("一致件数 %d 件\n", f.get().longValue()); } System.out.println("終了しました。"); } /** * grep を実行する */ public static class GrepCommand implements Callable<Long> { private int threadId; private File[] files; private File outDir; private String expr; /** * 対象ファイルから、正規表現に一致する行を、出力ディレクトリにスレッド識別用IDファイル名にて結果出力する * @param threadId スレッド識別用ID * @param files grep 対象ファイル * @param outDir 結果出力ディレクトリ * @param expr 正規表現(Java) */ public GrepCommand(int threadId, File[] files, File outDir, String expr) { if (outDir == null || !outDir.isDirectory()) { throw new IllegalArgumentException("不正な出力先"); } this.threadId = threadId; this.files = files; this.outDir = outDir; this.expr = expr; } /* (non-Javadoc) * @see java.util.concurrent.Callable#call() */ public Long call() throws Exception { File outFile = new File(this.outDir.getAbsolutePath() + File.separator + String.format("%03d.txt", this.threadId)); BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(new FileOutputStream(outFile))); long match = 0L; for (File file : files) { System.out.printf("[%03d]処理中 ... %s\n", this.threadId, file.getName()); BufferedReader reader = new BufferedReader( new InputStreamReader(new FileInputStream(file))); long lineno = 0; boolean isFirstMatch = false; String line = null; while((line = reader.readLine()) != null) { lineno++; if (line.matches(this.expr)) { if (!isFirstMatch) { writer.write("\n--------------------------------\n"); writer.write(file.getAbsolutePath()); writer.write("\n--------------------------------\n"); isFirstMatch = true; } writer.write(line + "\n"); match++; } } reader.close(); } writer.close(); // 結果を返す return new Long(match); } } }
© 2006 矢木浩人