ぽかぽかウンティの自由帳

ぽかぽか動物園から投稿します。

Redis Streamsって何なんです?

Redis5から使えるようになるらしいのですが Introduction to Redis Streams の英語が読めないのですよ...

Kafka的なのを再実装してみた的な? Messaging System ではなく Time Series Store として考えてみた的な? まずは Apache Kafkaに入門した から読んでみるか...メルカリっょぃ

ストリームにキーが付いていて、ストリームの中にエントリが追記型に書き込まれている感じか。

エントリの追加

  • XADD コマンドで追加するらしい。
  • キーの後のアスタはMySQLのAuto Increment的な指定。自分で指定しなければサーバーがエントリIDを自動生成して返してくれる。
    • フォーマット的には <millisecondsTime>-<sequenceNumber> らしい。エントリIDで範囲検索ができるように時間情報が含められているらしい。
    • もちろん自分で指定したければアスタを使わなければ良いが、自分でインクリメントしないといけない。
  • 第3引数以降はキーと値のペアが続く感じでLTSV的に扱えるデータ構造なのかな。
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

> XADD somestream 0-1 field value
0-1

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

> XADD somestream 0-2 foo bar
0-2

エントリ件数の確認

XLEN コマンドで件数がわかるのか。

> XLEN mystream
(integer) 1

エントリの取得

範囲検索

  • XRANGE コマンドで first と last のエントリIDを指定して範囲検索ができるらしい。最初と最後は含みで。
  • また、特別な記号として - が最小 + が最大を表すので、それらを指定すれば全部取得できるのか。
  • エントリIDをハイフンで分割した際の左側の時間情報だけでも指定できるらしい。その場合はSequence Numberは最初と最後でよろしく引っかけてくれる。
  • COUNT オプションなんてのもあるらしい。take 2 的な。SQLのLIMIT句みたいなものか。返り値の最後のエントリIDをインクリメントして次に指定すればイテレートできるとのこと。
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

もちろん逆転させたコマンドもある模様。最小最大の引数順序も逆になるので注意。

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

tail -f や Pub/Sub のSub的な読み込み

  • XREAD というコマンドを使うらしい。ブロッキングリスト?やPub/Subモデルとは似てるけど違うらしい。コンシューマーグループごとに一貫した操作ができるとのこと。
  • COUNT オプションは必須ではない。
  • STREAMS オプションは必須。
    • STREAMS オプションの引数はストリームキーおよび、読み出し済みの最後のエントリIDが続く。
    • つまり我々は読み出し済みの最後のエントリIDを保持してそれをこのコマンドで STREAMS オプションとして指定すれば、それより新しいエントリを読み出せる。
    • STREAMS オプションは最後に指定しないといけないらしい。
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

> XREAD STREAMS mystream otherstream 0 0
(略)
  • 上記はノンブロッキングモードで、できることが範囲検索とあんまり変わらないが、 XREADブロッキングモードを指定できる。
  • BLOCK オプションで指定でき、その引数はタイムアウトのミリ秒。0は無制限を表す。
  • $ という特別な記号は最後のエントリIDを表すらしい。もちろん普通のエントリIDも指定可能。
  • 動作的にはこれが tail -f 的なイメージか。
> XREAD BLOCK 0 STREAMS mystream $
(略)

コンシューマーグループごとの読み込み

  • コンシューマーやコンシューマーグループなどの登場人物のネーミングに関してはKafka由来だが実装は違うとのこと。
  • コンシューマーやコンシューマーグループを駆使すれば並列に処理できてパフォーマンスが上がる感じか。
  • XGROUP でコンシューマーグループ名を指定して作成し XREADGROUP で読み込むらしい。
  • コンシューマー名やコンシューマーグループ名はこちらで指定するものでCase Sensitiveとのこと。
  • コンシューマー名はコンシューマーグループ内で一意でなければならない。
  • XGROUP コマンドは今は既存のストリームキーを指定しないといけないが近い将来 * 指定でストリームが自動生成できるようになるとのこと。
  • XREADGROUP コマンドには GROUP オプションにコンシューマーグループ名を、その後ろにコンシューマー名を指定する。 GROUP <group-name> <consumer-name>
  • XREADGROUP コマンドでは > が特別な意味を持つ記号で、他の?コンシューマーに届いてない新規メッセージだけを表すらしい。
  • もちろんエントリIDも指定できて、その場合は保留中の XACK されていないメッセージを取得できるらしい。
  • XREADGROUP コマンドは読み込み系コマンドであるが副作用があるのでレプリケーションしている場合はマスターノードにしか実行できないとか。
> XGROUP CREATE mystream mygroup $
OK

> XADD mystream * message apple
1526569495631-0

> XADD mystream * message orange
1526569498055-0

> XADD mystream * message strawberry
1526569506935-0

> XADD mystream * message apricot
1526569535168-0

> XADD mystream * message banana
1526569544280-0

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

> XACK mystream mygroup 1526569495631-0
(integer) 1

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

ここから先は Zzz...