AWS Lambda(2)LambdaでKinesis Recordを取得する

Kinesis Process Records

Kinesis Event 発生毎にLambdaを実行し、Kinesis Recordを取得することが可能である。AWS マネジメントコンソールから関数を作成する場合は、kinesis-process-recordを選択。

Kinesis関数の設定

Stream名など必要事項を記入しプログラムをアップロードすることでLambdaを実行することが可能となる。

lambda_kinesis_function_2

Eclipseで開発する場合は、EclipseのAWSプラグインから直接アップロードや設定を行うことも可能である。Eclipseプロジェクトを右クリックして、「Amazon Web Services」から「Upload function to AWS Lambda…」を選択、「アップロードするリージョン」と「関数名」を指定し、

Kinesis関数の設定

「Description」や「Role」、プログラム一式を格納する「S3バケット」を指定して「Finish」を押すとEclipseで作成したLambda関数がAWS上にアップロードされる。

Eclipse設定

JavaによるLambda関数の実装

RequestHandler<KinesisEvent, Object>を実装することで、Kinesisからデータを取得し処理を行うことが可能となる。1回の処理で複数のKinesis Recordを取得するのでfor文で廻してそれぞれのRecordの値を取得することができる。Kinesis Recordには、ApproximateArrivalTimestampというRecord入力時に自動付与されるタイムスタンプが存在し、この値を取得するためのgetApproximateArrivalTimestamp()というメソッドも用意されているが、現時点では値の取得に対応していおらず、このメソッドを実行してもnullしか返えらないので注意が必要である。

public class KinesisEventHandler implements RequestHandler<KinesisEvent, Object> {

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {

        context.getLogger().log("Input: " + input);

        for(KinesisEventRecord rec : input.getRecords()) {
        	// Recordを取得
        	Record record = rec.getKinesis();
        	// JSONを取得
        	byte[] byteArray = new byte[record.getData().remaining()];
        	record.getData().get(byteArray);
        	String json = new String(byteArray);
        	// タイムスタンプが取得できない
        	String timestamp = rec.getKinesis().getApproximateArrivalTimestamp();
        }
    }
}

Lambdaのスロットリング

Lambdaは、1つの関数につき1秒あたり同時処理数100でスロットリングされている。また、上限値の引き上げを申請しても1秒あたり同時接続数1000程度が限界のようである。したがって、Kinesisに大量のRecordが入力された場合はLambdaの上限を超えてしまいRecordを取りこぼしてしまう可能性がある。Kinesis Recordを確実に取得し処理する必要がある場合は、EC2上でKinesis Client Libraryを用いてKinesis Appを実装する必要がある。