Kinesisとは
Kinesisは、大量のデータを受け付けて配信先に順序通りに配信するバッファ機構である。Kinesisを利用することで高速かつ継続的にデータの取り込みと集約を行うことが可能であり、ログデータやセンサーデータなどの継続的に入力されるストリームの処理に適している。Kinesisにデータが入力されて取得できるようになるまでは1秒未満である。瞬時に処理を実行することが可能である。各クライアントから直接S3やDynamoDB, RDS等にデータ入力するのと比べ、より信頼性が高く安価なシステム構築が可能である。なお、データレコードは通常はストリームに追加されてから24 時間のみアクセス可能(オプションを使用すれば、最大168時間までアクセス可能)である。
Kinesisに入力されたデータは、 Amazon Kinesis Client Library を用いたプログラム( Amazon Kinesis Application)をEC2上で動作させることで、加工や抽出・他のデータベースへの転送を行うことができる。Amazon Kinesis Applicationは、 DynamoDB上に制御テーブルを用意して、現在のチェックポイントを記録する。このため複数のプログラムを同時に実行させて、重複することなく並列でデータ処理を行うことが可能である。Application名は一意である必要があり、DynamoDBの制御名にも利用される。なお、「シャード」は、Kinesis上のデータ処理経路を、「プロデューサ」はKinesisへのデータの入力部分を、「コンシューマ」はKinesisからのデータ取得部分(役割)を指す。
レコードの順序
シャードイテレータは、シャード単位でデータを取得する。データの取得方法に関しては以下の4つが規定されている。
ShardIteratorType |
取得方法 |
AT_SEQUENCE_NUMBER |
特定のシーケンス番号からデータを取得 |
AFTER_SEQUENCE_NUMBER |
特定のシーケンス番号の次のデータを取得 |
TRIM_HORIZON |
シャードの中でトリムされていない一番古いデータから取得 |
LATEST |
シャードの一番新しいデータから取得 |
レコードを取得(GetRecords)する際には、レコード毎に付与されたシーケンス番号がキーとなる。シーケンス番号は、シャード単位で管理されており、レコードを入力(PutRecords)した際にシャードごとにインクリメントされる。したがって、シーケンス番号順に取得するということは、すなわち各シャードに入力されたレコードを時系列順に取得するということと等しい。
ただし、シーケンス番号はシャード単位で管理されているため、1つのKinesis Client Libraryが、複数のシャードから同時にレコードを取得した場合などは、複数シャードのレコードが混在して取得されることから、取得データが時系列順に並ぶとは限らない。また、シャードへのデータ入力処理は並列で実施されるために、シャードへレコードが同時に入力された場合には、シーケンス番号が意図した通りにインクリメントされるとは限らない。レコード入力の際に、シャード内で厳密にシーケンス番号をインクリメントさせる場合には、SequenceNumberForOrderingパラメータを付与して、レコードの入力を行う。
時系列順に厳密にレコードを取得する必要がある場合には、プロデューサごとに毎回同じパーティションキーを使用し、かつSequenceNumberForOrderingパラメータを付与した上でレコードの入力を行う。これによって同一のプロデューサからのレコードは、同一のシャードに入力される。また、レコードを取得する際も、1シャードにつき1つのKinesis Client Libraryを用意することで、複数シャードのレコードを混在させることなく、レコードを取得・処理することが可能となる。ただし、各プロデューサごとに同じパーティションキーを使用する手法は、シャードごとにレコード量の偏りを生じる可能性もあるので注意が必要である。
課金
- 無料枠は存在しない
- シャード時間および PUT ペイロードユニットによる従量課金制
AWS CLIを用いたデータの取得
Kinesisの雰囲気をつかむためにawscliで操作する – Qiita
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name STREAM_NAME
{
"ShardIterator": "AAAAAAAAAAEJa+Y5A5ZF3pdoe9Yfwefjiwfweifw9eOy5kguQL7aglWO4VI+Fcb/A9bzR/tKQBW8Yxco9RyOlRfs0q8RgFC0g6wHCnznhzDjpP9Xpfg6vuY/EPPHhYyxDdSKwePQjojgmTTqQlZzbkRHSEo/qSB+Nuqbg4asIsKiYwv96vvJoqxGkQi6RTN3DVf83Vf4nirQ0Sa4tg2A1sAyPfvr/r4etOX"
}
aws kinesis get-records --shard-iterator AAAAAAAAAAEJa+Y5A5ZF3pdoe9Yfwefjiwfweifw9eOy5kguQL7aglWO4VI+Fcb/A9bzR/tKQBW8Yxco9RyOlRfs0q8RgFC0g6wHCnznhzDjpP9Xpfg6vuY/EPPHhYyxDdSKwePQjojgmTTqQlZzbkRHSEo/qSB+Nuqbg4asIsKiYwv96vvJoqxGkQi6RTN3DVf83Vf4nirQ0Sa4tg2A1sAyPfvr/r4etOX
- レコードの例
- Data部分はBase64エンコードされる
- レコードが存在しない場合は、Recordsが空の状態でレコードが返る
{
"Records":[ {
"Data":"dGVzdGRhdGE=",
"PartitionKey":"Batch-SagbXhGLwl”,
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
} ],
"MillisBehindLatest":24000,
"NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
文献