Watermill - Golang learning step 10-5

  • 公開日
  • カテゴリ:ToolForMicroservices
  • タグ:Golang,roadmap.sh,学習メモ
Watermill - Golang learning step 10-5

roadmap.sh > Go > Tool for Microservices > Watermill の学習を進めていきます。

※ 学習メモとしての記録ですが、後にこのセクションを学ぶ道しるべとなるよう、ですます調で記載しています。

contents

  1. 開発環境
  2. 参考 URL
  3. Watermill
  4. インストール
  5. Watermill を使った簡単な Pub/Sub の実装
  6. Watermill × Kafka を使った Go の Pub/Sub 実装
    1. なぜ Kafka を使うのか?
    2. 環境構築
    3. Kafka を使った Watermill の Pub/Sub 実装
    4. 動作確認
    5. Kafka の Pub/Sub を学習する際のイメージの持ち方

開発環境

  • チップ: Apple M2 Pro
  • OS: macOS Sonoma
  • go version: go1.23.2 darwin/arm64

参考 URL

Watermill

Watermill は、Go でイベント駆動型アプリケーションを構築するためのライブラリです。メッセージブローカー(Kafka、NATS、Google Pub/Sub など)を利用したパブリッシュ・サブスクライブ(Pub/Sub)モデルを簡単に実装できます。

  • 抽象化されたインターフェース: さまざまなメッセージキューに対応
  • 拡張性が高い: ミドルウェアやカスタムロジックを追加可能
  • シンプルな API: Go らしい直感的な設計

インストール

Watermill は go get でインストールできます。

go get github.com/ThreeDotsLabs/watermill

Watermill を使った簡単な Pub/Sub の実装

Watermill の基本的な使い方として、Go 内部のチャンネルを使ってメッセージをやり取りするプログラムを作成します。

  1. Publisher が Hello, Watermill! というメッセージを送信
  2. Subscriber がそのメッセージを受信して出力
  3. 内部の Go チャンネル (gochannel) を利用してメッセージをやり取り

以下のコードを main.go に保存し、実行します。

package main

import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/ThreeDotsLabs/watermill"
  "github.com/ThreeDotsLabs/watermill/message"
  "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
  logger := watermill.NewStdLogger(false, false)

  // Go のチャンネルを使う Pub/Sub
  pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

  // 購読開始
  topic := "example.topic"
  messages, err := pubSub.Subscribe(context.Background(), topic)
  if err != nil {
    log.Fatal(err)
  }

  go processMessages(messages)

  // メッセージを送信
  publishMessage(pubSub, topic)

  // 少し待機して終了(本来はアプリが動作し続ける)
  time.Sleep(2 * time.Second)
}

// メッセージの処理(Subscriber)
func processMessages(messages <-chan *message.Message) {
  for msg := range messages {
    fmt.Printf("Received message: %s\n", string(msg.Payload))
    msg.Ack() // メッセージを確認
  }
}

// メッセージの送信(Publisher)
func publishMessage(pubSub message.Publisher, topic string) {
  msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, Watermill!"))
  if err := pubSub.Publish(topic, msg); err != nil {
    log.Fatal(err)
  }
  fmt.Println("Message published!")
}

1. メッセージの購読(Subscriber)

messages, err := pubSub.Subscribe(context.Background(), topic)
go processMessages(messages)
  • pubSub.Subscribe() でメッセージを受信するチャンネルを作成
  • go processMessages(messages) で並行処理としてメッセージを処理

2. メッセージの送信(Publisher)

msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, Watermill!"))
pubSub.Publish(topic, msg)
  • message.NewMessage() でユニーク ID 付きのメッセージを作成
  • pubSub.Publish(topic, msg) で指定のトピックにメッセージを送信

3. メッセージの受信と処理

for msg := range messages {
    fmt.Printf("Received message: %s\n", string(msg.Payload))
    msg.Ack() // メッセージを確認
}
  • 受信したメッセージを出力
  • msg.Ack() で処理が完了したことを Watermill に通知

実行します

go run main.go 

実行結果

Message published!
Received message: Hello, Watermill!

Watermill × Kafka を使った Go の Pub/Sub 実装

Kafka などのメッセージブローカーと組み合わせることで、スケーラブルな非同期メッセージ処理 を簡単に実装できます。

なぜ Kafka を使うのか?

Kafka を使うと、複数のプロセス間でメッセージのやり取りが可能になります。

通信方式特徴
gochannel (Go のチャンネル)同じプロセス内でしかメッセージをやり取りできない
Kafka複数のプロセスやサーバー間でメッセージをやり取りできる

Watermill の gochannel は Go のメモリ上のチャンネルを使うだけなので、プロセスをまたぐ通信は不可でした。そのため、本格的な Pub/Sub を実装するには Kafka などのメッセージブローカーを利用する必要があります。

環境構築

Kafka を利用するのに必要なツールをセットアップ&インストールしていきます。

kafka コンテナ環境

Kafka をローカルで動かすために Docker を利用します。Kafka の開発元である Confluent 社が管理する confluentinc/cp-all-in-onedocker-compose.yml を使って、ローカルの開発用環境を構築します。

cp-all-in-one の docker-compose.yml を利用することで、Kafka を動作させるために必要な複数のコンポーネントが一括で起動でき、Kafka のフル機能をローカル環境で簡単にセットアップできます。

# docker-compose.yml ファイルダウンロード
curl -O https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.8.0-post/cp-all-in-one/docker-compose.yml

# コンテナ起動
docker compose up

これにより起動するコンテナは以下です。

docker compose ps

---
NAME                IMAGE                                                       COMMAND                  SERVICE             CREATED          STATUS         PORTS
broker              confluentinc/cp-server:7.8.0                                "/etc/confluent/dock…"   broker              15 minutes ago   Up 6 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect             cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0             "/etc/confluent/dock…"   connect             15 minutes ago   Up 6 minutes   0.0.0.0:8083->8083/tcp, 9092/tcp
control-center      confluentinc/cp-enterprise-control-center:7.8.0             "/etc/confluent/dock…"   control-center      15 minutes ago   Up 6 minutes   0.0.0.0:9021->9021/tcp
flink-jobmanager    cnfldemos/flink-kafka:1.19.1-scala_2.12-java17              "/docker-entrypoint.…"   flink-jobmanager    15 minutes ago   Up 6 minutes   6123/tcp, 8081/tcp, 0.0.0.0:9081->9081/tcp
flink-sql-client    cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17   "/docker-entrypoint.…"   flink-sql-client    15 minutes ago   Up 6 minutes   6123/tcp, 8081/tcp
flink-taskmanager   cnfldemos/flink-kafka:1.19.1-scala_2.12-java17              "/docker-entrypoint.…"   flink-taskmanager   15 minutes ago   Up 6 minutes   6123/tcp, 8081/tcp
ksql-datagen        confluentinc/ksqldb-examples:7.8.0                          "bash -c 'echo Waiti…"   ksql-datagen        15 minutes ago   Up 6 minutes   
ksqldb-cli          confluentinc/cp-ksqldb-cli:7.8.0                            "/bin/sh"                ksqldb-cli          15 minutes ago   Up 6 minutes   
ksqldb-server       confluentinc/cp-ksqldb-server:7.8.0                         "/etc/confluent/dock…"   ksqldb-server       15 minutes ago   Up 6 minutes   0.0.0.0:8088->8088/tcp
rest-proxy          confluentinc/cp-kafka-rest:7.8.0                            "/etc/confluent/dock…"   rest-proxy          15 minutes ago   Up 6 minutes   0.0.0.0:8082->8082/tcp
schema-registry     confluentinc/cp-schema-registry:7.8.0                       "/etc/confluent/dock…"   schema-registry     15 minutes ago   Up 6 minutes   0.0.0.0:8081->8081/tcp
---
コンテナ名イメージ役割アクセス可能な URL
brokerconfluentinc/cp-server:7.8.0Kafka のブローカー。メッセージの送受信を担当。-
connectcnfldemos/cp-server-connect-datagen:0.6.4-7.6.0Kafka Connect。データベースやクラウドとの接続を行う。-
control-centerconfluentinc/cp-enterprise-control-center:7.8.0Kafka を監視・管理する Web UI。トピック管理やメトリクスの可視化が可能。http://localhost:9021(Web UI)
flink-jobmanagercnfldemos/flink-kafka:1.19.1-scala_2.12-java17Apache Flink のジョブ管理。ストリーム処理のジョブを管理。-
flink-sql-clientcnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17Flink の SQL クライアント。SQL を使ってストリーム処理を実行。-
flink-taskmanagercnfldemos/flink-kafka:1.19.1-scala_2.12-java17Flink のタスク処理。ジョブマネージャーから割り当てられた処理を実行。-
ksql-datagenconfluentinc/ksqldb-examples:7.8.0Kafka Streams のデータ生成。Kafka にテストデータを流す。-
ksqldb-cliconfluentinc/cp-ksqldb-cli:7.8.0ksqlDB のコマンドラインクライアント。Kafka Streams のクエリを実行。-
ksqldb-serverconfluentinc/cp-ksqldb-server:7.8.0Kafka Streams 向けの SQL 実行エンジン。Kafka のデータを SQL で処理。http://localhost:8088(API)
rest-proxyconfluentinc/cp-kafka-rest:7.8.0Kafka への HTTP API。Kafka に REST API でアクセス可能。http://localhost:8082(API)
schema-registryconfluentinc/cp-schema-registry:7.8.0メッセージのスキーマ管理。Avro などのデータスキーマを登録・検証。http://localhost:8081(API)

Kafka クライアント

Watermill の Kafka クライアントを Go にインストールします。

Kafka | Watermill | Event-Driven in Go

go get github.com/ThreeDotsLabs/watermill-kafka/v3

Kafka を使った Watermill の Pub/Sub 実装

Kafka を使った Publisher(メッセージ送信) と Subscriber(メッセージ受信) を 別プロセスで実行できる形 で実装します。

Publisher(メッセージを送信)

pub.go

package main

import (
  "fmt"
  "log"

  "github.com/ThreeDotsLabs/watermill"
  "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
  "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
  logger := watermill.NewStdLogger(false, false)

  // Kafka の Publisher を作成
  publisher, err := kafka.NewPublisher(kafka.PublisherConfig{
    Brokers:   []string{"localhost:9092"},
    Marshaler: kafka.DefaultMarshaler{},
  }, logger)
  if err != nil {
    log.Fatal(err)
  }
  defer publisher.Close()

  topic := "example.topic"

  // メッセージを 5 回送信
  for i := 1; i <= 5; i++ {
    msg := message.NewMessage(watermill.NewUUID(), []byte(fmt.Sprintf("Message %d", i)))
    if err := publisher.Publish(topic, msg); err != nil {
      log.Fatal(err)
    }
    fmt.Printf("Published: Message %d\n", i)
  }
}
  • Kafka の NewPublisher を使って、Kafka にメッセージを送信するパブリッシャーを作成
  • message.NewMessage() で ユニーク ID 付きのメッセージを作成
  • publisher.Publish(topic, msg) で Kafka にメッセージを送信

Subscriber(メッセージを受信)

sub.go

package main

import (
  "context"
  "fmt"
  "log"

  "github.com/ThreeDotsLabs/watermill"
  "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
  "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
  logger := watermill.NewStdLogger(false, false)

  // Kafka の Subscriber を作成
  subscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{
    Brokers:       []string{"localhost:9092"},
    Unmarshaler:   kafka.DefaultMarshaler{},
    ConsumerGroup: "example-group", // ConsumerGroup を指定することで、複数の Subscriber を並列に動作させ負荷分散することができる
  }, logger)
  if err != nil {
    log.Fatal(err)
  }
  defer subscriber.Close()

  topic := "example.topic"

  // Kafka からメッセージを購読
  messages, err := subscriber.Subscribe(context.Background(), topic)
  if err != nil {
    log.Fatal(err)
  }

  consumeMessages(messages)
}

// メッセージを受信して処理する
func consumeMessages(messages <-chan *message.Message) {
  for msg := range messages {
    fmt.Printf("Received: %s\n", string(msg.Payload))
    msg.Ack()
  }
}
  • Kafka の NewSubscriber を使って、Kafka からメッセージを受信するサブスクライバーを作成
  • subscriber.Subscribe(context.Background(), topic) で Kafka の指定したトピックからメッセージを購読
  • consumeMessages(messages) で 受信したメッセージを処理
  • msg.Ack() で メッセージの処理が完了したことを Kafka に通知

動作確認

Kafka を起動します。

docker-compose up

これで Kafka のブローカーが localhost:9092 で起動します。

# 初回のみ
cd <YOUR PROJECT ROOT>
go mod tidy

次に、Subscriber を起動します。

go run sub.go

これで Kafka のトピック example.topic にメッセージが届くのを待機します。

そして、Publisher を実行します。

go run pub.go

pub.go が Kafka にメッセージを送信、sub.go でメッセージを受信し、出力されます。

# Publisher (pub.go) の出力

Published: Message 1
Published: Message 2
Published: Message 3
Published: Message 4
Published: Message 5
# Subscriber (sub.go) の出力

Received: Message 1
Received: Message 2
Received: Message 3
Received: Message 4
Received: Message 5

Kafka を使うことで、別プロセス間のメッセージ送受信が可能になり、ローカルでも本格的なイベント駆動アーキテクチャを体験できました。

Kafka の Pub/Sub を学習する際のイメージの持ち方

実際の運用環境では、Publisher(メッセージの送信者) と Subscriber(メッセージの受信者)は異なるサーバーや異なるネットワーク環境に存在することが一般的です。しかし、今回の学習環境ではローカルの Docker コンテナ上で Kafka を起動し、Publisher と Subscriber を同じ環境で動かすことで、Watermill と共に Kafka の基本的な動作を理解することを目的としています。

実際の運用環境との違い

要素今回の学習環境実際の運用環境
Kafka のホストローカルの Docker 内クラウド(AWS MSK、Confluent Cloud など)またはオンプレミス
Publisher(送信者)ローカルで go run pub.go を実行別のアプリケーションサーバー
Subscriber(受信者)ローカルで go run sub.go を実行別のアプリケーションサーバー
通信方式localhost:9092 で接続Kafka のブローカーを外部アクセス可能にし、適切な認証方式を使用

本番環境では、メッセージの処理に失敗した際のリトライ戦略や、エラーが発生したメッセージを保存するデッドレターキュー(DLQ)の設定も重要になります。

今回の学習では Kafka のメッセージの流れにフォーカスしていますが、実際の運用を考えると、以下のようにイメージすると分かりやすくなります。

  1. Publisher は Web アプリやマイクロサービス
    • 例えば ECサイトでは、「商品が購入された」というイベントを Kafka に送信する注文管理システムが Publisher になる。
    • go run pub.go は、その一部をシンプルに再現したもの。
  2. Kafka はイベントを保持し、複数の Subscriber に配信
    • Kafka は中継役ではなく、メッセージを保持して必要なコンシューマーに配信するのが特徴。
    • 例えば、「購入イベント」を処理する支払いシステム、在庫管理システム、メール通知システムなど、複数の Subscriber が1つのイベントを並行処理できる。
    • これは go run sub.go を複数起動することでシミュレーション可能。
  3. Subscriber は異なるサービスが複数存在する
    • 実際の環境では、Kafka のメッセージを受信する Subscriber は異なるアプリケーションやマイクロサービスで動作する。
    • 例えば、決済システム(A)と、メール通知システム(B)は、それぞれ Kafka のトピックを購読し、処理内容も異なる。
    • go run sub.go を複数プロセスで実行し、それぞれ別の処理をさせることで、この動作を確認できる。

まとめ

  • Watermill は Go でイベント駆動型アプリケーションを構築するためのライブラリ
  • gochannel を使うとプロセス内でシンプルに Pub/Sub を実装可能
  • Kafka を利用すると複数プロセス間でスケーラブルなメッセージ配信が可能
  • Kafka のローカル環境構築には Docker を活用
  • publishersubscriber を分離することで、実際の運用に近い形で動作確認が可能
  • ConsumerGroup を設定することで、負荷分散やスケーラビリティを向上
  • 本番環境ではメッセージのリトライ戦略やデッドレターキューの考慮が必要


[Prev] Step 10-4: go-zero

Author

rito

  • Backend Engineer
  • Tokyo, Japan
  • PHP 5 技術者認定上級試験 認定者
  • 統計検定 3 級