Kinesis Client Libraryとは
Kinesis Client Libraryを用いて、Kinesis Applicationと呼ばれるプログラムを作成することができる。Kinesis Applicationは、Kinesisからデータを取得し、DynamoDBやRedshift、S3などにKinesisストリームを転送することが可能である。Kinesis Client Libraryが、複数のインスタンス間での負荷分散、インスタンスの障害に対する応答、処理済みのレコードのチェックポイント作成、リシャーディングへの対応などを行うので、ユーザはデータ処理部分のみに注力することができる。
アプリケーションの状態の追跡
Kinesis Client Libraryは、アプリケーション毎にDynamoDBに特別なテーブルを作成して各アプリケーションの状態を追跡する。テーブルには、どのデータまでが読み込み済みであるかを示すチェックポイントの値などのアプリケーションの状態を示す情報が、シャード毎に記録される。テーブル名は、プログラム上で指定したアプリケーション名と同一である。1 秒あたりの読み込み 10 回、書き込み 10 回のスループットを持つテーブルが生成されるが、シャード数が多い場合などはスペックが足りなくなる場合があるので注意が必要である。
並列処理
Kinesis Client Libraryは、1つのWorkerで複数のシャード処理を実行することが可能である。1つのシャードを複数のWorkerで処理することはできない。シャード数よりも多い数のWorkerを立ち上げても処理は実行されないので注意が必要である。
設定値
Kinesisの制限事項
Kinesis Client Libraryのデフォルト値
- getRecordsメソッドで取得するレコードは最大10000件
/**
* Max records to fetch from Kinesis in a single GetRecords call.
*/
public static final int DEFAULT_MAX_RECORDS = 10000;
/**
* Idle time between record reads in milliseconds.
*/
public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;
認証方法の設定
サンプルアプリケーションでは、認証情報をCredentialsから取得する実装となっているが、Kinesis Client LibraryをEC2インスタンス上で動作させる場合は、IAM Role(InstanceProfileCredentialsProvider)から取得する方法に変更することも可能である。
Kinesis Client Libraryを用いたデータ取得処理
Kinesis Client Libraryは複数の言語で提供されているが実体はJavaであり、MultiLangDaemon という多言語インターフェイスを通して他の言語でも機能が提供されている。今回は、AWS Toolkit for Eclipseに付属するサンプルアプリケーションとJavaライブラリを使用してKinesisからデータを取得する。
環境の構築
Kinesis Client LibraryをEC2で実行する場合は、EC2上にJavaの実行環境を用意する必要がある。
sudo yum -y install java-1.8.0-openjdk-devel
sudo alternatives --config java
また、Kinesis Client Libraryを実行するのに必要なパーミッションは以下の通り。
サービス |
操作 |
kinesis |
DescribeStream, GetShardIterator, GetRecords |
dynamodb |
CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem |
cloudwatch |
PutMetricData |
リージョンの設定
AWS SDK for Java および Kinesis Client Library は、リージョン指定が無いとデフォルトのバージニア州(us-east-1)が指定されてしまう。サンプルアプリケーションでは以下の設定を変更する必要がある。
kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
kinesisClientLibConfiguration.withRegionName("ap-northeast-1");
// Delete the stream
AmazonKinesis kinesis = new AmazonKinesisClient(credentials);
kinesis.setEndpoint("kinesis.ap-northeast-1.amazonaws.com");
// Delete the table
AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(credentialsProvider.getCredentials());
dynamoDB.setRegion(Region.getRegion(Regions.fromName("ap-northeast-1")));
最大取得数の設定
上述の通りデフォルトの最大取得数は10000である。これを変更するには以下を追記する。
kinesisClientLibConfiguration.withMaxRecords(500);
独自処理の追加
取得したデータをもとに独自処理を追加するためには、以下の箇所にデータを追記する。
private void processSingleRecord(Record record) {
// TODO Add your own record processing logic here
// レコードを取得
byte[] byteArray = new byte[record.getData().remaining()];
record.getData().get(byteArray);
String json = new String(byteArray);
// 以下に、JSONデータをパースするなどの処理を追記
//
//
JSONのパースには、Jacksonというライブラリが便利である。
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
JSON上のKeyを指定し、変数として抽出/処理することも可能である。
ObjectMapper mapper = new ObjectMapper();
try {
JsonNode node = mapper.readTree(json);
// kinesisSyncRecordsを取得
JsonNode kinesisSyncRecordsNode = node.get("kinesisSyncRecords");
if(kinesisSyncRecordsNode.isArray()){
// identityId値を取得
String id = node.get("identityId").asText();
// 配列をforループ
for(final JsonNode objNode : kinesisSyncRecordsNode){
// 処理
}
}
} catch {
// エラー処理
}
Kinesis Client Library v2へ移行する
Kinesis Client Libraryはバージョン1.5.0以降、IRecordProcessor インターフェイスのバージョン2を使用することが可能である。KCL for Java sample projectでは、現在もv1対応のサンプルプログラムのみ公開されており、v2対応のものは存在しない。
v1とv2では、以下のようにいくつかメソッドのパラメータに変更が生じている。
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
Worker worker = new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(kinesisClientLibConfiguration).build();
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
private InitializationInput kinesisInitializationInput;
@Override
public void initialize(InitializationInput initializationInput) {
LOG.info("Initializing record processor for shard: " + initializationInput.getShardId());
this.kinesisInitializationInput = initializationInput;
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOG.info("Shutting down record processor for shard: " + kinesisInitializationInput.getShardId());
// Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
checkpoint(shutdownInput.getCheckpointer());
}
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
LOG.info("Processing " + processRecordsInput.getRecords().size() + " records from " + kinesisInitializationInput.getShardId());
// Process records and perform all exception handling.
processRecordsWithRetries(processRecordsInput.getRecords());
// Checkpoint once every checkpoint interval.
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(processRecordsInput.getCheckpointer());
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}
モニタリング
CloudWatchによってKinesis(PutRecords/GetRecords等)の状態監視、およびKinesis Client Library(ワーカーの挙動など)の状態監視を行うことができる。
コネクタライブラリ
コネクタライブラリと併用することで、Amazon DynamoDB、Amazon Redshift、Amazon S3、Amazon Elasticsearch Service などのサービスと連携してデータを受け渡すことができる。