AWS Kinesis(1)Kinesisの概要

Kinesisとは

Kinesisは、大量のデータを受け付けて配信先に順序通りに配信するバッファ機構である。Kinesisを利用することで高速かつ継続的にデータの取り込みと集約を行うことが可能であり、ログデータやセンサーデータなどの継続的に入力されるストリームの処理に適している。Kinesisにデータが入力されて取得できるようになるまでは1秒未満である。瞬時に処理を実行することが可能である。各クライアントから直接S3やDynamoDB, RDS等にデータ入力するのと比べ、より信頼性が高く安価なシステム構築が可能である。なお、データレコードは通常はストリームに追加されてから24 時間のみアクセス可能(オプションを使用すれば、最大168時間までアクセス可能)である。

Amazon Kinesis の主要なコンセプト - Amazon Kinesis

Kinesisに入力されたデータは、 Amazon Kinesis Client Library を用いたプログラム( Amazon Kinesis Application)をEC2上で動作させることで、加工や抽出・他のデータベースへの転送を行うことができる。Amazon Kinesis Applicationは、 DynamoDB上に制御テーブルを用意して、現在のチェックポイントを記録する。このため複数のプログラムを同時に実行させて、重複することなく並列でデータ処理を行うことが可能である。Application名は一意である必要があり、DynamoDBの制御名にも利用される。なお、「シャード」は、Kinesis上のデータ処理経路を、「プロデューサ」はKinesisへのデータの入力部分を、「コンシューマ」はKinesisからのデータ取得部分(役割)を指す。

レコードの順序

シャードイテレータは、シャード単位でデータを取得する。データの取得方法に関しては以下の4つが規定されている。

ShardIteratorType 取得方法
AT_SEQUENCE_NUMBER 特定のシーケンス番号からデータを取得
AFTER_SEQUENCE_NUMBER 特定のシーケンス番号の次のデータを取得
TRIM_HORIZON シャードの中でトリムされていない一番古いデータから取得
LATEST シャードの一番新しいデータから取得

レコードを取得(GetRecords)する際には、レコード毎に付与されたシーケンス番号がキーとなる。シーケンス番号は、シャード単位で管理されており、レコードを入力(PutRecords)した際にシャードごとにインクリメントされる。したがって、シーケンス番号順に取得するということは、すなわち各シャードに入力されたレコードを時系列順に取得するということと等しい。

ただし、シーケンス番号はシャード単位で管理されているため、1つのKinesis Client Libraryが、複数のシャードから同時にレコードを取得した場合などは、複数シャードのレコードが混在して取得されることから、取得データが時系列順に並ぶとは限らない。また、シャードへのデータ入力処理は並列で実施されるために、シャードへレコードが同時に入力された場合には、シーケンス番号が意図した通りにインクリメントされるとは限らない。レコード入力の際に、シャード内で厳密にシーケンス番号をインクリメントさせる場合には、SequenceNumberForOrderingパラメータを付与して、レコードの入力を行う。

時系列順に厳密にレコードを取得する必要がある場合には、プロデューサごとに毎回同じパーティションキーを使用し、かつSequenceNumberForOrderingパラメータを付与した上でレコードの入力を行う。これによって同一のプロデューサからのレコードは、同一のシャードに入力される。また、レコードを取得する際も、1シャードにつき1つのKinesis Client Libraryを用意することで、複数シャードのレコードを混在させることなく、レコードを取得・処理することが可能となる。ただし、各プロデューサごとに同じパーティションキーを使用する手法は、シャードごとにレコード量の偏りを生じる可能性もあるので注意が必要である。

課金

  • 無料枠は存在しない
  • シャード時間および PUT ペイロードユニットによる従量課金制

AWS CLIを用いたデータの取得

Kinesisの雰囲気をつかむためにawscliで操作する – Qiita

  • シャードイテレータの取得
    aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name STREAM_NAME
    {
        "ShardIterator": "AAAAAAAAAAEJa+Y5A5ZF3pdoe9Yfwefjiwfweifw9eOy5kguQL7aglWO4VI+Fcb/A9bzR/tKQBW8Yxco9RyOlRfs0q8RgFC0g6wHCnznhzDjpP9Xpfg6vuY/EPPHhYyxDdSKwePQjojgmTTqQlZzbkRHSEo/qSB+Nuqbg4asIsKiYwv96vvJoqxGkQi6RTN3DVf83Vf4nirQ0Sa4tg2A1sAyPfvr/r4etOX"
    }
  • シャードイテレータを使ったレコードの取得
    aws kinesis get-records --shard-iterator AAAAAAAAAAEJa+Y5A5ZF3pdoe9Yfwefjiwfweifw9eOy5kguQL7aglWO4VI+Fcb/A9bzR/tKQBW8Yxco9RyOlRfs0q8RgFC0g6wHCnznhzDjpP9Xpfg6vuY/EPPHhYyxDdSKwePQjojgmTTqQlZzbkRHSEo/qSB+Nuqbg4asIsKiYwv96vvJoqxGkQi6RTN3DVf83Vf4nirQ0Sa4tg2A1sAyPfvr/r4etOX
  • レコードの例
  • Data部分はBase64エンコードされる
  • レコードが存在しない場合は、Recordsが空の状態でレコードが返る
    {
      "Records":[ {
        "Data":"dGVzdGRhdGE=",
        "PartitionKey":"Batch-SagbXhGLwl”,
        "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
      } ],
      "MillisBehindLatest":24000,
      "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
    }

文献