マルチスレッド処理
AWSが公開しているKinesis Client Library(KCL)のサンプルプログラムでは、取得した各レコードをシングルスレッドで順次処理している。しかしこれでは、前のRecord処理が終了しないと次のRecord処理が実行できないため、Kinesis Client LibraryのDEFAULT_MAX_RECORDの値を上げたとしても性能が十分出ない。
そこで、Record処理を単純にRunnaleなどでマルチスレッド化してしまうとスレッド数が制御できなくなり、例えばKinesis Recordから取得したデータをDynamoDBに順次書き込むという制御を記述していた場合は、
com.amazonaws.http.AmazonHttpClient executeHelper INFO: Unable to execute HTTP request: Timeout waiting for connection from pool org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
など、DynamoDBにアクセスするためのHTTPのリソースが枯渇してエラーが発生し、Record処理に漏れが生じてしまう。そこで、ExecutorServiceを用いてスレッド数を制御しながらマルチスレッド化する。
/** * Process records performing retries as needed. Skip "poison pill" records. * * @param records Data records to be processed. */ private void processRecordsWithRetries(List<Record> records) { ExecutorService exec = Executors.newFixedThreadPool(NUM); try { for (Record record : records) { boolean processedSuccessfully = false; for (int i = 0; i < NUM_RETRIES; i++) { try { // スレッドタスクを実行 exec.submit(new My_Method(this, record)); // 略... } } } } finally { // スレッドタスクを終了 exec.shutdown(); if(!exec.awaitTermination(60, TimeUnit.SECONDS)){ exec.shutdownNow(); } } }
ExecutorServiceは、スレッドプールを用いて、マルチスレッドタスクを管理しながら実行できる仕組みで、submit()によりタスクが生成されて、ブロッキングキューに挿入され、shutdown()もしくはshutdownNow()メソッドにより処理を終了させることができる。ExecutorServiceを用いると生成するスレッド数を指定できることから、無尽蔵にスレッドが生成される心配がない。
ExecutorServiceは、終了処理を必ず明示的に実装しておく必要があるサービスである。生成済みのタスクは、shutdown()メソッド実行後も処理が継続されるが、shutdownNow()メソッドの場合は、強制的に処理がキャンセルされる。すなわち、shutdown()メソッド実行を実行したあとも、処理が継続したままである場合があることに注意が必要である。したがって、上記のように、shutdown()メソッドを実行した後に、awaitTermination()メソッドによりタイムアウトの時間を設定しておき、この時間を超えても処理が継続している場合には、shutdownNow()メソッドで強制的に処理を終了させるという実装にすることが望ましい。
ExecutorServiceで設定したスレッド数(NUM)が多すぎると以下のエラーが発生するため、スレッド数の上限を設定する際は注意が必要である。
java.lang.OutOfMemoryError: unable to create new native thread
プログラムがどれほどスレッドを消費しているかは、以下のコマンドで確認が可能である。
ls -l /proc/[プログラムのプロセスNo.]/task | wc -l
また、Linuxの最大スレッド数は、
cat /proc/sys/kernel/threads-max
ユーザ1人あたりの制限は、
ulimit -a
で確認することが可能である。