AWS CloudWatch(2)システム運用における監視項目

AWSでシステム構築を行う場合、基本的にはこのCloudWawtchのみの監視で十分なことが多い。様々な監視項目が存在するが、システム運用を行う際に特に監視する必要のある項目は以下の通り。

DynamoDB

多くの項目がデフォルトで 1 分ごとにデータを送信するが、5 分間隔で送信されるデータも存在する。事前に設定した容量(スループット)が十分であるかを監視しておくと良い。

  • ProvisionedReadCapacityUnits
  • ConsumedReadCapacityUnits
  • ProvisionedWriteCapacityUnits
  • ConsumedWriteCapacityUnits

EC2

Amazon EC2 はデフォルトで 5 分ごとに CloudWatch にデータを送信する。インスタンスを作成する際に、「詳細モニタリングを有効」にチェックを入れることでこれを 1 分ごとに変更することができるCPU使用率を監視しておくと良い。メモリやディスクのメトリクスを取得するためには、CloudWatch Agentをインストールする必要がある

  • CPUUtilization

Kinesis

事前に設定した容量(シャード数)が十分であるかを監視しておくと良い。

  • PutRecords.Records
  • IncomingRecords
  • GetRecords.Bytes
  • IteratorAge

AWS Kinesis(5)Kinesis Client Libraryでマルチスレッド処理を行う

マルチスレッド処理

AWSが公開しているKinesis Client Library(KCL)のサンプルプログラムでは、取得した各レコードをシングルスレッドで順次処理している。しかしこれでは、前のRecord処理が終了しないと次のRecord処理が実行できないため、Kinesis Client LibraryのDEFAULT_MAX_RECORDの値を上げたとしても性能が十分出ない。

そこで、Record処理を単純にRunnaleなどでマルチスレッド化してしまうとスレッド数が制御できなくなり、例えばKinesis Recordから取得したデータをDynamoDBに順次書き込むという制御を記述していた場合は、

com.amazonaws.http.AmazonHttpClient executeHelper
INFO: Unable to execute HTTP request: Timeout waiting for connection from pool
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool

など、DynamoDBにアクセスするためのHTTPのリソースが枯渇してエラーが発生し、Record処理に漏れが生じてしまう。そこで、ExecutorServiceを用いてスレッド数を制御しながらマルチスレッド化する。

    /**
     * Process records performing retries as needed. Skip "poison pill" records.
     *
     * @param records Data records to be processed.
     */
    private void processRecordsWithRetries(List<Record> records) {
        ExecutorService exec = Executors.newFixedThreadPool(NUM);
        try {	        
            for (Record record : records) {
                boolean processedSuccessfully = false;
                for (int i = 0; i < NUM_RETRIES; i++) {
                    try {
                        // スレッドタスクを実行
                        exec.submit(new My_Method(this, record));
                        // 略...
                    }
                }
            }
        } finally {
            // スレッドタスクを終了
            exec.shutdown();
            if(!exec.awaitTermination(60, TimeUnit.SECONDS)){
                exec.shutdownNow();
            }
        }
    }

ExecutorServiceは、スレッドプールを用いて、マルチスレッドタスクを管理しながら実行できる仕組みで、submit()によりタスクが生成されて、ブロッキングキューに挿入され、shutdown()もしくはshutdownNow()メソッドにより処理を終了させることができる。ExecutorServiceを用いると生成するスレッド数を指定できることから、無尽蔵にスレッドが生成される心配がない。

ExecutorServiceは、終了処理を必ず明示的に実装しておく必要があるサービスである。生成済みのタスクは、shutdown()メソッド実行後も処理が継続されるが、shutdownNow()メソッドの場合は、強制的に処理がキャンセルされる。すなわち、shutdown()メソッド実行を実行したあとも、処理が継続したままである場合があることに注意が必要である。したがって、上記のように、shutdown()メソッドを実行した後に、awaitTermination()メソッドによりタイムアウトの時間を設定しておき、この時間を超えても処理が継続している場合には、shutdownNow()メソッドで強制的に処理を終了させるという実装にすることが望ましい。

ExecutorServiceで設定したスレッド数(NUM)が多すぎると以下のエラーが発生するため、スレッド数の上限を設定する際は注意が必要である。

java.lang.OutOfMemoryError: unable to create new native thread 

プログラムがどれほどスレッドを消費しているかは、以下のコマンドで確認が可能である。

ls -l /proc/[プログラムのプロセスNo.]/task | wc -l

また、Linuxの最大スレッド数は、

cat /proc/sys/kernel/threads-max

ユーザ1人あたりの制限は、

ulimit -a

で確認することが可能である。

AWS Lambda(2)LambdaでKinesis Recordを取得する

Kinesis Process Records

Kinesis Event 発生毎にLambdaを実行し、Kinesis Recordを取得することが可能である。AWS マネジメントコンソールから関数を作成する場合は、kinesis-process-recordを選択。

Kinesis関数の設定

Stream名など必要事項を記入しプログラムをアップロードすることでLambdaを実行することが可能となる。

lambda_kinesis_function_2

Eclipseで開発する場合は、EclipseのAWSプラグインから直接アップロードや設定を行うことも可能である。Eclipseプロジェクトを右クリックして、「Amazon Web Services」から「Upload function to AWS Lambda…」を選択、「アップロードするリージョン」と「関数名」を指定し、

Kinesis関数の設定

「Description」や「Role」、プログラム一式を格納する「S3バケット」を指定して「Finish」を押すとEclipseで作成したLambda関数がAWS上にアップロードされる。

Eclipse設定

JavaによるLambda関数の実装

RequestHandler<KinesisEvent, Object>を実装することで、Kinesisからデータを取得し処理を行うことが可能となる。1回の処理で複数のKinesis Recordを取得するのでfor文で廻してそれぞれのRecordの値を取得することができる。Kinesis Recordには、ApproximateArrivalTimestampというRecord入力時に自動付与されるタイムスタンプが存在し、この値を取得するためのgetApproximateArrivalTimestamp()というメソッドも用意されているが、現時点では値の取得に対応していおらず、このメソッドを実行してもnullしか返えらないので注意が必要である。

public class KinesisEventHandler implements RequestHandler<KinesisEvent, Object> {

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {

        context.getLogger().log("Input: " + input);

        for(KinesisEventRecord rec : input.getRecords()) {
        	// Recordを取得
        	Record record = rec.getKinesis();
        	// JSONを取得
        	byte[] byteArray = new byte[record.getData().remaining()];
        	record.getData().get(byteArray);
        	String json = new String(byteArray);
        	// タイムスタンプが取得できない
        	String timestamp = rec.getKinesis().getApproximateArrivalTimestamp();
        }
    }
}

Lambdaのスロットリング

Lambdaは、1つの関数につき1秒あたり同時処理数100でスロットリングされている。また、上限値の引き上げを申請しても1秒あたり同時接続数1000程度が限界のようである。したがって、Kinesisに大量のRecordが入力された場合はLambdaの上限を超えてしまいRecordを取りこぼしてしまう可能性がある。Kinesis Recordを確実に取得し処理する必要がある場合は、EC2上でKinesis Client Libraryを用いてKinesis Appを実装する必要がある。

AWS Kinesis(4)Kinesis Producer Library

Kinesis Producer Libraryとは

Kinesis Client Libraryに付随しているKinesis Producer Library(KPL)を用いることで、Kinesisに容易にデータを投入することが可能となる。KPLを用いることで自動的に複数シャードにデータを投入したり、ユーザレコードを集約してスループットを改善する等が可能となる。複数のレコードを単一のKinesis Recordに集約することで、API呼び出し時に取得できるデータ量が増大させることが可能になるため、より少ないシャード数の用意でデータ量を処理することが可能となる。

またKPLは独立したプロセスで動作しているため、KPLがクラッシュした場合でも他の機能の動作を継続させることが可能である。

Kinesis Producer Libraryを用いたデータ取得処理

パーティションキー

Kinesisへデータを投入する際には、ストリーム名の指定パーティションキーの指定が必要である。パーティションキーは、複数のシャードの中からストリームを追加するシャードを選択するために使用するキーで、均等にシャードを選択するためにシャード数より十分大きい数を用意する必要がある。AWS SDKに付属しているサンプルプログラムでは、現在時刻(Current Millis)をパーティションキーとして使用している。

long createTime = System.currentTimeMillis();
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));

リージョンの設定

KPL は Kinesis Client Library と同様に、リージョン指定が無いとデフォルトのバージニア州(us-east-1)が指定されてしまう。サンプルアプリケーションでは以下の設定を変更する必要がある。

kinesis = new AmazonKinesisClient(credentials);
kinesis.withRegion(Region.getRegion(Regions.fromName(REGION_NAME)));

なお、KinesisProducerConfigurationはProperyファイルからの設定の読み込みにも対応しているため、Propetyファイルからリージョンを指定することも可能である。

Kinesisへの書き込み

Kinesisへのレコードの書き込みは、Kinesis Producer Libraryを用いる方法の他に以下のような手段もある。

  • AWS SDK (Kinesis Stream API)の使用
  • Amazon Kinesis エージェントの使用
  • fluent-plugin-kinesisの使用

AWS Kinesis(3)リシャーディング

リシャーディングとは

Kinesisでは、シャードと呼ばれるストリーム単位で処理される。事前に設定するシャードの数が並列に処理を行う土管の数となる。Kinesisを設定する際にこのシャードの数を指定するが、実際の負荷に合わせてシャード数を変化させることができる。これがリシャーディングである。

Amazon Kinesis の主要なコンセプト - Amazon Kinesis

リシャーディングは単純に数の増減をするだけではなく、「どのシャードを分割するのか」や「どのシャードとどのシャードを分割するのか」を指定する必要がある。通常はどのシャードに負荷が掛かっているかをCloudWatchなどを用いて計測してリシャーディングを決定する。またシャードの結合は、隣接したシャード同士で無いと実現できない。

リシャーディング処理をコマンドライン上で実現するツールがGitHub上で公開されている。以下のようにパラメータを記述することで、リシャーディングを実行できる。

cd amazon-kinesis-scaling-utils
cd dist
java -cp KinesisScalingUtils.jar-complete.jar -Dstream-name=MyStream -Dscaling-action=scaleUp -Dcount=10 -Dregion=ap-northeast-1 ScalingClient

実行する際のオプションパラメータは以下の通り。

オプションパラメータ 内容
stream-name ストリーム名
scaling-action スケールアクション “scaleUp”, “scaleDown” or “resize”
count シャード数
pct 既存シャードに対する増加減数(%)
region リージョン
shard-id ターゲットとするシャードID

AWS Kinesis(2)Kinesis Client Library

Kinesis Client Libraryとは

Kinesis Client Libraryを用いて、Kinesis Applicationと呼ばれるプログラムを作成することができる。Kinesis Applicationは、Kinesisからデータを取得し、DynamoDBやRedshift、S3などにKinesisストリームを転送することが可能である。Kinesis Client Libraryが、複数のインスタンス間での負荷分散インスタンスの障害に対する応答処理済みのレコードのチェックポイント作成リシャーディングへの対応などを行うので、ユーザはデータ処理部分のみに注力することができる。

アプリケーションの状態の追跡

Kinesis Client Libraryは、アプリケーション毎にDynamoDBに特別なテーブルを作成して各アプリケーションの状態を追跡する。テーブルには、どのデータまでが読み込み済みであるかを示すチェックポイントの値などのアプリケーションの状態を示す情報が、シャード毎に記録される。テーブル名は、プログラム上で指定したアプリケーション名と同一である。1 秒あたりの読み込み 10 回、書き込み 10 回のスループットを持つテーブルが生成されるが、シャード数が多い場合などはスペックが足りなくなる場合があるので注意が必要である。

Kinesisステータステーブル

並列処理

Kinesis Client Libraryは、1つのWorkerで複数のシャード処理を実行することが可能である。1つのシャードを複数のWorkerで処理することはできない。シャード数よりも多い数のWorkerを立ち上げても処理は実行されないので注意が必要である。

設定値

Kinesisの制限事項

Kinesis Client Libraryのデフォルト値

  • getRecordsメソッドで取得するレコードは最大10000件
       /**
         * Max records to fetch from Kinesis in a single GetRecords call.
         */
        public static final int DEFAULT_MAX_RECORDS = 10000;
  • データ取得間隔は1秒
       /**
         * Idle time between record reads in milliseconds.
         */
        public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;

認証方法の設定

サンプルアプリケーションでは、認証情報をCredentialsから取得する実装となっているが、Kinesis Client LibraryをEC2インスタンス上で動作させる場合は、IAM Role(InstanceProfileCredentialsProvider)から取得する方法に変更することも可能である。

Kinesis Client Libraryを用いたデータ取得処理

Kinesis Client Libraryは複数の言語で提供されているが実体はJavaであり、MultiLangDaemon という多言語インターフェイスを通して他の言語でも機能が提供されている。今回は、AWS Toolkit for Eclipseに付属するサンプルアプリケーションとJavaライブラリを使用してKinesisからデータを取得する。

環境の構築

Kinesis Client LibraryをEC2で実行する場合は、EC2上にJavaの実行環境を用意する必要がある。

sudo yum -y install java-1.8.0-openjdk-devel
sudo alternatives --config java

また、Kinesis Client Libraryを実行するのに必要なパーミッションは以下の通り。

サービス 操作
kinesis DescribeStream, GetShardIterator, GetRecords
dynamodb CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem
cloudwatch PutMetricData

リージョンの設定

AWS SDK for Java および Kinesis Client Library は、リージョン指定が無いとデフォルトのバージニア州(us-east-1)が指定されてしまう。サンプルアプリケーションでは以下の設定を変更する必要がある。

kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
kinesisClientLibConfiguration.withRegionName("ap-northeast-1");
// Delete the stream
AmazonKinesis kinesis = new AmazonKinesisClient(credentials);
kinesis.setEndpoint("kinesis.ap-northeast-1.amazonaws.com");
// Delete the table
AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(credentialsProvider.getCredentials());
dynamoDB.setRegion(Region.getRegion(Regions.fromName("ap-northeast-1")));

最大取得数の設定

上述の通りデフォルトの最大取得数は10000である。これを変更するには以下を追記する。

kinesisClientLibConfiguration.withMaxRecords(500);

独自処理の追加

取得したデータをもとに独自処理を追加するためには、以下の箇所にデータを追記する。

    private void processSingleRecord(Record record) {
        // TODO Add your own record processing logic here

    	// レコードを取得
    	byte[] byteArray = new byte[record.getData().remaining()];
        record.getData().get(byteArray);
        String json = new String(byteArray);

    	// 以下に、JSONデータをパースするなどの処理を追記
    	//
    	//

JSONのパースには、Jacksonというライブラリが便利である。

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

JSON上のKeyを指定し、変数として抽出/処理することも可能である。

 ObjectMapper mapper = new ObjectMapper();
 	try {
		JsonNode node = mapper.readTree(json);

		// kinesisSyncRecordsを取得
		JsonNode kinesisSyncRecordsNode = node.get("kinesisSyncRecords");
		if(kinesisSyncRecordsNode.isArray()){
			// identityId値を取得
			String id = node.get("identityId").asText();
			// 配列をforループ
			for(final JsonNode objNode : kinesisSyncRecordsNode){
				// 処理
			}
		}
	} catch {
		// エラー処理
	}

Kinesis Client Library v2へ移行する

Kinesis Client Libraryはバージョン1.5.0以降、IRecordProcessor インターフェイスのバージョン2を使用することが可能である。KCL for Java sample projectでは、現在もv1対応のサンプルプログラムのみ公開されており、v2対応のものは存在しない。

v1とv2では、以下のようにいくつかメソッドのパラメータに変更が生じている。

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

Worker worker = new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(kinesisClientLibConfiguration).build();
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

private InitializationInput kinesisInitializationInput;

	@Override
	public void initialize(InitializationInput initializationInput) {
		LOG.info("Initializing record processor for shard: " + initializationInput.getShardId());
        this.kinesisInitializationInput = initializationInput;
	}

	@Override
	public void shutdown(ShutdownInput shutdownInput) {
        LOG.info("Shutting down record processor for shard: " + kinesisInitializationInput.getShardId());
        // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            checkpoint(shutdownInput.getCheckpointer());
        }
	}

	@Override
	public void processRecords(ProcessRecordsInput processRecordsInput) {
		LOG.info("Processing " + processRecordsInput.getRecords().size() + " records from " + kinesisInitializationInput.getShardId());

        // Process records and perform all exception handling.
        processRecordsWithRetries(processRecordsInput.getRecords());

        // Checkpoint once every checkpoint interval.
        if (System.currentTimeMillis() &amp;gt; nextCheckpointTimeInMillis) {
            checkpoint(processRecordsInput.getCheckpointer());
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
	}

モニタリング

CloudWatchによってKinesis(PutRecords/GetRecords等)の状態監視、およびKinesis Client Library(ワーカーの挙動など)の状態監視を行うことができる。

コネクタライブラリ

コネクタライブラリと併用することで、Amazon DynamoDB、Amazon Redshift、Amazon S3、Amazon Elasticsearch Service などのサービスと連携してデータを受け渡すことができる。

AWS Kinesis(1)Kinesisの概要

Kinesisとは

Kinesisは、大量のデータを受け付けて配信先に順序通りに配信するバッファ機構である。Kinesisを利用することで高速かつ継続的にデータの取り込みと集約を行うことが可能であり、ログデータやセンサーデータなどの継続的に入力されるストリームの処理に適している。Kinesisにデータが入力されて取得できるようになるまでは1秒未満である。瞬時に処理を実行することが可能である。各クライアントから直接S3やDynamoDB, RDS等にデータ入力するのと比べ、より信頼性が高く安価なシステム構築が可能である。なお、データレコードは通常はストリームに追加されてから24 時間のみアクセス可能(オプションを使用すれば、最大168時間までアクセス可能)である。

Amazon Kinesis の主要なコンセプト - Amazon Kinesis

Kinesisに入力されたデータは、 Amazon Kinesis Client Library を用いたプログラム( Amazon Kinesis Application)をEC2上で動作させることで、加工や抽出・他のデータベースへの転送を行うことができる。Amazon Kinesis Applicationは、 DynamoDB上に制御テーブルを用意して、現在のチェックポイントを記録する。このため複数のプログラムを同時に実行させて、重複することなく並列でデータ処理を行うことが可能である。Application名は一意である必要があり、DynamoDBの制御名にも利用される。なお、「シャード」は、Kinesis上のデータ処理経路を、「プロデューサ」はKinesisへのデータの入力部分を、「コンシューマ」はKinesisからのデータ取得部分(役割)を指す。

レコードの順序

シャードイテレータは、シャード単位でデータを取得する。データの取得方法に関しては以下の4つが規定されている。

ShardIteratorType 取得方法
AT_SEQUENCE_NUMBER 特定のシーケンス番号からデータを取得
AFTER_SEQUENCE_NUMBER 特定のシーケンス番号の次のデータを取得
TRIM_HORIZON シャードの中でトリムされていない一番古いデータから取得
LATEST シャードの一番新しいデータから取得

レコードを取得(GetRecords)する際には、レコード毎に付与されたシーケンス番号がキーとなる。シーケンス番号は、シャード単位で管理されており、レコードを入力(PutRecords)した際にシャードごとにインクリメントされる。したがって、シーケンス番号順に取得するということは、すなわち各シャードに入力されたレコードを時系列順に取得するということと等しい。

ただし、シーケンス番号はシャード単位で管理されているため、1つのKinesis Client Libraryが、複数のシャードから同時にレコードを取得した場合などは、複数シャードのレコードが混在して取得されることから、取得データが時系列順に並ぶとは限らない。また、シャードへのデータ入力処理は並列で実施されるために、シャードへレコードが同時に入力された場合には、シーケンス番号が意図した通りにインクリメントされるとは限らない。レコード入力の際に、シャード内で厳密にシーケンス番号をインクリメントさせる場合には、SequenceNumberForOrderingパラメータを付与して、レコードの入力を行う。

時系列順に厳密にレコードを取得する必要がある場合には、プロデューサごとに毎回同じパーティションキーを使用し、かつSequenceNumberForOrderingパラメータを付与した上でレコードの入力を行う。これによって同一のプロデューサからのレコードは、同一のシャードに入力される。また、レコードを取得する際も、1シャードにつき1つのKinesis Client Libraryを用意することで、複数シャードのレコードを混在させることなく、レコードを取得・処理することが可能となる。ただし、各プロデューサごとに同じパーティションキーを使用する手法は、シャードごとにレコード量の偏りを生じる可能性もあるので注意が必要である。

課金

  • 無料枠は存在しない
  • シャード時間および PUT ペイロードユニットによる従量課金制

AWS CLIを用いたデータの取得

Kinesisの雰囲気をつかむためにawscliで操作する – Qiita

  • シャードイテレータの取得
    aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name STREAM_NAME
    {
        "ShardIterator": "AAAAAAAAAAEJa+Y5A5ZF3pdoe9Yfwefjiwfweifw9eOy5kguQL7aglWO4VI+Fcb/A9bzR/tKQBW8Yxco9RyOlRfs0q8RgFC0g6wHCnznhzDjpP9Xpfg6vuY/EPPHhYyxDdSKwePQjojgmTTqQlZzbkRHSEo/qSB+Nuqbg4asIsKiYwv96vvJoqxGkQi6RTN3DVf83Vf4nirQ0Sa4tg2A1sAyPfvr/r4etOX"
    }
  • シャードイテレータを使ったレコードの取得
    aws kinesis get-records --shard-iterator AAAAAAAAAAEJa+Y5A5ZF3pdoe9Yfwefjiwfweifw9eOy5kguQL7aglWO4VI+Fcb/A9bzR/tKQBW8Yxco9RyOlRfs0q8RgFC0g6wHCnznhzDjpP9Xpfg6vuY/EPPHhYyxDdSKwePQjojgmTTqQlZzbkRHSEo/qSB+Nuqbg4asIsKiYwv96vvJoqxGkQi6RTN3DVf83Vf4nirQ0Sa4tg2A1sAyPfvr/r4etOX
  • レコードの例
  • Data部分はBase64エンコードされる
  • レコードが存在しない場合は、Recordsが空の状態でレコードが返る
    {
      "Records":[ {
        "Data":"dGVzdGRhdGE=",
        "PartitionKey":"Batch-SagbXhGLwl”,
        "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
      } ],
      "MillisBehindLatest":24000,
      "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
    }

文献