AWS Kinesis(5)Kinesis Client Libraryでマルチスレッド処理を行う

マルチスレッド処理

AWSが公開しているKinesis Client Library(KCL)のサンプルプログラムでは、取得した各レコードをシングルスレッドで順次処理している。しかしこれでは、前のRecord処理が終了しないと次のRecord処理が実行できないため、Kinesis Client LibraryのDEFAULT_MAX_RECORDの値を上げたとしても性能が十分出ない。

そこで、Record処理を単純にRunnaleなどでマルチスレッド化してしまうとスレッド数が制御できなくなり、例えばKinesis Recordから取得したデータをDynamoDBに順次書き込むという制御を記述していた場合は、

com.amazonaws.http.AmazonHttpClient executeHelper
INFO: Unable to execute HTTP request: Timeout waiting for connection from pool
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool

など、DynamoDBにアクセスするためのHTTPのリソースが枯渇してエラーが発生し、Record処理に漏れが生じてしまう。そこで、ExecutorServiceを用いてスレッド数を制御しながらマルチスレッド化する。

    /**
     * Process records performing retries as needed. Skip "poison pill" records.
     *
     * @param records Data records to be processed.
     */
    private void processRecordsWithRetries(List<Record> records) {
        ExecutorService exec = Executors.newFixedThreadPool(NUM);
        try {	        
            for (Record record : records) {
                boolean processedSuccessfully = false;
                for (int i = 0; i < NUM_RETRIES; i++) {
                    try {
                        // スレッドタスクを実行
                        exec.submit(new My_Method(this, record));
                        // 略...
                    }
                }
            }
        } finally {
            // スレッドタスクを終了
            exec.shutdown();
            if(!exec.awaitTermination(60, TimeUnit.SECONDS)){
                exec.shutdownNow();
            }
        }
    }

ExecutorServiceは、スレッドプールを用いて、マルチスレッドタスクを管理しながら実行できる仕組みで、submit()によりタスクが生成されて、ブロッキングキューに挿入され、shutdown()もしくはshutdownNow()メソッドにより処理を終了させることができる。ExecutorServiceを用いると生成するスレッド数を指定できることから、無尽蔵にスレッドが生成される心配がない。

ExecutorServiceは、終了処理を必ず明示的に実装しておく必要があるサービスである。生成済みのタスクは、shutdown()メソッド実行後も処理が継続されるが、shutdownNow()メソッドの場合は、強制的に処理がキャンセルされる。すなわち、shutdown()メソッド実行を実行したあとも、処理が継続したままである場合があることに注意が必要である。したがって、上記のように、shutdown()メソッドを実行した後に、awaitTermination()メソッドによりタイムアウトの時間を設定しておき、この時間を超えても処理が継続している場合には、shutdownNow()メソッドで強制的に処理を終了させるという実装にすることが望ましい。

ExecutorServiceで設定したスレッド数(NUM)が多すぎると以下のエラーが発生するため、スレッド数の上限を設定する際は注意が必要である。

java.lang.OutOfMemoryError: unable to create new native thread 

プログラムがどれほどスレッドを消費しているかは、以下のコマンドで確認が可能である。

ls -l /proc/[プログラムのプロセスNo.]/task | wc -l

また、Linuxの最大スレッド数は、

cat /proc/sys/kernel/threads-max

ユーザ1人あたりの制限は、

ulimit -a

で確認することが可能である。

AWS Lambda(2)LambdaでKinesis Recordを取得する

Kinesis Process Records

Kinesis Event 発生毎にLambdaを実行し、Kinesis Recordを取得することが可能である。AWS マネジメントコンソールから関数を作成する場合は、kinesis-process-recordを選択。

Kinesis関数の設定

Stream名など必要事項を記入しプログラムをアップロードすることでLambdaを実行することが可能となる。

lambda_kinesis_function_2

Eclipseで開発する場合は、EclipseのAWSプラグインから直接アップロードや設定を行うことも可能である。Eclipseプロジェクトを右クリックして、「Amazon Web Services」から「Upload function to AWS Lambda…」を選択、「アップロードするリージョン」と「関数名」を指定し、

Kinesis関数の設定

「Description」や「Role」、プログラム一式を格納する「S3バケット」を指定して「Finish」を押すとEclipseで作成したLambda関数がAWS上にアップロードされる。

Eclipse設定

JavaによるLambda関数の実装

RequestHandler<KinesisEvent, Object>を実装することで、Kinesisからデータを取得し処理を行うことが可能となる。1回の処理で複数のKinesis Recordを取得するのでfor文で廻してそれぞれのRecordの値を取得することができる。Kinesis Recordには、ApproximateArrivalTimestampというRecord入力時に自動付与されるタイムスタンプが存在し、この値を取得するためのgetApproximateArrivalTimestamp()というメソッドも用意されているが、現時点では値の取得に対応していおらず、このメソッドを実行してもnullしか返えらないので注意が必要である。

public class KinesisEventHandler implements RequestHandler<KinesisEvent, Object> {

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
    	    	
        context.getLogger().log("Input: " + input);
        
        for(KinesisEventRecord rec : input.getRecords()) {
        	// Recordを取得
        	Record record = rec.getKinesis();
        	// JSONを取得
        	byte[] byteArray = new byte[record.getData().remaining()];
        	record.getData().get(byteArray);
        	String json = new String(byteArray);
        	// タイムスタンプが取得できない
        	String timestamp = rec.getKinesis().getApproximateArrivalTimestamp();
        }
    }
}

Lambdaのスロットリング

Lambdaは、1つの関数につき1秒あたり同時処理数100でスロットリングされている。また、上限値の引き上げを申請しても1秒あたり同時接続数1000程度が限界のようである。したがって、Kinesisに大量のRecordが入力された場合はLambdaの上限を超えてしまいRecordを取りこぼしてしまう可能性がある。Kinesis Recordを確実に取得し処理する必要がある場合は、EC2上でKinesis Client Libraryを用いてKinesis Appを実装する必要がある。