AWS API Gateway(2)Kinesisへのデータ投入

API Gatewayから大量のデータを投入し、これらのデータを解析および蓄積する場合には、API GatewayからLambdaに直接データを渡すのではなく、大規模データストリームサービスのKinesisを介した方が、バックエンドのシステムが高負荷に晒されずに安定的に稼働させられる場合がある。

API Gatewayとバックエンドのエンドポイントとの接続は、API Gateway内の統合リクエスト機能を用いて行い、統合リクエストは下の5つの統合タイプを用意している。

  • Lambda関数
  • HTTP
  • Mock
  • AWSサービス
  • VPCリンク

Mockは、バックエンドと接続せずにレスポンスを返す統合タイプで、テストを行う際や、CROSのプリフライトリクエストの返答などに用いる。

統合リクエスト

Kinesisと接続する場合は、POSTメソッドを作成し、上記のうちの「AWSサービス」を選択して以下のように各欄に必要事項を入力する。Kinesisへデータ投入する際には、Kinesis側で用意されているputRecordメソッドを使用する。

データ入力項目

項目 入力内容 備考
統合タイプ AWSサービス
AWS リージョン 当該Kinesisのリージョン
AWS サービス Kinesis
AWS サブドメイン 空欄
HTTP メソッド POST
アクション PutRecord
実行ロール KinesisへのputRecordをAPI Gatewayに許可するIAMロール arn:aws:iam::account-id:role/iam-role-name

IAMロールについては、KinesisへのputRecordをAPI Gatewayに許可する記述が必要で、IAM上で以下の内容を含んだIAMロールを作成する。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:*:*:stream/*"
        }
    ]
}

データマッピング

API GatewayからKinesisにデータを投入する場合は、クライアントから受信したデータをKinesisが規定するデータフォマットに変換する必要がある。これに対応するのがデータマッピング機能で、Velocity Template Languageを用いて記述することができる。

よく使われる関数および使用例を以下に挙げる。

記述 内容
#set($param = ”) 変数の定義
#if(評価式) #end if文
$context.requestTimeEpoch データの受信時刻
$input.path(‘$.param’) 入力データ内の指定タグの値
$input.path(‘$.param’).size 入力データ内の指定タグの数
$input.params().header.get(”) ヘッダ内の指定タグの値
$input.json(‘$.param’) 入力データ内の指定タグJSONデータ
$util.urlDecode($input.path(‘$’)) 入力データをURLデコード
$util.escapeJavaScript(data) 文字をエスケープ
$util.base64Encode(data) 文字をBASE64エンコード

また、これらを使用してデータマッピングを記述すると以下となる。

#set($allParams = $input.params())
{
  "params" : {
    #foreach($type in $allParams.keySet())
    #set($params = $allParams.get($type))
    "$type" : {
      #foreach($paramName in $params.keySet())
      "$paramName" : "$util.escapeJavaScript($params.get($paramName))"
      #if($foreach.hasNext),#end
      #end
    }
    #if($foreach.hasNext),#end
    #end
  }
}

AWS SNS(1)Simple Notification Serviceの概要

Amazon SNSについて

Amazon SNSは、マネージド型のメッセージ発行/購読サービス。SNSは、発行者からのメッセージを受信し、LambdaやSQS、Email、SMSなどの購読者に対してこれらを送信することが可能である。SNSを使用するときは、その所有者としてトピックを作成し、発行者と購読者を定義するポリシーを策定する。

SNSの流れ

トピック名は他と識別できるユニークな名称であり、発行者はこのARNを利用してメッセージをこのトピックに送信する。購読者は、送信されたメッセージを購読するかどうか判断し、購読を許可した場合に全てのメッセージが受信可能となる。発行者は、各配信先ごとに異なるメッセージを送信することも可能である。トピックの所有者は、購読情報を全て削除(クリーンアップ)することも可能となっている。

SNSの機能とシナリオ

SNSは以下のようなシナリオで動作させることができる。

ファンアウト

発行者からのメッセージをSNSが複製することで、並列非同期処理が可能となるシナリオ。並行処理を行なったり、本番環境のデータをテスト環境に入力するために使用できる。

アラートの送信

アプリケーションやシステムからの出力に対してあらかじめ閾値を定めておき、それを超えた場合にメッセージが送信される。

EメールおよびSMSの送信

特定の購読者へEメールおよびSMSの送信する。

モバイルプッシュ通知

モバイルアプリケーションへ通知を送信する。

ポリシー

所有者は各トピックに対して、「どの購読者(=プリンシパル)がどの配信先(=リソース)を受信できるか。」「どの発行者(=プリンシパル)がメッセージの発行を行えるか」などのアクションを定めることができる。ポリシーのデフォルトは拒否であるため、許可を与えるためにはポリシーを明示しなくてはならない。また、ポリシーでは、リトライ回数や遅延時間等も指定することができる。

AWS S3(3)S3の概要

S3 (Simple Storage Service)とは

S3は、どこからの、どのような量のデータ(通常100バケットまで1ファイル5TBまで)でも保存と取得が可能なオブジェクトストレージ。データは3箇所以上のデータセンタへ自動複製され、
99.999999999% の耐久性を提供している。高い耐久性、可用性、スケーラビリティー、数多くのセキュリテイ機能を持つ。AWS AthenaやS3 Selectを用いることで簡単に、S3内のデータに対してビッグデータ解析を行うことが可能で、さまざまな方法でS3へのデータ転送を行うことができる。

S3には、S3 StanderdS3 Standerd(低頻度アクセス)S3(1ゾーン/低頻度アクセス)Amazon Glacierの4つのストレージが用意されている。S3(1ゾーン/低頻度アクセス)は、地震や洪水といった災害によるアベイラビリティーゾーンの物理的な損失時にデータを失う可能性がある。S3 Standerd(低頻度アクセス)とS3(1ゾーン/低頻度アクセス)は、他の手法で復元可能なデータや原本のコピーを保存する目的で使用する。VPCエンドポイントを用いることで、同一リージョンのVPC内からセキュアにファイル転送を行うことが可能である。また、複数の暗号化、監査ログ、バージョニングにも対応している。

S3は、キーバリュー型のストアであるので、フラットな構造であり、ディレクトリや階層構造は存在しない。フォルダやファイル名に相当するのがキーであり、スラッシュ文字によってディレクトリ構造のように見せることができる。

タイプ 堅牢性 備考
Standard 99.999999999% 3箇所以上にデータ複製
Standard(低頻度アクセス) 99.999999999% 安価だが読み出しに課金される
1ゾーン(低頻度アクセス) 99.99% 低い堅牢性。オブジェクト毎に指定可能。
Glacier 99.999999999% 取り出しに時間(3-5時間)とコストを要する

S3は、ファイルを複数のチャンクに分割して並列アップロードを行う、Multipart Uploadに対応している。ファイルサイズが100MBを超える場合は、このMultipart Uploadを使用することが奨励されている。AWS CLIでは、ファイルサイズによって自動判別されてこの機能が利用される。Glacierに格納されたデータの復元時には、迅速(Expedited)(=1-5分)、標準(Standard)(=3-5時間)、大容量(Bulk)(=5-12時間)の3種類が用意され、それぞれ実行単価が異なる。

また、静的なファイルをS3のみでホステイング可能なWEBサイトホスティング機能を有している。独自ドメインの指定クロスドメインCloudFrontとの連携なども可能。

セキュリティ

アクセス管理

S3はデフォルトでは全てプライベートアクセス権限となっている。アクセス権限は、バケットやオブジェクト単位で指定可能である。IAMユーザ単位でS3へのアクセス権限を指定できる「ユーザポリシー」(=IPアドレスも指定可能)、バケット毎にアクセス権限を指定できる「バケットポリシー」(=IPアドレスレンジやMFA等も指定可能)、バケットやオブジェクト単位で指定可能な「ACL」などが存在する。バケットポリシーは、バケットの所有者のみが設定でき、またACLは、バケットACLよりもオブジェクトACLが優先される。

暗号化

サーバサイド暗号化、クライアントサイド暗号化の両方に対応している。デフォルト暗号化を指定することも可能である。

Pre-signed Object URL

一定時間のみアクセスを許可するURLを発行できる。

通知

バケットにイベントが発生した際に、SNS、SQS、Lambdaに対して通知を行うことが可能。

モニタリング

CloudWatchとCloudTrailによるモニタリングが可能。

料金

通常ははストレージおよびデータ転送に掛かるコスト全ては、バケットの所有者が負担する。しかし、リクエスタ支払いバケットに指定した場合は、リクエストおよびバケットからのデータダウンロードに掛かるコストは、 所有者ではなくリクエストを実行したリクエスタが支払う

バージョニング

バージョニングが有効となったオブジェクトに対してDELETE処理を行った場合、全てのバージョンはストレージに残り削除マーカーが付加される。当該オブジェクトをGETしようとすると404 Not Foundが返されるが、オブジェクトバージョンを指定すると当該オブジェクトを取得可能である。

ライフサイクル

ライフサイクルと呼ばれる、オブジェクトに対するアクションルールをXMLにより規定できる。ライフサイクルによって、オブジェクトを異なるストレージクラスに移行したり、オブジェクトを削除したりすることができる。Glacierは削除や上書き、アーカイブリクエスト、復元に対して費用が発生する。ただし90日以上アーカイブされているオブジェクトに対する削除および上書きは無料である。

AWS Lambda(2)LambdaでKinesis Recordを取得する

Kinesis Process Records

Kinesis Event 発生毎にLambdaを実行し、Kinesis Recordを取得することが可能である。AWS マネジメントコンソールから関数を作成する場合は、kinesis-process-recordを選択。

Kinesis関数の設定

Stream名など必要事項を記入しプログラムをアップロードすることでLambdaを実行することが可能となる。

lambda_kinesis_function_2

Eclipseで開発する場合は、EclipseのAWSプラグインから直接アップロードや設定を行うことも可能である。Eclipseプロジェクトを右クリックして、「Amazon Web Services」から「Upload function to AWS Lambda…」を選択、「アップロードするリージョン」と「関数名」を指定し、

Kinesis関数の設定

「Description」や「Role」、プログラム一式を格納する「S3バケット」を指定して「Finish」を押すとEclipseで作成したLambda関数がAWS上にアップロードされる。

Eclipse設定

JavaによるLambda関数の実装

RequestHandler<KinesisEvent, Object>を実装することで、Kinesisからデータを取得し処理を行うことが可能となる。1回の処理で複数のKinesis Recordを取得するのでfor文で廻してそれぞれのRecordの値を取得することができる。Kinesis Recordには、ApproximateArrivalTimestampというRecord入力時に自動付与されるタイムスタンプが存在し、この値を取得するためのgetApproximateArrivalTimestamp()というメソッドも用意されているが、現時点では値の取得に対応していおらず、このメソッドを実行してもnullしか返えらないので注意が必要である。

public class KinesisEventHandler implements RequestHandler<KinesisEvent, Object> {

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {

        context.getLogger().log("Input: " + input);

        for(KinesisEventRecord rec : input.getRecords()) {
        	// Recordを取得
        	Record record = rec.getKinesis();
        	// JSONを取得
        	byte[] byteArray = new byte[record.getData().remaining()];
        	record.getData().get(byteArray);
        	String json = new String(byteArray);
        	// タイムスタンプが取得できない
        	String timestamp = rec.getKinesis().getApproximateArrivalTimestamp();
        }
    }
}

Lambdaのスロットリング

Lambdaは、1つの関数につき1秒あたり同時処理数100でスロットリングされている。また、上限値の引き上げを申請しても1秒あたり同時接続数1000程度が限界のようである。したがって、Kinesisに大量のRecordが入力された場合はLambdaの上限を超えてしまいRecordを取りこぼしてしまう可能性がある。Kinesis Recordを確実に取得し処理する必要がある場合は、EC2上でKinesis Client Libraryを用いてKinesis Appを実装する必要がある。

AWS Lambda(1)Lambdaの概要

Lambdaとは

AWS Lambdaは、イベントをトリガにクラウド上で独自のコードを実行させるサービス。EC2インスタンス等の管理が不要で負荷に合わせて自動でスケールアウトすることのできるフルマネージドのサービスである。100ミリ秒単位の処理時間で従量課金される。例えば、画像がアップロードされたタイミングでサムネイル画像を生成したり、DynamoDBへの書き込みタイミングプッシュ通知を行うなどをLambdaを用いて実現できる。イベントの発生元となるAWSリソースは、S3, Kinesis, DynamoDB, Cognitoなどで、Pull/Pushの2種類のイベントモデルが存在する。

コードはブラウザ上で直接編集するか、Zip形式のファイルをアップロードすることで編集可能である。各種ライブラリはZipファイルに含めることで実行が可能となる。

データの永続化やS3DynamoDBを使用する。/tmpも使用できる。

LambdaのIAM権限は、誰がファンクションを実行できるかを記述したInvocation Roleと、どのようなことを実行できるかを記述したExecution Roleの2種類が存在する。

コンテクスト

実行コンテナは、タイムアウト(Timeout)した場合や、”context.done()“が呼び出された場合(Controlled termination)、全ての処理を終了した場合(Default termination)、クラッシュや”process.exit()“した場合に処理を終了する。実行コンテナは、前回の処理からある程度の時間が経過していた場合は新規で作成されるが、前回使用したものを再利用することもある。context.done()を記述することで予想外の実行を防ぐことができる。

内容
context.succeed(Object result) ファンクションおよびコールバックが正常終了
context.fail(Object result) ファンクションおよびコールバックがエラー
context.done(String message, Object result) ファンクションが終了. messageに値が入力されているとエラー
awsRequestId ファンクション呼び出しID
logStreamName CloudWatch LogsのLogストリーム名
clientContext クライアントアプリおよびデバイスの情報
Identity CognitoのIdentity Provider情報