AWS CloudWatch(1)CloudWatchとは何なのか

CloudWatchとは

CloudWatchは、AWSの各リソースを監視することのできるサービスである。AWSでシステム構築を行う場合、基本的にはこのCloudWawtchのみの監視で十分なことが多く、Zabbixなどの他の監視環境を構築する必要がない。フルマネージドのサービスの場合は自ら監視環境を構築することができないため、CloudWatchによる監視が必須となる。また、CloudWatch Logsを用いることで、各リソースのLogデータをCloudWatchで監視することが可能になる。CloudWatch Logsは、ログデータを無期限で保存する

CloudWatchは、NameSpaceと呼ばれるAWSサービス単位の項目とMetricsと呼ばれる監視内容から構成され、これらを組み合わせて指定することで任意の項目を確認することが可能となる。CloudWatch上のデータ(CloudWatch Logsのデータを除く)は最長2週間保存され、データの更新間隔は最短1分となっている。また、アクション機能を用いて、各アラームに対する通知やAutoRecovery, AutoScaleなどのアクションを規定することも可能である。

CloudWatch Logs Agentのインストール

EC2インスタンスにCloudWatch Logs Agentをインストールすることで、任意のログをCloudWachに送信することが可能となる。AmazonLinuxを使用する場合には、以下の手順でインストールする。

CloudWatch Logs Agentのインストール

CloudWatch Logs Agentはyumから簡単にインストールできる。

yum update -y
yum install -y awslogs

CloudWatch Logs Agentの設定

監視対象のログはconfファイルで設定する。

sudo vi /etc/awslogs/awslogs.conf

標準で/var/log/messagesの情報は送られる設定になるようである。
その下に下記のように設定を追記することで、任意のログを送ることが可能となる。

[/var/log/messages]
datetime_format = %b %d %H:%M:%S
file = /var/log/messages
buffer_duration = 5000
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /var/log/messages

[test-app-log]
file = /home/ec2-user/test-app/process_log
buffer_duration = 5000
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /home/ec2-user/test-app/process_log

CloudWatch Logsのログデータをエクスポートする

CloudWatch Logsに蓄積したログデータは、S3に一括エクスポートすることができる。詳しくは、「CloudWatch コンソールを使用してログデータを Amazon S3 にエクスポートする」を参照のこと。

AWS Kinesis(5)Kinesis Client Libraryでマルチスレッド処理を行う

マルチスレッド処理

AWSが公開しているKinesis Client Library(KCL)のサンプルプログラムでは、取得した各レコードをシングルスレッドで順次処理している。しかしこれでは、前のRecord処理が終了しないと次のRecord処理が実行できないため、Kinesis Client LibraryのDEFAULT_MAX_RECORDの値を上げたとしても性能が十分出ない。

そこで、Record処理を単純にRunnaleなどでマルチスレッド化してしまうとスレッド数が制御できなくなり、例えばKinesis Recordから取得したデータをDynamoDBに順次書き込むという制御を記述していた場合は、

com.amazonaws.http.AmazonHttpClient executeHelper
INFO: Unable to execute HTTP request: Timeout waiting for connection from pool
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool

など、DynamoDBにアクセスするためのHTTPのリソースが枯渇してエラーが発生し、Record処理に漏れが生じてしまう。そこで、ExecutorServiceを用いてスレッド数を制御しながらマルチスレッド化する。

    /**
     * Process records performing retries as needed. Skip "poison pill" records.
     *
     * @param records Data records to be processed.
     */
    private void processRecordsWithRetries(List<Record> records) {
        ExecutorService exec = Executors.newFixedThreadPool(NUM);
        try {	        
            for (Record record : records) {
                boolean processedSuccessfully = false;
                for (int i = 0; i < NUM_RETRIES; i++) {
                    try {
                        // スレッドタスクを実行
                        exec.submit(new My_Method(this, record));
                        // 略...
                    }
                }
            }
        } finally {
            // スレッドタスクを終了
            exec.shutdown();
            if(!exec.awaitTermination(60, TimeUnit.SECONDS)){
                exec.shutdownNow();
            }
        }
    }

ExecutorServiceは、スレッドプールを用いて、マルチスレッドタスクを管理しながら実行できる仕組みで、submit()によりタスクが生成されて、ブロッキングキューに挿入され、shutdown()もしくはshutdownNow()メソッドにより処理を終了させることができる。ExecutorServiceを用いると生成するスレッド数を指定できることから、無尽蔵にスレッドが生成される心配がない。

ExecutorServiceは、終了処理を必ず明示的に実装しておく必要があるサービスである。生成済みのタスクは、shutdown()メソッド実行後も処理が継続されるが、shutdownNow()メソッドの場合は、強制的に処理がキャンセルされる。すなわち、shutdown()メソッド実行を実行したあとも、処理が継続したままである場合があることに注意が必要である。したがって、上記のように、shutdown()メソッドを実行した後に、awaitTermination()メソッドによりタイムアウトの時間を設定しておき、この時間を超えても処理が継続している場合には、shutdownNow()メソッドで強制的に処理を終了させるという実装にすることが望ましい。

ExecutorServiceで設定したスレッド数(NUM)が多すぎると以下のエラーが発生するため、スレッド数の上限を設定する際は注意が必要である。

java.lang.OutOfMemoryError: unable to create new native thread 

プログラムがどれほどスレッドを消費しているかは、以下のコマンドで確認が可能である。

ls -l /proc/[プログラムのプロセスNo.]/task | wc -l

また、Linuxの最大スレッド数は、

cat /proc/sys/kernel/threads-max

ユーザ1人あたりの制限は、

ulimit -a

で確認することが可能である。

AWS EC2(2)使用する上で注意すること

EC2は、AWSマネージメントコンソールからクリック1つでインスタンスを起動できる、非常に便利なクラウドサービスである。一方でとても簡単に利用できることから、全ての制限が取っ払われ、自分の思い通りのまま自由にリソースを使用できるサービスだというような錯覚に陥りがちだ。EC2も元を辿ればデータセンターの中にある物理マシンである。使用する際にはいくつかの注意すべき点が存在する。

サービスで使用する際にT2インスタンスは使わない

T2インスタンスは、AWSの公式説明によると「ベースラインを超えてバーストする能力がある CPU パフォーマンスのベースラインを提供する、バーストパフォーマンスインスタンスです」とある。しかし実際は、データセンター内の同一物理サーバ上に収納されている他のインスタンスが実験用途等で使用され、バースト上の負荷がそのインスタンスに掛かった場合は、自分のインスタンスにも影響が出る可能性がある。したがって商用サービス等の安定した運用が求められる利用方法の場合にT2インスタンスを使用することはおすすめしない。

バックアップを用意しよう

インスタンスが落ちたり、リージョンごとサービスダウンする可能性もゼロではない。24時間365日稼動しているように見えるクラウドサービスにもダウンタイムは確実に存在する。少なくともアベイラビリティゾーンを分ける、可能であれば他のリージョンにもバックアップを取るなどの障害対策はしておこう。

古いインスタンスは使わない

仮想環境、仮想OSといえど元はデータセンターにある物理サーバである。古くなれば電源やHDDなどが故障するリスクも高まる。実際にハードウェアに異常が起こり、EC2のインスタンスが急に応答しない、インスタンスが落ちるなどの現象が発生することがあるようだ。したがって、古いインスタンスはなるべく使わず、M3を使うぐらいならM4を、C3を使うくらいならC4を使おう。ちなみにインスタンスが落ちた場合、Auto Recovery機能をONにしていれば、自動で再起動してくるようである。

リソースは枯渇する

仮想環境、仮想OSといえど元はデータセンターにある物理サーバである。用意しているリソース量が全て使用されればリソースは枯渇する。実際に東京リージョンでは、年末年始などの多くの企業がイベント利用を行うシーズンに、特定のインスタンスタイプが起動できないことがあるようだ。またリソースの枯渇だけでなく、メンテナンスが原因で新たなリソースが確保できないということもあり得る。どうしてもリソースの確保が必要であれば事前に確保しておき、実際に使用するまでインスタンスをStopしておくなどの対応が必要である。リザーブドインスタンスの利用も検討したい。

バージニアリージョンは諸刃の剣

バージニアリージョンは、AWS創業の地。新しいサービスが一番に投入されるリージョンでもあり、リソース量も他のリージョンと桁違いに大きい。しかし一方で施設の老朽化が進んでいるからか異常なほどサービスダウンが多い。日本でEC2を使ったサービスを展開をする際に、わざわざバージニアリージョンを使用するメリットはあまり無いが、リソース量が大きいためバックアップリージョンとして利用する価値はあるだろう。レイテンシーを気にするのであれば、リソース量は劣るが物理距離が近い、シンガポール等のアジアパシフィック地域のリージョンも候補に入れたい。

上限引き上げはお早めに

AWSのサービスには、それぞれサービス制限(スロットリング)が適用されており、例えば、EC2は初期状態では最大20台までしかインスタンスを起動できない設定となっている。上限値以上にリソースを使用する場合はサポートプランに加入の上、サポートに必要なリソース数とその理由を付けて上限値の引き上げや撤廃を申請する必要があるが、上限値の変更が必要な理由が不明確であったり必要以上のリソースを要求すると断られる場合もあるので、少なくとも処理に3営業日以上掛かることを見越して、早めに上限引き上げを申請しておいたほうがよいだろう。

Auto Scaleは過信しない

Auto Scaleはリソース量に合わせてインスタンス量を自動で調整してくれるとても便利なサービスである。しかし、閾値を超えたことを検知して新たなインスタンスを立ち上げるまで数分を要する。したがってバースト的な負荷には対応できない。瞬間的なアクセスが予想されるのであれば、Auto Scaleは使わずにあらかじめ手動で、一定数のインスタンスを立ち上げておこう。

CPU使用率が定常的に30%を超えるのであれば代替策を考えよう

CPU使用率が定常的に30%を超えるのであればもう1つ上のインスタンスに変更するか、同じインスタンスをもう1つ用意しよう。

性能の良いインスタンスで台数少なく運用する方がラク

どのような処理をするかにもよるが、低い性能のインスタンスを大量に並べるよりかは、性能の良いインスタンスを台数少なく並べたほうが運用がラクなことが多い。場合によるけどさ。

EBSのボリュームサイズによってI/O性能は変化する

ディスク容量あまり使わないからと最小限のディスク容量しか確保していない状態で、大量のディスクアクセスが発生した場合は皆が悲しむことになる。汎用(SSD) ボリュームのパフォーマンスはボリュームサイズによって変化するように設計されており、ボリュームサイズが大きければ大きいほど、良いI/O性能が与えられる。一定量のディスクアクセスが発生する可能性がある場合は、1TBなど大きめのボリュームサイズを割り当てておくほうがよい。

とりあえずCPU利用率を見ていればOK

CloudWatchはいろいろな項目があるので、どの項目を確認すればEC2が正常であるのか判断に困るかもしれない。そんなときは取り敢えずはCPU使用率だけでも見ておこう。

そのほかに注意すること

  • インスタンスを停止状態から実行状態に移行するたびに 1 時間分のインスタンス時間が課金される。
  • インスタンス起動後に設定変更できない項目が存在する。VPCやサブネット、Roleなどは後から変更できない。Roleは使う予定がなくても取り敢えず作成しておいたほうが良い。(参考:EC2起動後に、後からできること・できないこと

AWS Cognito(5)スロットリング

スロットリングとは

AWSは各サービスにサービス制限(スロットリングを行っており、各アカウントごとに使えるリソースの上限が設定されている。例えば、EC2は初期状態では最大20台までしかインスタンスを起動できない設定となっている。フルマネージドのサービスに対してもこれらの制限が設定されており、「フルマネージド」という名称でありながら負荷に合わせて際限なくスケールするわけではない。また、サービスによっては、サービス制限一覧にこの上限値が明記されていないものもある。

上限値以上にリソースを使用する場合はサポートプランに加入の上、サポートに必要なリソース数とその理由を付けて上限値の引き上げや撤廃を申請する必要がある。上限値の変更が必要な理由が不明確であったり必要以上のリソースを要求すると、要望が叶えられないこともあるので注意が必要である。理由を明確にした上で申請する必要がある。

なお基本的にAWSのマネージドサービスは、「定常的な利用」や「一時的なサービスを簡単に作成するため」に提供するシステムであるという考え方のようで、バースト的なアクセスや複雑な処理、定常的に大きな負荷が掛かる処理については、EC2上にシステムを構築して利用すべきであるというのがAmazonの方針のようである。大きな負荷の掛かる処理の場合は、上限引き上げ申請するだけでなく、EC2を用いて実現できないかについても検討すべきであろう。

Cognitoのスロットリング

AWSのサービスは日々拡張されていっているので、スロットリングの値は随時変更されている可能性がある。その前提のもと現時点で、

  • Cognito Identity: 数百/毎秒程度
  • Cognito Sync : 数千/毎秒程度

でスロットリングされており、それ以上の負荷が掛かる可能性があったりそれ以上のリソースを必要とする場合は、サポートに上限引き上げの申請を行う必要があるようだ。また、申請を行ったとしても標準値の数倍程度までしか拡張できないようである。Cognitoは昨年の9月に東京リージョンにきたばかりのサービスであるので、用意されているリソースにも限りがあるのかもしれない。今後のリソース拡張に期待したい。

AWS Lambda(2)LambdaでKinesis Recordを取得する

Kinesis Process Records

Kinesis Event 発生毎にLambdaを実行し、Kinesis Recordを取得することが可能である。AWS マネジメントコンソールから関数を作成する場合は、kinesis-process-recordを選択。

Kinesis関数の設定

Stream名など必要事項を記入しプログラムをアップロードすることでLambdaを実行することが可能となる。

lambda_kinesis_function_2

Eclipseで開発する場合は、EclipseのAWSプラグインから直接アップロードや設定を行うことも可能である。Eclipseプロジェクトを右クリックして、「Amazon Web Services」から「Upload function to AWS Lambda…」を選択、「アップロードするリージョン」と「関数名」を指定し、

Kinesis関数の設定

「Description」や「Role」、プログラム一式を格納する「S3バケット」を指定して「Finish」を押すとEclipseで作成したLambda関数がAWS上にアップロードされる。

Eclipse設定

JavaによるLambda関数の実装

RequestHandler<KinesisEvent, Object>を実装することで、Kinesisからデータを取得し処理を行うことが可能となる。1回の処理で複数のKinesis Recordを取得するのでfor文で廻してそれぞれのRecordの値を取得することができる。Kinesis Recordには、ApproximateArrivalTimestampというRecord入力時に自動付与されるタイムスタンプが存在し、この値を取得するためのgetApproximateArrivalTimestamp()というメソッドも用意されているが、現時点では値の取得に対応していおらず、このメソッドを実行してもnullしか返えらないので注意が必要である。

public class KinesisEventHandler implements RequestHandler<KinesisEvent, Object> {

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
    	    	
        context.getLogger().log("Input: " + input);
        
        for(KinesisEventRecord rec : input.getRecords()) {
        	// Recordを取得
        	Record record = rec.getKinesis();
        	// JSONを取得
        	byte[] byteArray = new byte[record.getData().remaining()];
        	record.getData().get(byteArray);
        	String json = new String(byteArray);
        	// タイムスタンプが取得できない
        	String timestamp = rec.getKinesis().getApproximateArrivalTimestamp();
        }
    }
}

Lambdaのスロットリング

Lambdaは、1つの関数につき1秒あたり同時処理数100でスロットリングされている。また、上限値の引き上げを申請しても1秒あたり同時接続数1000程度が限界のようである。したがって、Kinesisに大量のRecordが入力された場合はLambdaの上限を超えてしまいRecordを取りこぼしてしまう可能性がある。Kinesis Recordを確実に取得し処理する必要がある場合は、EC2上でKinesis Client Libraryを用いてKinesis Appを実装する必要がある。

AWS Kinesis(4)Kinesis Producer Library

Kinesis Producer Libraryとは

Kinesis Client Libraryに付随しているKinesis Producer Library(KPL)を用いることで、Kinesisに容易にデータを投入することが可能となる。KPLを用いることで自動的に複数シャードにデータを投入したり、ユーザレコードを集約してスループットを改善する等が可能となる。複数のレコードを単一のKinesis Recordに集約することで、API呼び出し時に取得できるデータ量が増大させることが可能になるため、より少ないシャード数の用意でデータ量を処理することが可能となる。

またKPLは独立したプロセスで動作しているため、KPLがクラッシュした場合でも他の機能の動作を継続させることが可能である。

Kinesis Producer Libraryを用いたデータ取得処理

パーティションキー

Kinesisへデータを投入する際には、ストリーム名の指定パーティションキーの指定が必要である。パーティションキーは、複数のシャードの中からストリームを追加するシャードを選択するために使用するキーで、均等にシャードを選択するためにシャード数より十分大きい数を用意する必要がある。AWS SDKに付属しているサンプルプログラムでは、現在時刻(Current Millis)をパーティションキーとして使用している。

long createTime = System.currentTimeMillis();
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));

リージョンの設定

KPL は Kinesis Client Library と同様に、リージョン指定が無いとデフォルトのバージニア州(us-east-1)が指定されてしまう。サンプルアプリケーションでは以下の設定を変更する必要がある。

kinesis = new AmazonKinesisClient(credentials);
kinesis.withRegion(Region.getRegion(Regions.fromName(REGION_NAME)));

なお、KinesisProducerConfigurationはProperyファイルからの設定の読み込みにも対応しているため、Propetyファイルからリージョンを指定することも可能である。

Kinesisへの書き込み

Kinesisへのレコードの書き込みは、Kinesis Producer Libraryを用いる方法の他に以下のような手段もある。

  • AWS SDK (Kinesis Stream API)の使用
  • Amazon Kinesis エージェントの使用
  • fluent-plugin-kinesisの使用

AWS DynamoDB(2)AWS SDK for Javaとアトミックカウンター

JavaによるDynamoDBへのアクセス

AWS SDK for Javaを用いることで、JavaプログラムからDynamoDBにアクセスすることができる。DynamoDBはHashKeyでインデックス化されており、Itemの追加や更新には当該テーブルのHashKeyの情報が必要となる。

DynamoDBインスタンスを取得

Credentialsやテーブル名を指定してDynamoDBインスタンスを指定する。リージョンを明示的に指定しないとデフォルトではバージニア州が指定されてしまう。通常credentialsは、ホームディレクトリ直下の「.aws」ディレクトリ(~/.aws/credentials)に置かれている。

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;

// ProfileCredentialsProvider
credentialsProvider = new ProfileCredentialsProvider();
try {
    credentialsProvider.getCredentials();
} catch (Exception e) {
    throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
                    + "Please make sure that your credentials file is at the correct "
                    + "location (/Users/hoge/.aws/credentials), and is in valid format.", e);
}

// AmazonDynamoDBClient
AmazonDynamoDBClient dynamoDBClient = new AmazonDynamoDBClient(credentialsProvider);
// リージョンの設定
dynamoDBClient.setRegion(Region.getRegion(Regions.fromName("ap-northeast-1")));
// DynamoDB
DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);
// DynamoDB Table
Table dynamodbTable = dynamoDB.getTable(TABLE_NAME);

データの取得

getItemメソッドで、HashKeyを指定することでItemの取得が可能となる。HashKeyの「名称」と「値」を指定することで、Itemを取得できる。

dynamodbTable.getItem(HASH_KEY_NAME, HASH_KEY_VAL);

データの書き込み

DynamoDBは空文字を格納することができない。データを格納する前に空文字チェックを行っておく

データの挿入

putItemメソッドで、Itemの書き込みが可能となる。書き込みを行う際、HashKeyの「名称」と「値」の指定が必須である。なお、同じキーを持つ項目が存在している場合は、全ての値が更新(上書き)される。

Item item = new Item()
		.withPrimaryKey(HASH_KEY_NAME, HASH_KEY_VAL)
		.withString(key, val);
dynamodbTable.putItem(item);

データの更新

updateItemメソッドで、Itemを更新する。HashKeyの「名称」と「値」を指定することで、どのItemを更新するかが決定される。また更新内容は、SETやADD, REMOVEで始まる更新式によって指定する。なお、同じキーを持つ項目が存在している場合は、指定したAttributeのみ置換される。指定したキーが存在しない場合は、新規の項目が作成される。

// 更新するAttributeの名称(Key)
Map<String,String> resultExpressionAttributeNames = new HashMap<String,String>();
resultExpressionAttributeNames.put("#key", KEY_NAME);
// 更新するAttributeの値(Value)
Map<String,Object> resultExpressionAttributeValues = new HashMap<String,Object>();
resultExpressionAttributeValues.put(":val", VALUE);
// Attributeの更新
dynamodbTable.updateItem(
	HASH_KEY_NAME, HASH_KEY_VALUE, // 更新対象のItem(HASH_KEY, VALUE)
	"set #key = :val",
resultExpressionAttributeNames,
resultExpressionAttributeValues);

the provided key element does not match the schema javaというエラーが発生した場合は、誤ったHashKeyを指定している可能性があるので確認する。

アトミックカウンター

DynamoDBは、高い可用性やスケーラビリティ(BASE属性)を確保している一方で、厳密な一貫性や即時反映性(ACID属性)を諦めている。しかし、アトミックカウンター(一貫性のあるカウンタ)をサポートしているため、他の処理を妨げることなく並列に、一貫性のあるカウンタ処理を実施することは可能である。

アトミックカウンターをサポートとあるが、アトミックカウンターという特別に用意された機能が存在するわけでもない。updateItemメソッドを利用して以下のような記述をすると、アトミック性のあるカウンタを実装することができる。

    	Map<String,String> expressionAttributeNames = new HashMap<String,String>();
		expressionAttributeNames.put("#key", kEY_NAME);
		Map<String,Object> expressionAttributeValues = new HashMap<String,Object>();
		expressionAttributeValues.put(":val", 1);
		
		try {
			dynamodbTable.updateItem(
				    HASH_KEY_NAME, HASH_KEY_VAL,  
				    "ADD #key :val",
				    expressionAttributeNames, 
				    expressionAttributeValues);
		} catch (Exception e) {
			
		}

更新式にて単純に1を加算する処理を行っているだけであるので、プログラムを書き換えることで加算もできれば減算もできる。2を足す、3を引くといったことも可能である。

    	dynamodbTable.updateItem(
				    HASH_KEY_NAME, HASH_KEY_VAL,  
				    "SET #key = #key + :val",
				    expressionAttributeNames, 
				    expressionAttributeValues);

としてもよいが、SETアクションの場合、アイテムが事前に存在しない場合にエラーとなるので、ADDアクションで実装したほうがよい。

DynamoDB操作の違い

なお、DynamoDBのそれぞれの操作の差異は以下の通り。

操作 アクション 既にアイテムが存在している場合の挙動
putItem 全ての属性を消去した上で新たに属性を追加する
updateItem SET 対象の属性のみ更新される
updateItem REMOVE 対象の属性のみ削除される
updateItem ADD 対象の属性が数値の場合は値に追加される、セットデータの場合は要素が追加される
updateItem DELETE 対象の属性のセットデータから要素が削除される

AWS Kinesis(3)リシャーディング

リシャーディングとは

Kinesisでは、シャードと呼ばれるストリーム単位で処理される。事前に設定するシャードの数が並列に処理を行う土管の数となる。Kinesisを設定する際にこのシャードの数を指定するが、実際の負荷に合わせてシャード数を変化させることができる。これがリシャーディングである。

Amazon Kinesis の主要なコンセプト - Amazon Kinesis

リシャーディングは単純に数の増減をするだけではなく、「どのシャードを分割するのか」や「どのシャードとどのシャードを分割するのか」を指定する必要がある。通常はどのシャードに負荷が掛かっているかをCloudWatchなどを用いて計測してリシャーディングを決定する。またシャードの結合は、隣接したシャード同士で無いと実現できない。

リシャーディング処理をコマンドライン上で実現するツールがGitHub上で公開されている。以下のようにパラメータを記述することで、リシャーディングを実行できる。

cd amazon-kinesis-scaling-utils
cd dist
java -cp KinesisScalingUtils.jar-complete.jar -Dstream-name=MyStream -Dscaling-action=scaleUp -Dcount=10 -Dregion=ap-northeast-1 ScalingClient

実行する際のオプションパラメータは以下の通り。

オプションパラメータ 内容
stream-name ストリーム名
scaling-action スケールアクション “scaleUp”, “scaleDown” or “resize”
count シャード数
pct 既存シャードに対する増加減数(%)
region リージョン
shard-id ターゲットとするシャードID

AWS Kinesis(2)Kinesis Client Library

Kinesis Client Libraryとは

Kinesis Client Libraryを用いて、Kinesis Applicationと呼ばれるプログラムを作成することができる。Kinesis Applicationは、Kinesisからデータを取得し、DynamoDBやRedshift、S3などにKinesisストリームを転送することが可能である。Kinesis Client Libraryが、複数のインスタンス間での負荷分散インスタンスの障害に対する応答処理済みのレコードのチェックポイント作成リシャーディングへの対応などを行うので、ユーザはデータ処理部分のみに注力することができる。

アプリケーションの状態の追跡

Kinesis Client Libraryは、アプリケーション毎にDynamoDBに特別なテーブルを作成して各アプリケーションの状態を追跡する。テーブルには、どのデータまでが読み込み済みであるかを示すチェックポイントの値などのアプリケーションの状態を示す情報が、シャード毎に記録される。テーブル名は、プログラム上で指定したアプリケーション名と同一である。1 秒あたりの読み込み 10 回、書き込み 10 回のスループットを持つテーブルが生成されるが、シャード数が多い場合などはスペックが足りなくなる場合があるので注意が必要である。

Kinesisステータステーブル

並列処理

Kinesis Client Libraryは、1つのWorkerで複数のシャード処理を実行することが可能である。1つのシャードを複数のWorkerで処理することはできない。シャード数よりも多い数のWorkerを立ち上げても処理は実行されないので注意が必要である。

設定値

Kinesisの制限事項

Kinesis Client Libraryのデフォルト値

  • getRecordsメソッドで取得するレコードは最大10000件
        /**
         * Max records to fetch from Kinesis in a single GetRecords call.
         */
        public static final int DEFAULT_MAX_RECORDS = 10000;
    

    src/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java Line:48

  • データ取得間隔は1秒

        /**
         * Idle time between record reads in milliseconds.
         */
        public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;
    

    src/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java Line:53

認証方法の設定

Kinesis Client Libraryを用いたデータ取得処理

Kinesis Client Libraryは複数の言語で提供されているが実体はJavaであり、MultiLangDaemon という多言語インターフェイスを通して他の言語でも機能が提供されている。今回は、AWS Toolkit for Eclipseに付属するサンプルアプリケーションとJavaライブラリを使用してKinesisからデータを取得する。

環境の構築

Kinesis Client LibraryをEC2で実行する場合は、EC2上にJavaの実行環境を用意する必要がある。

sudo yum -y install java-1.8.0-openjdk-devel
sudo alternatives --config java

また、Kinesis Client Libraryを実行するのに必要なパーミッションは以下の通り。

サービス 操作
kinesis DescribeStream, GetShardIterator, GetRecords
dynamodb CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem
cloudwatch PutMetricData

リージョンの設定

AWS SDK for Java および Kinesis Client Library は、リージョン指定が無いとデフォルトのバージニア州(us-east-1)が指定されてしまう。サンプルアプリケーションでは以下の設定を変更する必要がある。

kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
kinesisClientLibConfiguration.withRegionName("ap-northeast-1");

AmazonKinesisApplicationSample.java Line:94

// Delete the stream
AmazonKinesis kinesis = new AmazonKinesisClient(credentials);
kinesis.setEndpoint("kinesis.ap-northeast-1.amazonaws.com");

AmazonKinesisApplicationSample.java Line:119

// Delete the table
AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(credentialsProvider.getCredentials());
dynamoDB.setRegion(Region.getRegion(Regions.fromName("ap-northeast-1")));

AmazonKinesisApplicationSample.java Line:129

最大取得数の設定

上述の通りデフォルトの最大取得数は10000である。これを変更するには以下を追記する。

kinesisClientLibConfiguration.withMaxRecords(500);

AmazonKinesisApplicationSample.java Line:95

独自処理の追加

取得したデータをもとに独自処理を追加するためには、以下の箇所にデータを追記する。

    private void processSingleRecord(Record record) {
        // TODO Add your own record processing logic here
        
    	// レコードを取得
    	byte[] byteArray = new byte[record.getData().remaining()];
        record.getData().get(byteArray);
        String json = new String(byteArray);

    	// 以下に、JSONデータをパースするなどの処理を追記
    	// 
    	// 

AmazonKinesisApplicationSampleRecordProcessor.java Line:117

JSONのパースには、Jacksonというライブラリが便利である。

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

JSON上のKeyを指定し、変数として抽出/処理することも可能である。

 ObjectMapper mapper = new ObjectMapper();
 	try {
		JsonNode node = mapper.readTree(json);
			
		// kinesisSyncRecordsを取得
		JsonNode kinesisSyncRecordsNode = node.get("kinesisSyncRecords");
		if(kinesisSyncRecordsNode.isArray()){
			// identityId値を取得
			String id = node.get("identityId").asText();	
			// 配列をforループ
			for(final JsonNode objNode : kinesisSyncRecordsNode){
				// 処理
			}
		}
	} catch {
		// エラー処理
	}

Kinesis Client Library v2へ移行する

Kinesis Client Libraryはバージョン1.5.0以降、IRecordProcessor インターフェイスのバージョン2を使用することが可能である。KCL for Java sample projectでは、現在もv1対応のサンプルプログラムのみ公開されており、v2対応のものは存在しない。

v1とv2では、以下のようにいくつかメソッドのパラメータに変更が生じている。

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

Worker worker = new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(kinesisClientLibConfiguration).build();
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

private InitializationInput kinesisInitializationInput;

	@Override
	public void initialize(InitializationInput initializationInput) {
		LOG.info("Initializing record processor for shard: " + initializationInput.getShardId());
        this.kinesisInitializationInput = initializationInput;
	}

	@Override
	public void shutdown(ShutdownInput shutdownInput) {
        LOG.info("Shutting down record processor for shard: " + kinesisInitializationInput.getShardId());
        // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            checkpoint(shutdownInput.getCheckpointer());
        }
	}

	@Override
	public void processRecords(ProcessRecordsInput processRecordsInput) {
		LOG.info("Processing " + processRecordsInput.getRecords().size() + " records from " + kinesisInitializationInput.getShardId());

        // Process records and perform all exception handling.
        processRecordsWithRetries(processRecordsInput.getRecords());

        // Checkpoint once every checkpoint interval.
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(processRecordsInput.getCheckpointer());
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
	}

モニタリング

CloudWatchによってKinesis(PutRecords/GetRecords等)の状態監視、およびKinesis Client Library(ワーカーの挙動など)の状態監視を行うことができる。

AWS Cognito(4)Cognitoストリーム

Cognitoストリームとは

Cognito Datasetに蓄積されたデータを取得するためには、Cognitoストリームを設定してKinesis経由でデータを出力する必要がある。Cognitoストリームを設定すると、Cognito Datasetに変更が生じる度にKinesisにそのデータを入力することができる。

Cognitoストリームの設定

  • Cognitoのダッシュボードから「Edit identity pool」をクリックし、Cognito Streamの項目を編集する

Cognitoストリームの設定

  • Kinesis stream名とシャード数を設定する
    — 「Create stream」をクリック
    — シャードとはKinesisの入出力処理のパイプの太さを表す

Kinesisストリームの作成

  • ロール設定を行う
    –「Create role」をクリック
    — ロール名が自動入力されるので、作成ウィザードで登録を行う

ロールの設定

  • 有効化する
    — 「StreamStatus」を「Enabled」に設定することで、Cognito Streamが有効化される
  • 設定変更を実行する
    — また、上記の設定を行った上で「Save Changes」をクリックしてもストリーム名、シャード、ロールなどがきちんと反映されていない場合があるので、反映されているか確認することが重要である

  • ユーザのロール設定と同様に、先ほど作成したロールのポリシーを確認する

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord"
      ],
      "Resource": [
        "arn:aws:kinesis:ap-northeast-1:172664222583:stream/STREAM_NAME"
      ]
    }
  ]
}

なお、Bulk publishをクリックすることでこれまで蓄積されたCognito DatasetをKinesisに一括で送り出すことが可能だが、24時間に1回しか実行できないことに注意が必要である。

ストリームレコード

CognitoからKinesisへは、以下のフォーマットでレコードが送信される。

{
  "identityPoolId" : "Pool Id"
  "identityId" : "Identity Id "
  "dataSetName" : "Dataset Name"
  "operation" : "(replace|remove)"
  "kinesisSyncRecords" : [
    { 
      "key" : "Key",
      "value" : "Value",
      "syncCount" : 1,
      "lastModifiedDate" : 1424801824343,
      "deviceLastModifiedDate" : 1424801824343,
      "op": "(replace|remove)"
    },
    ...
  ],
  "lastModifiedDate": 1424801824343,
  "kinesisSyncRecordsURL": "S3Url",
  "payloadType" : "(S3Url|Inline)",
  "syncCount" : 1
 }

AWS CLI経由でコマンドを入力することで、Cognitoから正常にレコード入力されているかの確認を行うことが可能である。