java

 

javaのExecutorServiceで簡単に並列処理を実現!

javaにはExecutorServiceという簡単に並列処理ができるクラスがあります。全ての並列処理の完了を待ってから次の処理を行う等も簡単に行えます。

新サイト、tree-mapsを公開しました!!

tree-maps: 地図のWEB TOOLの事ならtree-mapsにお任せ!

地図に関するWEB TOOL専門サイトです!!

大画面で大量の緯度経度を一気にプロット、ジオコーディング、DMS<->DEGの相互変換等ができます!

◯ 広告

古いjdkで並列処理をするには、RunnableThreadを使った原始的なものでした。

jdk1.5になってから並列処理のクラスは一新され、ExecutorServiceが登場しました。

このExecutorServiceはRunnableやThreadより遥かに楽に実装できる優れものです。

楽に実装できるという事は、バグを減らせて実装の効率も上がるという事なので是非使いましょう!

複数のタスククラスを並列実行し、各タスクの返り値の合計値を取得するサンプルです。

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class ParallelTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelTest.class);

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(15);
        try {
            // タスクをリストに登録
            List<Callable<Integer>> tasks = Lists.newArrayList();
            tasks.add(new ParallelTask("タスク1"));
            tasks.add(new ParallelTask("タスク2"));
            tasks.add(new ParallelTask("タスク3"));

            // タスクを並列実行する
            List<Future<Integer>> futures = null;
            try {
                futures = es.invokeAll(tasks);
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
                return;
            }

            // 並列処理の返り値を取得する
            Integer count = 0;
            for (Future<Integer> future : futures) {
                try {
                    count += future.get();
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            LOGGER.info("カウントの合計は{}です。", count);
        } finally {
            if (es != null)
                es.shutdown();
        }
    }

    public static class ParallelTask implements Callable<Integer> {

        private String taskName;

        private Integer count = 0;

        public ParallelTask(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public Integer call() throws Exception {
            LOGGER.info("{}の処理を開始します。カウントは{}です。", taskName, count);
            for (int i = 0; i < 5; i++) {
                count++;
                LOGGER.info("{}, count={}, i={}", taskName, count, i);
            }
            LOGGER.info("{}の処理を終了します。カウントは{}です。", taskName, count);
            return count;
        }

    }
}

Futureクラスを使って、全タスクの終了を待っています。

メイン処理を並列実行し、Futureで全タスクの完了を待ってから処理結果を更に並列処理したりできます。

INFO 2013/10/05 16:23:04:386 [pool-1-thread-3] (ParallelTest.call:62) - タスク3の処理を開始します。カウントは0です。
INFO 2013/10/05 16:23:04:386 [pool-1-thread-1] (ParallelTest.call:62) - タスク1の処理を開始します。カウントは0です。
INFO 2013/10/05 16:23:04:386 [pool-1-thread-2] (ParallelTest.call:62) - タスク2の処理を開始します。カウントは0です。
INFO 2013/10/05 16:23:04:395 [pool-1-thread-3] (ParallelTest.call:65) - タスク3, count=1, i=0
INFO 2013/10/05 16:23:04:395 [pool-1-thread-2] (ParallelTest.call:65) - タスク2, count=1, i=0
INFO 2013/10/05 16:23:04:395 [pool-1-thread-1] (ParallelTest.call:65) - タスク1, count=1, i=0
INFO 2013/10/05 16:23:04:395 [pool-1-thread-3] (ParallelTest.call:65) - タスク3, count=2, i=1
INFO 2013/10/05 16:23:04:395 [pool-1-thread-2] (ParallelTest.call:65) - タスク2, count=2, i=1
INFO 2013/10/05 16:23:04:395 [pool-1-thread-1] (ParallelTest.call:65) - タスク1, count=2, i=1
INFO 2013/10/05 16:23:04:396 [pool-1-thread-2] (ParallelTest.call:65) - タスク2, count=3, i=2
INFO 2013/10/05 16:23:04:396 [pool-1-thread-3] (ParallelTest.call:65) - タスク3, count=3, i=2
INFO 2013/10/05 16:23:04:396 [pool-1-thread-1] (ParallelTest.call:65) - タスク1, count=3, i=2
INFO 2013/10/05 16:23:04:396 [pool-1-thread-2] (ParallelTest.call:65) - タスク2, count=4, i=3
INFO 2013/10/05 16:23:04:396 [pool-1-thread-3] (ParallelTest.call:65) - タスク3, count=4, i=3
INFO 2013/10/05 16:23:04:396 [pool-1-thread-1] (ParallelTest.call:65) - タスク1, count=4, i=3
INFO 2013/10/05 16:23:04:397 [pool-1-thread-3] (ParallelTest.call:65) - タスク3, count=5, i=4
INFO 2013/10/05 16:23:04:397 [pool-1-thread-1] (ParallelTest.call:65) - タスク1, count=5, i=4
INFO 2013/10/05 16:23:04:396 [pool-1-thread-2] (ParallelTest.call:65) - タスク2, count=5, i=4
INFO 2013/10/05 16:23:04:397 [pool-1-thread-3] (ParallelTest.call:67) - タスク3の処理を終了します。カウントは5です。
INFO 2013/10/05 16:23:04:397 [pool-1-thread-2] (ParallelTest.call:67) - タスク2の処理を終了します。カウントは5です。
INFO 2013/10/05 16:23:04:397 [pool-1-thread-1] (ParallelTest.call:67) - タスク1の処理を終了します。カウントは5です。
INFO 2013/10/05 16:23:04:398 [main] (ParallelTest.main:43) - カウントの合計は15です。

タスククラスに仕込んだロガーがバラバラに表示されていますね。

インクリメントしている部分も他のスレッドに邪魔されずにカウントできています。

このタスククラスですが、Callableを継承しているので、返り値が好きなように返せます。

今回はカウント値を返していますが、処理したコード値のリスト等を返す事もできます。

ExecutorServiceの実装は複数あるので、用途によって使い分けましょう。

newSingleThreadExecutor

単一のスレッドで処理するクラスです。

単一スレッドで非同期処理を手軽に行いたい場合に使えそうです。

newFixedThreadPool

スレッドプールを生成します。

プール内に引数で指定した数のスレッドを事前に生成し、スレッドは使わ回されます。

スレッドの生成処理が初回のみなので、短時間で処理が実行されるような場合に使うといいかと思います。

スレッドが破棄されないのでリソースは消費されますが、スレッド生成コストは初回のみになります。

newCachedThreadPool

スレッドプールを生成します。

プール内に事前にスレッドを生成する事はせず、タスク実行時のスレッドをキャッシュしていきます。

スレッドには寿命が設定され、寿命が切れるとプール内の該当スレッドは破棄されます。

スレッド生成コストが発生しますが、スレッドは破棄されるのでアイドル時はリソース消費が少ないです。

タスクとタスクの実行に間が開く場合やタスクの実行数が不定の場合、リソース消費を抑える事ができます。

newSingleThreadScheduledExecutor

一定間隔に事項したり遅延実行する等、タスクの実行タイミングを細かく設定する事ができます。

単一スレッドで実行されます。

newScheduledThreadPool

スレッドプールを生成します。

一定間隔に事項したり遅延実行する等、タスクの実行タイミングを細かく設定する事ができます。

引数でプールサイズを決め、スレッドは使いまわされます。

◯ 広告