AWS Lambda(2)LambdaでKinesis Recordを取得する

Kinesis Process Records

Kinesis Event 発生毎にLambdaを実行し、Kinesis Recordを取得することが可能である。AWS マネジメントコンソールから関数を作成する場合は、kinesis-process-recordを選択。

Kinesis関数の設定

Stream名など必要事項を記入しプログラムをアップロードすることでLambdaを実行することが可能となる。

lambda_kinesis_function_2

Eclipseで開発する場合は、EclipseのAWSプラグインから直接アップロードや設定を行うことも可能である。Eclipseプロジェクトを右クリックして、「Amazon Web Services」から「Upload function to AWS Lambda…」を選択、「アップロードするリージョン」と「関数名」を指定し、

Kinesis関数の設定

「Description」や「Role」、プログラム一式を格納する「S3バケット」を指定して「Finish」を押すとEclipseで作成したLambda関数がAWS上にアップロードされる。

Eclipse設定

JavaによるLambda関数の実装

RequestHandler<KinesisEvent, Object>を実装することで、Kinesisからデータを取得し処理を行うことが可能となる。1回の処理で複数のKinesis Recordを取得するのでfor文で廻してそれぞれのRecordの値を取得することができる。Kinesis Recordには、ApproximateArrivalTimestampというRecord入力時に自動付与されるタイムスタンプが存在し、この値を取得するためのgetApproximateArrivalTimestamp()というメソッドも用意されているが、現時点では値の取得に対応していおらず、このメソッドを実行してもnullしか返えらないので注意が必要である。

public class KinesisEventHandler implements RequestHandler<KinesisEvent, Object> {

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
    	    	
        context.getLogger().log("Input: " + input);
        
        for(KinesisEventRecord rec : input.getRecords()) {
        	// Recordを取得
        	Record record = rec.getKinesis();
        	// JSONを取得
        	byte[] byteArray = new byte[record.getData().remaining()];
        	record.getData().get(byteArray);
        	String json = new String(byteArray);
        	// タイムスタンプが取得できない
        	String timestamp = rec.getKinesis().getApproximateArrivalTimestamp();
        }
    }
}

Lambdaのスロットリング

Lambdaは、1つの関数につき1秒あたり同時処理数100でスロットリングされている。また、上限値の引き上げを申請しても1秒あたり同時接続数1000程度が限界のようである。したがって、Kinesisに大量のRecordが入力された場合はLambdaの上限を超えてしまいRecordを取りこぼしてしまう可能性がある。Kinesis Recordを確実に取得し処理する必要がある場合は、EC2上でKinesis Client Libraryを用いてKinesis Appを実装する必要がある。

AWS Lambda(1)Lambdaとは何なのか

Lambdaとは

AWS Lambdaは、イベントをトリガにクラウド上で独自のコードを実行させるサービス。EC2インスタンス等の管理が不要で負荷に合わせて自動でスケールアウトすることのできるフルマネージドのサービスである。100ミリ秒単位の処理時間で従量課金される。例えば、画像がアップロードされたタイミングでサムネイル画像を生成したり、DynamoDBへの書き込みタイミングプッシュ通知を行うなどをLambdaを用いて実現できる。イベントの発生元となるAWSリソースは、S3, Kinesis, DynamoDB, Cognitoなどで、Pull/Pushの2種類のイベントモデルが存在する。

コードはブラウザ上で直接編集するか、Zip形式のファイルをアップロードすることで編集可能である。各種ライブラリはZipファイルに含めることで実行が可能となる。

データの永続化やS3DynamoDBを使用する。/tmpも使用できる。

LambdaのIAM権限は、誰がファンクションを実行できるかを記述したInvocation Roleと、どのようなことを実行できるかを記述したExecution Roleの2種類が存在する。

コンテクスト

実行コンテナは、タイムアウト(Timeout)した場合や、”context.done()“が呼び出された場合(Controlled termination)、全ての処理を終了した場合(Default termination)、クラッシュや”process.exit()“した場合に処理を終了する。実行コンテナは、前回の処理からある程度の時間が経過していた場合は新規で作成されるが、前回使用したものを再利用することもある。context.done()を記述することで予想外の実行を防ぐことができる。

内容
context.succeed(Object result) ファンクションおよびコールバックが正常終了
context.fail(Object result) ファンクションおよびコールバックがエラー
context.done(String message, Object result) ファンクションが終了. messageに値が入力されているとエラー
awsRequestId ファンクション呼び出しID
logStreamName CloudWatch LogsのLogストリーム名
clientContext クライアントアプリおよびデバイスの情報
Identity CognitoのIdentity Provider情報