AWS CLI(1)初期設定とコマンド例

初期設定とリージョンの設定

AWS CLIは、AWSのサービスをコマンドラインインタフェースで操作することのできるコマンド群である。AWS CLIを使う前に、aws configureコマンドを使ってアクセスキーやデフォルトリージョンの設定を行っておく。

$ aws configure
AWS Access Key ID [None]: 
AWS Secret Access Key [None]: 
Default region name [None]: 
Default output format [None]: 

Access Keyは、IAMで設定取得したものを指定する。Default output formatには、通常はjsonを指定する。またこれらの設定情報は、ホームディレクトリ直下の.awsディレクトリ(~/.aws/config, ~/.aws/credentials)に保存される。

なおデフォルトリージョンを設定しない場合は、各コマンドにリージョンオプション(–region ap-northeast-1など)を付けることでリージョンを指定することが可能である。EC2で自作のAMIを作成し複数のリージョンで運用する場合などは、デフォルトリージョンを指定せずに、シェルスクリプト等で都度リージョンを指定する方が設定を変更せずに汎用的に使うことが可能である。

現在のリージョンを取得する

以下のコマンドを実行することで、EC2で現在のリージョンを取得することが可能である。

$ curl -s https://169.254.169.254/latest/meta-data/placement/availability-zone | sed -e 's/.$//g'

上記コマンドを用いることで、シェルスクリプトから動的にリージョンを指定することも可能である。

region=`curl -s https://169.254.169.254/latest/meta-data/placement/availability-zone | sed -e 's/.$//g'`
aws dynamodb scan --table-name TABLE_NAME --region ${region} > /tmp/TABLE_NAME.json

コマンド例

s3 – AWS CLI Command Reference

S3にファイルをコピーする

aws s3 cp /SRC_DIR/FILE_NAME s3://BUCKET_NAME/DIR

バケットを生成する

aws s3 mb s3://BUCKET_NAME

dynamodb – AWS CLI Command Reference

テーブルを作成する

aws dynamodb create-table --table-name TABLE_NAME --attribute-definitions AttributeName=ATTR_NAME,AttributeType=ATTR_TYPE --key-schema AttributeName=ATTR_NAME,KeyType=HASH --provisioned-throughput ReadCapacityUnits=NUM,WriteCapacityUnits=NUM

アイテムを書き込む

  • Number型の場合でも「””」で値を囲む必要がある
aws dynamodb put-item --table-name TABLE_NAME --item '{"ATTR_NAME": {"ATTR_TYPE": "ATTR_VAL"}, "ATTR_NAME": {"ATTR_TYPE": "ATTR_VAL"}}'

指定したテーブルをスキャンする

aws dynamodb scan --table-name TABLE_NAME

kinesis – AWS CLI Command Reference

ストリームを生成する

aws kinesis create-stream --stream-name STREAM_NAME --shard-count N 

ストリーム保持期間を変更する

  • Kinesis Recordは、通常24時間でデータが消去されてしまう。AWS CLIからこの保持期間を最大7日間(168時間)まで延長することができる
aws kinesis increase-stream-retention-period  --stream-name STREAM_NAME --retention-period-hours 168

ストリームの設定内容を確認する

aws kinesis describe-stream --stream-name STREAM_NAME

シャードイテレータを取得する

SHARD_IDは、上述のdescribe-streamコマンドから取得する

aws kinesis get-shard-iterator --shard-id SHARD_ID --shard-iterator-type TRIM_HORIZON --stream-name STREAM_NAME

レコードを取得する

SHARD_ITERATORは、上述のget-shard-iteratorコマンドから取得する

aws kinesis get-records --shard-iterator SHARD_ITERATOR

EL CapitanでXcode6.2が正常に動作しない

開発中のプログラムがSwift 1.0のままのプログラムなので未だにXcode6.2を使用しているのだが、IPA作成用にXcodeでArchiveビルドしようとすると

Main.storyboard: Exception while running ibtool: -[IBUIViewControllerAutolayoutGuide shouldBeArchived]: unrecognized selector sent to instance 0x7ff0c70a6f80

というエラーが。

しかも、XCode上にSchemeを表示させる画面がない。

Xcode6.2

どうやら、手元の端末をMacOS X 10.11 EL capitanにアップグレードしたのが原因だったようで、10.10 Yosemiteに戻すとXcode 6.2でも正常の動作に戻る。

というわけで、OSのアップグレードは計画的に。
MacOS X 10.11 EL capitanにしてからEclipseもなんかおかしい。

AWS DynamoDB(2)AWS SDK for Javaとアトミックカウンター

JavaによるDynamoDBへのアクセス

AWS SDK for Javaを用いることで、JavaプログラムからDynamoDBにアクセスすることができる。DynamoDBはHashKeyでインデックス化されており、Itemの追加や更新には当該テーブルのHashKeyの情報が必要となる。

DynamoDBインスタンスを取得

Credentialsやテーブル名を指定してDynamoDBインスタンスを指定する。リージョンを明示的に指定しないとデフォルトではバージニア州が指定されてしまう。通常credentialsは、ホームディレクトリ直下の「.aws」ディレクトリ(~/.aws/credentials)に置かれている。

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;

// ProfileCredentialsProvider
credentialsProvider = new ProfileCredentialsProvider();
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
+ "Please make sure that your credentials file is at the correct "
+ "location (/Users/hoge/.aws/credentials), and is in valid format.", e);
}

// AmazonDynamoDBClient
AmazonDynamoDBClient dynamoDBClient = new AmazonDynamoDBClient(credentialsProvider);
// リージョンの設定
dynamoDBClient.setRegion(Region.getRegion(Regions.fromName("ap-northeast-1")));
// DynamoDB
DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);
// DynamoDB Table
Table dynamodbTable = dynamoDB.getTable(TABLE_NAME);

データの取得

getItemメソッドで、HashKeyを指定することでItemの取得が可能となる。HashKeyの「名称」と「値」を指定することで、Itemを取得できる。

dynamodbTable.getItem(HASH_KEY_NAME, HASH_KEY_VAL);

データの書き込み

DynamoDBは空文字を格納することができない。データを格納する前に空文字チェックを行っておく

データの挿入

putItemメソッドで、Itemの書き込みが可能となる。書き込みを行う際、HashKeyの「名称」と「値」の指定が必須である。なお、同じキーを持つ項目が存在している場合は、全ての値が更新(上書き)される。

Item item = new Item()
.withPrimaryKey(HASH_KEY_NAME, HASH_KEY_VAL)
.withString(key, val);
dynamodbTable.putItem(item);

データの更新

updateItemメソッドで、Itemを更新する。HashKeyの「名称」と「値」を指定することで、どのItemを更新するかが決定される。また更新内容は、SETやADD, REMOVEで始まる更新式によって指定する。なお、同じキーを持つ項目が存在している場合は、指定したAttributeのみ置換される。指定したキーが存在しない場合は、新規の項目が作成される。

// 更新するAttributeの名称(Key)
Map<String,String> resultExpressionAttributeNames = new HashMap<String,String>();
resultExpressionAttributeNames.put("#key", KEY_NAME);
// 更新するAttributeの値(Value)
Map<String,Object> resultExpressionAttributeValues = new HashMap<String,Object>();
resultExpressionAttributeValues.put(":val", VALUE);
// Attributeの更新
dynamodbTable.updateItem(
HASH_KEY_NAME, HASH_KEY_VALUE, // 更新対象のItem(HASH_KEY, VALUE)
"set #key = :val",
resultExpressionAttributeNames,
resultExpressionAttributeValues);

the provided key element does not match the schema javaというエラーが発生した場合は、誤ったHashKeyを指定している可能性があるので確認する。

アトミックカウンター

DynamoDBは、高い可用性やスケーラビリティ(BASE属性)を確保している一方で、厳密な一貫性や即時反映性(ACID属性)を諦めている。しかし、アトミックカウンター(一貫性のあるカウンタ)をサポートしているため、他の処理を妨げることなく並列に、一貫性のあるカウンタ処理を実施することは可能である。

アトミックカウンターをサポートとあるが、アトミックカウンターという特別に用意された機能が存在するわけでもない。updateItemメソッドを利用して以下のような記述をすると、アトミック性のあるカウンタを実装することができる。

Map<String,String> expressionAttributeNames = new HashMap<String,String>();
expressionAttributeNames.put("#key", kEY_NAME);
Map<String,Object> expressionAttributeValues = new HashMap<String,Object>();
expressionAttributeValues.put(":val", 1);

try {
dynamodbTable.updateItem(
HASH_KEY_NAME, HASH_KEY_VAL,
"ADD #key :val",
expressionAttributeNames,
expressionAttributeValues);
} catch (Exception e) {

}

更新式にて単純に1を加算する処理を行っているだけであるので、プログラムを書き換えることで加算もできれば減算もできる。2を足す、3を引くといったことも可能である。

dynamodbTable.updateItem(
HASH_KEY_NAME, HASH_KEY_VAL,
"SET #key = #key + :val",
expressionAttributeNames,
expressionAttributeValues);

としてもよいが、SETアクションの場合、アイテムが事前に存在しない場合にエラーとなるので、ADDアクションで実装したほうがよい。

DynamoDB操作の違い

なお、DynamoDBのそれぞれの操作の差異は以下の通り。

操作 アクション 既にアイテムが存在している場合の挙動
putItem 全ての属性を消去した上で新たに属性を追加する
updateItem SET 対象の属性のみ更新される
updateItem REMOVE 対象の属性のみ削除される
updateItem ADD 対象の属性が数値の場合は値に追加される、セットデータの場合は要素が追加される
updateItem DELETE 対象の属性のセットデータから要素が削除される

AWS Kinesis(3)リシャーディング

リシャーディングとは

Kinesisでは、シャードと呼ばれるストリーム単位で処理される。事前に設定するシャードの数が並列に処理を行う土管の数となる。Kinesisを設定する際にこのシャードの数を指定するが、実際の負荷に合わせてシャード数を変化させることができる。これがリシャーディングである。

Amazon Kinesis の主要なコンセプト - Amazon Kinesis

リシャーディングは単純に数の増減をするだけではなく、「どのシャードを分割するのか」や「どのシャードとどのシャードを分割するのか」を指定する必要がある。通常はどのシャードに負荷が掛かっているかをCloudWatchなどを用いて計測してリシャーディングを決定する。またシャードの結合は、隣接したシャード同士で無いと実現できない。

リシャーディング処理をコマンドライン上で実現するツールがGitHub上で公開されている。以下のようにパラメータを記述することで、リシャーディングを実行できる。

cd amazon-kinesis-scaling-utils
cd dist
java -cp KinesisScalingUtils.jar-complete.jar -Dstream-name=MyStream -Dscaling-action=scaleUp -Dcount=10 -Dregion=ap-northeast-1 ScalingClient

実行する際のオプションパラメータは以下の通り。

オプションパラメータ 内容
stream-name ストリーム名
scaling-action スケールアクション “scaleUp”, “scaleDown” or “resize”
count シャード数
pct 既存シャードに対する増加減数(%)
region リージョン
shard-id ターゲットとするシャードID

AWS Kinesis(2)Kinesis Client Library

Kinesis Client Libraryとは

Kinesis Client Libraryを用いて、Kinesis Applicationと呼ばれるプログラムを作成することができる。Kinesis Applicationは、Kinesisからデータを取得し、DynamoDBやRedshift、S3などにKinesisストリームを転送することが可能である。Kinesis Client Libraryが、複数のインスタンス間での負荷分散インスタンスの障害に対する応答処理済みのレコードのチェックポイント作成リシャーディングへの対応などを行うので、ユーザはデータ処理部分のみに注力することができる。

アプリケーションの状態の追跡

Kinesis Client Libraryは、アプリケーション毎にDynamoDBに特別なテーブルを作成して各アプリケーションの状態を追跡する。テーブルには、どのデータまでが読み込み済みであるかを示すチェックポイントの値などのアプリケーションの状態を示す情報が、シャード毎に記録される。テーブル名は、プログラム上で指定したアプリケーション名と同一である。1 秒あたりの読み込み 10 回、書き込み 10 回のスループットを持つテーブルが生成されるが、シャード数が多い場合などはスペックが足りなくなる場合があるので注意が必要である。

Kinesisステータステーブル

並列処理

Kinesis Client Libraryは、1つのWorkerで複数のシャード処理を実行することが可能である。1つのシャードを複数のWorkerで処理することはできない。シャード数よりも多い数のWorkerを立ち上げても処理は実行されないので注意が必要である。

設定値

Kinesisの制限事項

Kinesis Client Libraryのデフォルト値

  • getRecordsメソッドで取得するレコードは最大10000件
       /**
         * Max records to fetch from Kinesis in a single GetRecords call.
         */
        public static final int DEFAULT_MAX_RECORDS = 10000;
  • データ取得間隔は1秒
       /**
         * Idle time between record reads in milliseconds.
         */
        public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;

認証方法の設定

サンプルアプリケーションでは、認証情報をCredentialsから取得する実装となっているが、Kinesis Client LibraryをEC2インスタンス上で動作させる場合は、IAM Role(InstanceProfileCredentialsProvider)から取得する方法に変更することも可能である。

Kinesis Client Libraryを用いたデータ取得処理

Kinesis Client Libraryは複数の言語で提供されているが実体はJavaであり、MultiLangDaemon という多言語インターフェイスを通して他の言語でも機能が提供されている。今回は、AWS Toolkit for Eclipseに付属するサンプルアプリケーションとJavaライブラリを使用してKinesisからデータを取得する。

環境の構築

Kinesis Client LibraryをEC2で実行する場合は、EC2上にJavaの実行環境を用意する必要がある。

sudo yum -y install java-1.8.0-openjdk-devel
sudo alternatives --config java

また、Kinesis Client Libraryを実行するのに必要なパーミッションは以下の通り。

サービス 操作
kinesis DescribeStream, GetShardIterator, GetRecords
dynamodb CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem
cloudwatch PutMetricData

リージョンの設定

AWS SDK for Java および Kinesis Client Library は、リージョン指定が無いとデフォルトのバージニア州(us-east-1)が指定されてしまう。サンプルアプリケーションでは以下の設定を変更する必要がある。

kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
kinesisClientLibConfiguration.withRegionName("ap-northeast-1");
// Delete the stream
AmazonKinesis kinesis = new AmazonKinesisClient(credentials);
kinesis.setEndpoint("kinesis.ap-northeast-1.amazonaws.com");
// Delete the table
AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(credentialsProvider.getCredentials());
dynamoDB.setRegion(Region.getRegion(Regions.fromName("ap-northeast-1")));

最大取得数の設定

上述の通りデフォルトの最大取得数は10000である。これを変更するには以下を追記する。

kinesisClientLibConfiguration.withMaxRecords(500);

独自処理の追加

取得したデータをもとに独自処理を追加するためには、以下の箇所にデータを追記する。

    private void processSingleRecord(Record record) {
        // TODO Add your own record processing logic here

    	// レコードを取得
    	byte[] byteArray = new byte[record.getData().remaining()];
        record.getData().get(byteArray);
        String json = new String(byteArray);

    	// 以下に、JSONデータをパースするなどの処理を追記
    	//
    	//

JSONのパースには、Jacksonというライブラリが便利である。

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

JSON上のKeyを指定し、変数として抽出/処理することも可能である。

 ObjectMapper mapper = new ObjectMapper();
 	try {
		JsonNode node = mapper.readTree(json);

		// kinesisSyncRecordsを取得
		JsonNode kinesisSyncRecordsNode = node.get("kinesisSyncRecords");
		if(kinesisSyncRecordsNode.isArray()){
			// identityId値を取得
			String id = node.get("identityId").asText();
			// 配列をforループ
			for(final JsonNode objNode : kinesisSyncRecordsNode){
				// 処理
			}
		}
	} catch {
		// エラー処理
	}

Kinesis Client Library v2へ移行する

Kinesis Client Libraryはバージョン1.5.0以降、IRecordProcessor インターフェイスのバージョン2を使用することが可能である。KCL for Java sample projectでは、現在もv1対応のサンプルプログラムのみ公開されており、v2対応のものは存在しない。

v1とv2では、以下のようにいくつかメソッドのパラメータに変更が生じている。

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

Worker worker = new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(kinesisClientLibConfiguration).build();
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

private InitializationInput kinesisInitializationInput;

	@Override
	public void initialize(InitializationInput initializationInput) {
		LOG.info("Initializing record processor for shard: " + initializationInput.getShardId());
        this.kinesisInitializationInput = initializationInput;
	}

	@Override
	public void shutdown(ShutdownInput shutdownInput) {
        LOG.info("Shutting down record processor for shard: " + kinesisInitializationInput.getShardId());
        // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            checkpoint(shutdownInput.getCheckpointer());
        }
	}

	@Override
	public void processRecords(ProcessRecordsInput processRecordsInput) {
		LOG.info("Processing " + processRecordsInput.getRecords().size() + " records from " + kinesisInitializationInput.getShardId());

        // Process records and perform all exception handling.
        processRecordsWithRetries(processRecordsInput.getRecords());

        // Checkpoint once every checkpoint interval.
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(processRecordsInput.getCheckpointer());
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
	}

モニタリング

CloudWatchによってKinesis(PutRecords/GetRecords等)の状態監視、およびKinesis Client Library(ワーカーの挙動など)の状態監視を行うことができる。

コネクタライブラリ

コネクタライブラリと併用することで、Amazon DynamoDB、Amazon Redshift、Amazon S3、Amazon Elasticsearch Service などのサービスと連携してデータを受け渡すことができる。