Amazon Kinesis Streamsを使ってみた

11日目、というより11記事目といったほうがいいですかね。ネタを思いつかないので、基本に立ち返って「使ってみた」系にしてみました。

そこそこの期間AWSを利用しているのですが、業務に関係無かったりするとなかなか触れないサービスもあったりします。そういうものを触っていこうというのも今回のAdventCalendar的更新のモチベーションの1つだったはずなので、わざわざ簡単なシステムに落とさなくてもいいのではと思った次第です。

というわけで、Kinesisを使ったことがなかったのでとりあえずはじめの一歩的な内容をば。(間違っていることもあるかも)

Kinesis Streamの作成

今回扱うのはKinesis Streamの方です。今回はFirehose扱いませんが、バージニアオレゴンアイルランドの3リージョンでのみ利用可能となっています。

f:id:matetsu:20151213195311p:plain

それでは始めましょう。Let's 「Create Stream」!

f:id:matetsu:20151213213618p:plain

名前はとりあえず「stream-test」とでもしておきます。

はじめてなので、「Help me decide how many shards I need」にチェックを入れて助けを請いましょう。とりあえずは平均レコードサイズは1KBとして最大書き込み/秒は適当に100としておきましょうか。Consumerアプリケーション数は1とします。

すると、Shard数は「1」で良いでしょうと言った感じで出てきます。

f:id:matetsu:20151213211401p:plain

では、お言葉に甘えて「1」を利用させていただきましょう。(ドキュメントのシャードの項を読めばたいんいつシャードあたりの性能はわかりますし、Shard数を入れると下にキャパシティなどが表示されるのでそれを見ながら調整してもいいんでしょうねw)

f:id:matetsu:20151213213211p:plain

「Create」ボタンを押したら、Statusが「Active」になるまで待ちましょう。

f:id:matetsu:20151213213358p:plain

ACTIVEになるとこんな感じ。

f:id:matetsu:20151213213420p:plain

streamができあがりましたので、データ(レコード)を入れてみましょう。

Streamにレコードを送信する

KinesisではStreamにデータを送信する役割をもったノードやアプリケーションをProducerと呼びます。

まずは簡易的にレコードを送信してみたいので、kinesis-catを利用してみます。

$ gem install kinesis_cat
$ echo '{"key1": "val1", "key2": "val2"}' | kinesis-cat --stream-name stream-test

では送信したレコードを確認してみましょう。確認するためには、CLIを使います。

まずshardの情報を取得して、シャードの位置を確認する必要があります。kinesis get-shard-iteratorコマンドで取得したShardIteratorは300000msec以内に利用しないとExpireされますので、初めてでもたついていると苦労します。

$ aws kinesis describe-stream --stream-name stream-test
{
    "StreamDescription": {
        "StreamStatus": "ACTIVE",
        "StreamName": "stream-test",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:ACCOUNT_NO:stream/stream-test",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49557238107343677981012040936118820808140350529483046914"
                }
            }
        ]
    }
}
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name stream-test
{
    "ShardIterator": "AAAAAAAAAAFPuPSOFFd7SOl/TA2jlaJQHCJjOdBOtaqG/R/vgaY9LpL+u7QofNeFxXHlR54VPmcrEcTOGQocib2Gcd6xGwXpknZSPEVHw0mcDUPnFYaEuf1O61Yacn4bPJNj86SWT033nlXkycojKM0+0td08FCfQ/Behdp2SloPr7hIehAUMZeYh3yU8t0n2NvIw+tbvM4WT32X2fn7/kya87tVhLLM"
}
$ aws kinesis get-records --shard-iterator AAAAAAAAAAHTBivnV5Rk0dGwvKd0PnA2ntjdqYX6xsO3wtkTP/37T63C5AdZTbwfONrXge1PT43OKkkrbx1Cga6lbE+VMNzBwHoxJW9XZ35vUpkhkaTq1c62HSALUp3B+AM8cJaRtu3gpNWi2emSta+lGxLeO2CgT8fdq9yy5uZFAlkHYgilgDAbxnHl6Ps3+6g38jdXP4IsXjG4HbrYHf6P/bSUeSru
{
    "Records": [
        {
            "PartitionKey": "1450012611",
            "Data": "eyJrZXkxIjoidmFsMSIsImtleTIiOiJ2YWwyIn0=",
            "SequenceNumber": "49557238107343677981012040936505677070417222287059058690"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAGlPk98twtF2KpCnwlOadDjIgSDr60meZq9qjV88WLzDWPpuplichv5qALBFXZ2igTDMxRBy15Tchl6mlFuNGohx6Nu0MAQ07BB4Mv+NWmmLsxXHeGFd19tFrcABk7k+nKEVnTZsV1qDiQf+5Y32bCi4Wm5xvsGXPie9QUBfi6zrLwViHNVM0FNfNWGdOkliOxXxALtl731nVv6neWhVvJg",
    "MillisBehindLatest": 685000
}

ShardIteratorの取得とRecordsの取得は、合わせてこんな感じにしたほうがいいですかね。(ドキュメントにも近いことが書いてある)

$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name stream-test | jq -r .ShardIterator)

また、表示されたDataはBase64エンコードされているので、

$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name stream-test | jq -r .ShardIterator) | jq -r ".Records[0].Data" | base64 -D
{"key1":"val1","key2":"val2"}

とするとデコードされた状態で確認することができます。

再度レコードを登録して、重複なく取得するためには、NextShardIteratorを利用します。

$ aws kinesis get-records --shard-iterator AAAAAAAAAAGlPk98twtF2KpCnwlOadDjIgSDr60meZq9qjV88WLzDWPpuplichv5qALBFXZ2igTDMxRBy15Tchl6mlFuNGohx6Nu0MAQ07BB4Mv+NWmmLsxXHeGFd19tFrcABk7k+nKEVnTZsV1qDiQf+5Y32bCi4Wm5xvsGXPie9QUBfi6zrLwViHNVM0FNfNWGdOkliOxXxALtl731nVv6neWhVvJg --stream-name stream-test
{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAGeum3ukha5gAZXqSkErxc+CUW0LhCIWv1iJ65F6vEvKyIC//McDjma4EKRfGNZqOFOhGCn4EHMTFyjwmbxzl35Pv6vY6a43OBjaQlZ7/zUfk435y8ULCtc0M6w4J9U9s5ttYdI9DIOZ+JlKL0KKgaze13nLmo2PpmCtKglyQ4PB4nE761Etyv3j8ZgzvorRfIW9/pmBPkn6/Z7E5j5NdSl",
    "MillisBehindLatest": 0
}

と言った感じで、今は何も次に登録していないので、空のRecordsが返ってきます。

急いで新たなレコードを登録して、先ほど表示されたNextShardIteratorを指定してみましょう。

$ echo '{"key1": "val1", "key2": "val2"}' | kinesis-cat --stream-name stream-test
$ aws kinesis get-records --shard-iterator AAAAAAAAAAGeum3ukha5gAZXqSkErxc+CUW0LhCIWv1iJ65F6vEvKyIC//McDjma4EKRfGNZqOFOhGCn4EHMTFyjwmbxzl35Pv6vY6a43OBjaQlZ7/zUfk435y8ULCtc0M6w4J9U9s5ttYdI9DIOZ+JlKL0KKgaze13nLmo2PpmCtKglyQ4PB4nE761Etyv3j8ZgzvorRfIW9/pmBPkn6/Z7E5j5NdSl
{
    "Records": [
        {
            "PartitionKey": "1450014952",
            "Data": "eyJrZXkxIjoidmFsMSIsImtleTIiOiJ2YWwyIn0=",
            "SequenceNumber": "49557238107343677981012040936509303847876227046878216194"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAF+SbH2cLWMq0ytQLuaMc3ZSH7pNMBCU2hGycDaMlZZBIh5E8p9fVHeOsBidhoYPJ1DzoFUdDFCbLG+H2WHcqD9tcGIQN1+TR2cVrnTOZwijnlccd1XoVQDrDcQ5dIMwZIHMQggvG5i+eEVobCpmqWbicoE0H1XcrMfbSJ1KTFAsYVLJ3VBpsdC0UR9ofRudf2ZF/lk2G2W6BotkjFrvLEn",
    "MillisBehindLatest": 37000
}

はい、うまくいきました。とりあえず、KinesisのStreamにレコードを登録して、取得するところを見てみることができました。

おわりに

とりあえず動かしてみたレベルでしか無いので、次回はより実践的な内容で使ってみたいと思います。実際の業務やサービスで使えるレベルにできるようになるよう頑張りたいと思います。

ふぅ

はい、11件目の更新でした。まだまだ追いつくことができていませんが、なんとかしていきたいと思います。ネタ的には失速感が半端ないですが、続けることに意味があると自分に言い聞かせております。。(2日途切れさせたのに)

なんだか書き方的に某書籍を書いている時のような感じがしましたw