Watermill - Golang learning step 10-5
- 公開日
- カテゴリ:ToolForMicroservices
- タグ:Golang,roadmap.sh,学習メモ

roadmap.sh > Go > Tool for Microservices > Watermill の学習を進めていきます。
※ 学習メモとしての記録ですが、後にこのセクションを学ぶ道しるべとなるよう、ですます調で記載しています。
contents
開発環境
- チップ: Apple M2 Pro
- OS: macOS Sonoma
- go version: go1.23.2 darwin/arm64
参考 URL
Watermill
Watermill は、Go でイベント駆動型アプリケーションを構築するためのライブラリです。メッセージブローカー(Kafka、NATS、Google Pub/Sub など)を利用したパブリッシュ・サブスクライブ(Pub/Sub)モデルを簡単に実装できます。
- 抽象化されたインターフェース: さまざまなメッセージキューに対応
- 拡張性が高い: ミドルウェアやカスタムロジックを追加可能
- シンプルな API: Go らしい直感的な設計
インストール
Watermill は go get でインストールできます。
go get github.com/ThreeDotsLabs/watermill
Watermill を使った簡単な Pub/Sub の実装
Watermill の基本的な使い方として、Go 内部のチャンネルを使ってメッセージをやり取りするプログラムを作成します。
- Publisher が Hello, Watermill! というメッセージを送信
- Subscriber がそのメッセージを受信して出力
- 内部の Go チャンネル (gochannel) を利用してメッセージをやり取り
以下のコードを main.go
に保存し、実行します。
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
logger := watermill.NewStdLogger(false, false)
// Go のチャンネルを使う Pub/Sub
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// 購読開始
topic := "example.topic"
messages, err := pubSub.Subscribe(context.Background(), topic)
if err != nil {
log.Fatal(err)
}
go processMessages(messages)
// メッセージを送信
publishMessage(pubSub, topic)
// 少し待機して終了(本来はアプリが動作し続ける)
time.Sleep(2 * time.Second)
}
// メッセージの処理(Subscriber)
func processMessages(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("Received message: %s\n", string(msg.Payload))
msg.Ack() // メッセージを確認
}
}
// メッセージの送信(Publisher)
func publishMessage(pubSub message.Publisher, topic string) {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, Watermill!"))
if err := pubSub.Publish(topic, msg); err != nil {
log.Fatal(err)
}
fmt.Println("Message published!")
}
1. メッセージの購読(Subscriber)
messages, err := pubSub.Subscribe(context.Background(), topic)
go processMessages(messages)
pubSub.Subscribe()
でメッセージを受信するチャンネルを作成go processMessages(messages)
で並行処理としてメッセージを処理
2. メッセージの送信(Publisher)
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, Watermill!"))
pubSub.Publish(topic, msg)
message.NewMessage()
でユニーク ID 付きのメッセージを作成pubSub.Publish(topic, msg)
で指定のトピックにメッセージを送信
3. メッセージの受信と処理
for msg := range messages {
fmt.Printf("Received message: %s\n", string(msg.Payload))
msg.Ack() // メッセージを確認
}
- 受信したメッセージを出力
msg.Ack()
で処理が完了したことを Watermill に通知
実行します
go run main.go
実行結果
Message published!
Received message: Hello, Watermill!
Watermill × Kafka を使った Go の Pub/Sub 実装
Kafka などのメッセージブローカーと組み合わせることで、スケーラブルな非同期メッセージ処理 を簡単に実装できます。
なぜ Kafka を使うのか?
Kafka を使うと、複数のプロセス間でメッセージのやり取りが可能になります。
通信方式 | 特徴 |
---|---|
gochannel (Go のチャンネル) | 同じプロセス内でしかメッセージをやり取りできない |
Kafka | 複数のプロセスやサーバー間でメッセージをやり取りできる |
Watermill の gochannel
は Go のメモリ上のチャンネルを使うだけなので、プロセスをまたぐ通信は不可でした。そのため、本格的な Pub/Sub を実装するには Kafka などのメッセージブローカーを利用する必要があります。
環境構築
Kafka を利用するのに必要なツールをセットアップ&インストールしていきます。
kafka コンテナ環境
Kafka をローカルで動かすために Docker を利用します。Kafka の開発元である Confluent 社が管理する confluentinc/cp-all-in-one の docker-compose.yml
を使って、ローカルの開発用環境を構築します。
cp-all-in-one の docker-compose.yml を利用することで、Kafka を動作させるために必要な複数のコンポーネントが一括で起動でき、Kafka のフル機能をローカル環境で簡単にセットアップできます。
# docker-compose.yml ファイルダウンロード
curl -O https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.8.0-post/cp-all-in-one/docker-compose.yml
# コンテナ起動
docker compose up
これにより起動するコンテナは以下です。
docker compose ps
---
NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS
broker confluentinc/cp-server:7.8.0 "/etc/confluent/dock…" broker 15 minutes ago Up 6 minutes 0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0 "/etc/confluent/dock…" connect 15 minutes ago Up 6 minutes 0.0.0.0:8083->8083/tcp, 9092/tcp
control-center confluentinc/cp-enterprise-control-center:7.8.0 "/etc/confluent/dock…" control-center 15 minutes ago Up 6 minutes 0.0.0.0:9021->9021/tcp
flink-jobmanager cnfldemos/flink-kafka:1.19.1-scala_2.12-java17 "/docker-entrypoint.…" flink-jobmanager 15 minutes ago Up 6 minutes 6123/tcp, 8081/tcp, 0.0.0.0:9081->9081/tcp
flink-sql-client cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17 "/docker-entrypoint.…" flink-sql-client 15 minutes ago Up 6 minutes 6123/tcp, 8081/tcp
flink-taskmanager cnfldemos/flink-kafka:1.19.1-scala_2.12-java17 "/docker-entrypoint.…" flink-taskmanager 15 minutes ago Up 6 minutes 6123/tcp, 8081/tcp
ksql-datagen confluentinc/ksqldb-examples:7.8.0 "bash -c 'echo Waiti…" ksql-datagen 15 minutes ago Up 6 minutes
ksqldb-cli confluentinc/cp-ksqldb-cli:7.8.0 "/bin/sh" ksqldb-cli 15 minutes ago Up 6 minutes
ksqldb-server confluentinc/cp-ksqldb-server:7.8.0 "/etc/confluent/dock…" ksqldb-server 15 minutes ago Up 6 minutes 0.0.0.0:8088->8088/tcp
rest-proxy confluentinc/cp-kafka-rest:7.8.0 "/etc/confluent/dock…" rest-proxy 15 minutes ago Up 6 minutes 0.0.0.0:8082->8082/tcp
schema-registry confluentinc/cp-schema-registry:7.8.0 "/etc/confluent/dock…" schema-registry 15 minutes ago Up 6 minutes 0.0.0.0:8081->8081/tcp
---
コンテナ名 | イメージ | 役割 | アクセス可能な URL |
---|---|---|---|
broker | confluentinc/cp-server:7.8.0 | Kafka のブローカー。メッセージの送受信を担当。 | - |
connect | cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0 | Kafka Connect。データベースやクラウドとの接続を行う。 | - |
control-center | confluentinc/cp-enterprise-control-center:7.8.0 | Kafka を監視・管理する Web UI。トピック管理やメトリクスの可視化が可能。 | http://localhost:9021(Web UI) |
flink-jobmanager | cnfldemos/flink-kafka:1.19.1-scala_2.12-java17 | Apache Flink のジョブ管理。ストリーム処理のジョブを管理。 | - |
flink-sql-client | cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17 | Flink の SQL クライアント。SQL を使ってストリーム処理を実行。 | - |
flink-taskmanager | cnfldemos/flink-kafka:1.19.1-scala_2.12-java17 | Flink のタスク処理。ジョブマネージャーから割り当てられた処理を実行。 | - |
ksql-datagen | confluentinc/ksqldb-examples:7.8.0 | Kafka Streams のデータ生成。Kafka にテストデータを流す。 | - |
ksqldb-cli | confluentinc/cp-ksqldb-cli:7.8.0 | ksqlDB のコマンドラインクライアント。Kafka Streams のクエリを実行。 | - |
ksqldb-server | confluentinc/cp-ksqldb-server:7.8.0 | Kafka Streams 向けの SQL 実行エンジン。Kafka のデータを SQL で処理。 | http://localhost:8088(API) |
rest-proxy | confluentinc/cp-kafka-rest:7.8.0 | Kafka への HTTP API。Kafka に REST API でアクセス可能。 | http://localhost:8082(API) |
schema-registry | confluentinc/cp-schema-registry:7.8.0 | メッセージのスキーマ管理。Avro などのデータスキーマを登録・検証。 | http://localhost:8081(API) |
Kafka クライアント
Watermill の Kafka クライアントを Go にインストールします。
Kafka | Watermill | Event-Driven in Go
go get github.com/ThreeDotsLabs/watermill-kafka/v3
Kafka を使った Watermill の Pub/Sub 実装
Kafka を使った Publisher(メッセージ送信) と Subscriber(メッセージ受信) を 別プロセスで実行できる形 で実装します。
Publisher(メッセージを送信)
pub.go
package main
import (
"fmt"
"log"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
logger := watermill.NewStdLogger(false, false)
// Kafka の Publisher を作成
publisher, err := kafka.NewPublisher(kafka.PublisherConfig{
Brokers: []string{"localhost:9092"},
Marshaler: kafka.DefaultMarshaler{},
}, logger)
if err != nil {
log.Fatal(err)
}
defer publisher.Close()
topic := "example.topic"
// メッセージを 5 回送信
for i := 1; i <= 5; i++ {
msg := message.NewMessage(watermill.NewUUID(), []byte(fmt.Sprintf("Message %d", i)))
if err := publisher.Publish(topic, msg); err != nil {
log.Fatal(err)
}
fmt.Printf("Published: Message %d\n", i)
}
}
- Kafka の
NewPublisher
を使って、Kafka にメッセージを送信するパブリッシャーを作成 message.NewMessage()
で ユニーク ID 付きのメッセージを作成publisher.Publish(topic, msg)
で Kafka にメッセージを送信
Subscriber(メッセージを受信)
sub.go
package main
import (
"context"
"fmt"
"log"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
logger := watermill.NewStdLogger(false, false)
// Kafka の Subscriber を作成
subscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{
Brokers: []string{"localhost:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
ConsumerGroup: "example-group", // ConsumerGroup を指定することで、複数の Subscriber を並列に動作させ負荷分散することができる
}, logger)
if err != nil {
log.Fatal(err)
}
defer subscriber.Close()
topic := "example.topic"
// Kafka からメッセージを購読
messages, err := subscriber.Subscribe(context.Background(), topic)
if err != nil {
log.Fatal(err)
}
consumeMessages(messages)
}
// メッセージを受信して処理する
func consumeMessages(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("Received: %s\n", string(msg.Payload))
msg.Ack()
}
}
- Kafka の
NewSubscriber
を使って、Kafka からメッセージを受信するサブスクライバーを作成 subscriber.Subscribe(context.Background(), topic)
で Kafka の指定したトピックからメッセージを購読consumeMessages(messages)
で 受信したメッセージを処理msg.Ack()
で メッセージの処理が完了したことを Kafka に通知
動作確認
Kafka を起動します。
docker-compose up
これで Kafka のブローカーが localhost:9092
で起動します。
# 初回のみ
cd <YOUR PROJECT ROOT>
go mod tidy
次に、Subscriber を起動します。
go run sub.go
これで Kafka のトピック example.topic
にメッセージが届くのを待機します。
そして、Publisher を実行します。
go run pub.go
pub.go
が Kafka にメッセージを送信、sub.go
でメッセージを受信し、出力されます。
# Publisher (pub.go) の出力
Published: Message 1
Published: Message 2
Published: Message 3
Published: Message 4
Published: Message 5
# Subscriber (sub.go) の出力
Received: Message 1
Received: Message 2
Received: Message 3
Received: Message 4
Received: Message 5
Kafka を使うことで、別プロセス間のメッセージ送受信が可能になり、ローカルでも本格的なイベント駆動アーキテクチャを体験できました。
Kafka の Pub/Sub を学習する際のイメージの持ち方
実際の運用環境では、Publisher(メッセージの送信者) と Subscriber(メッセージの受信者)は異なるサーバーや異なるネットワーク環境に存在することが一般的です。しかし、今回の学習環境ではローカルの Docker コンテナ上で Kafka を起動し、Publisher と Subscriber を同じ環境で動かすことで、Watermill と共に Kafka の基本的な動作を理解することを目的としています。
実際の運用環境との違い
要素 | 今回の学習環境 | 実際の運用環境 |
---|---|---|
Kafka のホスト | ローカルの Docker 内 | クラウド(AWS MSK、Confluent Cloud など)またはオンプレミス |
Publisher(送信者) | ローカルで go run pub.go を実行 | 別のアプリケーションサーバー |
Subscriber(受信者) | ローカルで go run sub.go を実行 | 別のアプリケーションサーバー |
通信方式 | localhost:9092 で接続 | Kafka のブローカーを外部アクセス可能にし、適切な認証方式を使用 |
本番環境では、メッセージの処理に失敗した際のリトライ戦略や、エラーが発生したメッセージを保存するデッドレターキュー(DLQ)の設定も重要になります。
今回の学習では Kafka のメッセージの流れにフォーカスしていますが、実際の運用を考えると、以下のようにイメージすると分かりやすくなります。
- Publisher は Web アプリやマイクロサービス
- 例えば ECサイトでは、「商品が購入された」というイベントを Kafka に送信する注文管理システムが Publisher になる。
go run pub.go
は、その一部をシンプルに再現したもの。
- Kafka はイベントを保持し、複数の Subscriber に配信
- Kafka は中継役ではなく、メッセージを保持して必要なコンシューマーに配信するのが特徴。
- 例えば、「購入イベント」を処理する支払いシステム、在庫管理システム、メール通知システムなど、複数の Subscriber が1つのイベントを並行処理できる。
- これは
go run sub.go
を複数起動することでシミュレーション可能。
- Subscriber は異なるサービスが複数存在する
- 実際の環境では、Kafka のメッセージを受信する Subscriber は異なるアプリケーションやマイクロサービスで動作する。
- 例えば、決済システム(A)と、メール通知システム(B)は、それぞれ Kafka のトピックを購読し、処理内容も異なる。
go run sub.go
を複数プロセスで実行し、それぞれ別の処理をさせることで、この動作を確認できる。
まとめ
- Watermill は Go でイベント駆動型アプリケーションを構築するためのライブラリ
gochannel
を使うとプロセス内でシンプルに Pub/Sub を実装可能- Kafka を利用すると複数プロセス間でスケーラブルなメッセージ配信が可能
- Kafka のローカル環境構築には Docker を活用
publisher
とsubscriber
を分離することで、実際の運用に近い形で動作確認が可能ConsumerGroup
を設定することで、負荷分散やスケーラビリティを向上- 本番環境ではメッセージのリトライ戦略やデッドレターキューの考慮が必要
[Prev] Step 10-4: go-zero