Kinesis Process Records
Kinesis Event 発生毎にLambdaを実行し、Kinesis Recordを取得することが可能である。AWS マネジメントコンソールから関数を作成する場合は、kinesis-process-recordを選択。
Stream名など必要事項を記入しプログラムをアップロードすることでLambdaを実行することが可能となる。
Eclipseで開発する場合は、EclipseのAWSプラグインから直接アップロードや設定を行うことも可能である。Eclipseプロジェクトを右クリックして、「Amazon Web Services」から「Upload function to AWS Lambda…」を選択、「アップロードするリージョン」と「関数名」を指定し、
「Description」や「Role」、プログラム一式を格納する「S3バケット」を指定して「Finish」を押すとEclipseで作成したLambda関数がAWS上にアップロードされる。
JavaによるLambda関数の実装
RequestHandler<KinesisEvent, Object>を実装することで、Kinesisからデータを取得し処理を行うことが可能となる。1回の処理で複数のKinesis Recordを取得するのでfor文で廻してそれぞれのRecordの値を取得することができる。Kinesis Recordには、ApproximateArrivalTimestampというRecord入力時に自動付与されるタイムスタンプが存在し、この値を取得するためのgetApproximateArrivalTimestamp()というメソッドも用意されているが、現時点では値の取得に対応していおらず、このメソッドを実行してもnullしか返えらないので注意が必要である。
public class KinesisEventHandler implements RequestHandler<KinesisEvent, Object> { @Override public Object handleRequest(KinesisEvent input, Context context) { context.getLogger().log("Input: " + input); for(KinesisEventRecord rec : input.getRecords()) { // Recordを取得 Record record = rec.getKinesis(); // JSONを取得 byte[] byteArray = new byte[record.getData().remaining()]; record.getData().get(byteArray); String json = new String(byteArray); // タイムスタンプが取得できない String timestamp = rec.getKinesis().getApproximateArrivalTimestamp(); } } }
Lambdaのスロットリング
Lambdaは、1つの関数につき1秒あたり同時処理数100でスロットリングされている。また、上限値の引き上げを申請しても1秒あたり同時接続数1000程度が限界のようである。したがって、Kinesisに大量のRecordが入力された場合はLambdaの上限を超えてしまいRecordを取りこぼしてしまう可能性がある。Kinesis Recordを確実に取得し処理する必要がある場合は、EC2上でKinesis Client Libraryを用いてKinesis Appを実装する必要がある。