AWS Kinesis(4)Kinesis Producer Library

Kinesis Producer Libraryとは

Kinesis Client Libraryに付随しているKinesis Producer Library(KPL)を用いることで、Kinesisに容易にデータを投入することが可能となる。KPLを用いることで自動的に複数シャードにデータを投入したり、ユーザレコードを集約してスループットを改善する等が可能となる。複数のレコードを単一のKinesis Recordに集約することで、API呼び出し時に取得できるデータ量が増大させることが可能になるため、より少ないシャード数の用意でデータ量を処理することが可能となる。

またKPLは独立したプロセスで動作しているため、KPLがクラッシュした場合でも他の機能の動作を継続させることが可能である。

Kinesis Producer Libraryを用いたデータ取得処理

パーティションキー

Kinesisへデータを投入する際には、ストリーム名の指定パーティションキーの指定が必要である。パーティションキーは、複数のシャードの中からストリームを追加するシャードを選択するために使用するキーで、均等にシャードを選択するためにシャード数より十分大きい数を用意する必要がある。AWS SDKに付属しているサンプルプログラムでは、現在時刻(Current Millis)をパーティションキーとして使用している。

long createTime = System.currentTimeMillis();
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));

リージョンの設定

KPL は Kinesis Client Library と同様に、リージョン指定が無いとデフォルトのバージニア州(us-east-1)が指定されてしまう。サンプルアプリケーションでは以下の設定を変更する必要がある。

kinesis = new AmazonKinesisClient(credentials);
kinesis.withRegion(Region.getRegion(Regions.fromName(REGION_NAME)));

なお、KinesisProducerConfigurationはProperyファイルからの設定の読み込みにも対応しているため、Propetyファイルからリージョンを指定することも可能である。

Kinesisへの書き込み

Kinesisへのレコードの書き込みは、Kinesis Producer Libraryを用いる方法の他に以下のような手段もある。

  • AWS SDK (Kinesis Stream API)の使用
  • Amazon Kinesis エージェントの使用
  • fluent-plugin-kinesisの使用

AWS Kinesis(3)リシャーディング

リシャーディングとは

Kinesisでは、シャードと呼ばれるストリーム単位で処理される。事前に設定するシャードの数が並列に処理を行う土管の数となる。Kinesisを設定する際にこのシャードの数を指定するが、実際の負荷に合わせてシャード数を変化させることができる。これがリシャーディングである。

Amazon Kinesis の主要なコンセプト - Amazon Kinesis

リシャーディングは単純に数の増減をするだけではなく、「どのシャードを分割するのか」や「どのシャードとどのシャードを分割するのか」を指定する必要がある。通常はどのシャードに負荷が掛かっているかをCloudWatchなどを用いて計測してリシャーディングを決定する。またシャードの結合は、隣接したシャード同士で無いと実現できない。

リシャーディング処理をコマンドライン上で実現するツールがGitHub上で公開されている。以下のようにパラメータを記述することで、リシャーディングを実行できる。

cd amazon-kinesis-scaling-utils
cd dist
java -cp KinesisScalingUtils.jar-complete.jar -Dstream-name=MyStream -Dscaling-action=scaleUp -Dcount=10 -Dregion=ap-northeast-1 ScalingClient

実行する際のオプションパラメータは以下の通り。

オプションパラメータ 内容
stream-name ストリーム名
scaling-action スケールアクション “scaleUp”, “scaleDown” or “resize”
count シャード数
pct 既存シャードに対する増加減数(%)
region リージョン
shard-id ターゲットとするシャードID

AWS Cognito(4)Cognitoストリーム

Cognitoストリームとは

Cognito Datasetに蓄積されたデータを取得するためには、Cognitoストリームを設定してKinesis経由でデータを出力する必要がある。Cognitoストリームを設定すると、Cognito Datasetに変更が生じる度にKinesisにそのデータを入力することができる。

Cognitoストリームの設定

Cognitoのダッシュボードから「Edit identity pool」をクリックし、Cognito Streamの項目を編集する

Cognitoストリームの設定

Kinesis stream名とシャード数を設定する

  • 「Create stream」をクリック
  • シャードとはKinesisの入出力処理のパイプの太さを表す

Kinesisストリームの作成

ロール設定を行う

  • 「Create role」をクリック
  • ロール名が自動入力されるので、作成ウィザードで登録を行う

ロールの設定

有効化する

  • 「StreamStatus」を「Enabled」に設定することで、Cognito Streamが有効化される

設定変更を実行する

  • また、上記の設定を行った上で「Save Changes」をクリックしてもストリーム名、シャード、ロールなどがきちんと反映されていない場合があるので、反映されているか確認することが重要である
  • ユーザのロール設定と同様に、先ほど作成したロールのポリシーを確認する
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord"
      ],
      "Resource": [
        "arn:aws:kinesis:ap-northeast-1:172664222583:stream/STREAM_NAME"
      ]
    }
  ]
}

なお、Bulk publishをクリックすることでこれまで蓄積されたCognito DatasetをKinesisに一括で送り出すことが可能だが、24時間に1回しか実行できないことに注意が必要である。

ストリームレコード

CognitoからKinesisへは、以下のフォーマットでレコードが送信される。

{
  "identityPoolId" : "Pool Id"
  "identityId" : "Identity Id "
  "dataSetName" : "Dataset Name"
  "operation" : "(replace|remove)"
  "kinesisSyncRecords" : [
    { 
      "key" : "Key",
      "value" : "Value",
      "syncCount" : 1,
      "lastModifiedDate" : 1424801824343,
      "deviceLastModifiedDate" : 1424801824343,
      "op": "(replace|remove)"
    },
    ...
  ],
  "lastModifiedDate": 1424801824343,
  "kinesisSyncRecordsURL": "S3Url",
  "payloadType" : "(S3Url|Inline)",
  "syncCount" : 1
 }

AWS CLI経由でコマンドを入力することで、Cognitoから正常にレコード入力されているかの確認を行うことが可能である。

AWS Kinesis(1)Kinesisの概要

Kinesisとは

Kinesisは、大量のデータを受け付けて配信先に順序通りに配信するバッファ機構である。Kinesisを利用することで高速かつ継続的にデータの取り込みと集約を行うことが可能であり、ログデータやセンサーデータなどの継続的に入力されるストリームの処理に適している。Kinesisにデータが入力されて取得できるようになるまでは1秒未満である。瞬時に処理を実行することが可能である。各クライアントから直接S3やDynamoDB, RDS等にデータ入力するのと比べ、より信頼性が高く安価なシステム構築が可能である。なお、データレコードは通常はストリームに追加されてから24 時間のみアクセス可能(オプションを使用すれば、最大168時間までアクセス可能)である。

Amazon Kinesis の主要なコンセプト - Amazon Kinesis

Kinesisに入力されたデータは、 Amazon Kinesis Client Library を用いたプログラム( Amazon Kinesis Application)をEC2上で動作させることで、加工や抽出・他のデータベースへの転送を行うことができる。Amazon Kinesis Applicationは、 DynamoDB上に制御テーブルを用意して、現在のチェックポイントを記録する。このため複数のプログラムを同時に実行させて、重複することなく並列でデータ処理を行うことが可能である。Application名は一意である必要があり、DynamoDBの制御名にも利用される。なお、「シャード」は、Kinesis上のデータ処理経路を、「プロデューサ」はKinesisへのデータの入力部分を、「コンシューマ」はKinesisからのデータ取得部分(役割)を指す。

レコードの順序

シャードイテレータは、シャード単位でデータを取得する。データの取得方法に関しては以下の4つが規定されている。

ShardIteratorType 取得方法
AT_SEQUENCE_NUMBER 特定のシーケンス番号からデータを取得
AFTER_SEQUENCE_NUMBER 特定のシーケンス番号の次のデータを取得
TRIM_HORIZON シャードの中でトリムされていない一番古いデータから取得
LATEST シャードの一番新しいデータから取得

レコードを取得(GetRecords)する際には、レコード毎に付与されたシーケンス番号がキーとなる。シーケンス番号は、シャード単位で管理されており、レコードを入力(PutRecords)した際にシャードごとにインクリメントされる。したがって、シーケンス番号順に取得するということは、すなわち各シャードに入力されたレコードを時系列順に取得するということと等しい。

ただし、シーケンス番号はシャード単位で管理されているため、1つのKinesis Client Libraryが、複数のシャードから同時にレコードを取得した場合などは、複数シャードのレコードが混在して取得されることから、取得データが時系列順に並ぶとは限らない。また、シャードへのデータ入力処理は並列で実施されるために、シャードへレコードが同時に入力された場合には、シーケンス番号が意図した通りにインクリメントされるとは限らない。レコード入力の際に、シャード内で厳密にシーケンス番号をインクリメントさせる場合には、SequenceNumberForOrderingパラメータを付与して、レコードの入力を行う。

時系列順に厳密にレコードを取得する必要がある場合には、プロデューサごとに毎回同じパーティションキーを使用し、かつSequenceNumberForOrderingパラメータを付与した上でレコードの入力を行う。これによって同一のプロデューサからのレコードは、同一のシャードに入力される。また、レコードを取得する際も、1シャードにつき1つのKinesis Client Libraryを用意することで、複数シャードのレコードを混在させることなく、レコードを取得・処理することが可能となる。ただし、各プロデューサごとに同じパーティションキーを使用する手法は、シャードごとにレコード量の偏りを生じる可能性もあるので注意が必要である。

課金

  • 無料枠は存在しない
  • シャード時間および PUT ペイロードユニットによる従量課金制

AWS CLIを用いたデータの取得

Kinesisの雰囲気をつかむためにawscliで操作する – Qiita

  • シャードイテレータの取得
    aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name STREAM_NAME
    {
        "ShardIterator": "AAAAAAAAAAEJa+Y5A5ZF3pdoe9Yfwefjiwfweifw9eOy5kguQL7aglWO4VI+Fcb/A9bzR/tKQBW8Yxco9RyOlRfs0q8RgFC0g6wHCnznhzDjpP9Xpfg6vuY/EPPHhYyxDdSKwePQjojgmTTqQlZzbkRHSEo/qSB+Nuqbg4asIsKiYwv96vvJoqxGkQi6RTN3DVf83Vf4nirQ0Sa4tg2A1sAyPfvr/r4etOX"
    }
  • シャードイテレータを使ったレコードの取得
    aws kinesis get-records --shard-iterator AAAAAAAAAAEJa+Y5A5ZF3pdoe9Yfwefjiwfweifw9eOy5kguQL7aglWO4VI+Fcb/A9bzR/tKQBW8Yxco9RyOlRfs0q8RgFC0g6wHCnznhzDjpP9Xpfg6vuY/EPPHhYyxDdSKwePQjojgmTTqQlZzbkRHSEo/qSB+Nuqbg4asIsKiYwv96vvJoqxGkQi6RTN3DVf83Vf4nirQ0Sa4tg2A1sAyPfvr/r4etOX
  • レコードの例
  • Data部分はBase64エンコードされる
  • レコードが存在しない場合は、Recordsが空の状態でレコードが返る
    {
      "Records":[ {
        "Data":"dGVzdGRhdGE=",
        "PartitionKey":"Batch-SagbXhGLwl”,
        "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
      } ],
      "MillisBehindLatest":24000,
      "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
    }

文献