// Delete the stream
AmazonKinesis kinesis = new AmazonKinesisClient(credentials);
kinesis.setEndpoint("kinesis.ap-northeast-1.amazonaws.com");
// Delete the table
AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(credentialsProvider.getCredentials());
dynamoDB.setRegion(Region.getRegion(Regions.fromName("ap-northeast-1")));
private void processSingleRecord(Record record) {
// TODO Add your own record processing logic here
// レコードを取得
byte[] byteArray = new byte[record.getData().remaining()];
record.getData().get(byteArray);
String json = new String(byteArray);
// 以下に、JSONデータをパースするなどの処理を追記
//
//
Kinesis Client Libraryはバージョン1.5.0以降、IRecordProcessor インターフェイスのバージョン2を使用することが可能である。KCL for Java sample projectでは、現在もv1対応のサンプルプログラムのみ公開されており、v2対応のものは存在しない。
v1とv2では、以下のようにいくつかメソッドのパラメータに変更が生じている。
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
Worker worker = new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(kinesisClientLibConfiguration).build();
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
private InitializationInput kinesisInitializationInput;
@Override
public void initialize(InitializationInput initializationInput) {
LOG.info("Initializing record processor for shard: " + initializationInput.getShardId());
this.kinesisInitializationInput = initializationInput;
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOG.info("Shutting down record processor for shard: " + kinesisInitializationInput.getShardId());
// Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
checkpoint(shutdownInput.getCheckpointer());
}
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
LOG.info("Processing " + processRecordsInput.getRecords().size() + " records from " + kinesisInitializationInput.getShardId());
// Process records and perform all exception handling.
processRecordsWithRetries(processRecordsInput.getRecords());
// Checkpoint once every checkpoint interval.
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(processRecordsInput.getCheckpointer());
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}