📻 Analogy
A radio station broadcasts on a frequency. Anyone can tune in or switch off at any time, and the station has no idea who’s listening — it just transmits. Subscribers come and go freely; the publisher is blissfully unaware. That’s Pub/Sub.
The problem
Several parts of your program each need to react to the same events, and which parts are listening changes at runtime. Wiring producers directly to consumers couples them tightly. A broker sits in the middle: producers Publish, consumers Subscribe to get their own channel, and the broker fans each event out to all current subscribers.
Structure
graph LR
P["publisher"] -->|Publish| B["Broker<br/>map of subscriber chans"]
B --> S1["subscriber 1"]
B --> S2["subscriber 2"]
B --> S3["subscriber 3"]
Idiomatic Go
A broker is a mutex-guarded map of subscriber channels. Publish does a non-blocking send to each, so a slow subscriber can’t stall the rest. Edit and Run:
package main
import (
"fmt"
"sync"
)
type Broker struct {
mu sync.RWMutex
subs map[chan string]struct{}
}
func NewBroker() *Broker { return &Broker{subs: make(map[chan string]struct{})} }
func (b *Broker) Subscribe() chan string {
ch := make(chan string, 4) // small buffer absorbs bursts
b.mu.Lock()
b.subs[ch] = struct{}{}
b.mu.Unlock()
return ch
}
func (b *Broker) Publish(msg string) {
b.mu.RLock()
defer b.mu.RUnlock()
for ch := range b.subs {
select {
case ch <- msg: // deliver
default: // subscriber is slow — drop rather than block everyone
}
}
}
func (b *Broker) closeAll() {
b.mu.Lock()
defer b.mu.Unlock()
for ch := range b.subs {
delete(b.subs, ch)
close(ch) // ends each subscriber's range loop
}
}
func main() {
b := NewBroker()
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
ch := b.Subscribe()
wg.Add(1)
go func(id int, ch chan string) {
defer wg.Done()
for msg := range ch {
fmt.Printf("subscriber %d got %q\n", id, msg)
}
}(i, ch)
}
b.Publish("hello")
b.Publish("world")
b.closeAll() // unsubscribe everyone and stop their goroutines
wg.Wait()
}
🐹 The concurrent cousin of Observer
Observer notifies dependents in-process, often synchronously. Pub/Sub generalizes it: a broker decouples publishers from a dynamic set of subscribers, each running in its own goroutine with its own channel. The two key choices are back-pressure (drop, block, or buffer for slow subscribers) and lifecycle (closing channels on unsubscribe so goroutines exit). For cross-process or durable delivery, swap this in-process broker for NATS, Kafka, or Redis.
Pitfalls
⚠️ Slow subscribers force a hard choice
With the non-blocking default above, a subscriber that can’t keep up simply loses messages. The alternatives — block the publisher, or grow an unbounded buffer — trade liveness for memory. There’s no free lunch: decide explicitly whether your system prefers to drop, slow down, or buffer, and document it. Also remember to unsubscribe, or dead subscribers leak goroutines and channels forever.
When to use it — and when not
✅ Reach for it when
- Several independent consumers each need every event, and they come and go at runtime.
- You want producers and consumers fully decoupled — neither knows the other.
- An in-process event bus fits (notifications, cache invalidation, UI updates).
⛔ Think twice when
- There's a single consumer — a plain channel is enough.
- You need delivery guarantees, persistence, or cross-process fan-out — use a real broker (NATS, Kafka, Redis).
- Strict global ordering across subscribers is required.
Related patterns
Define a one-to-many dependency so that when one object changes state, all its dependents are notified automatically.
CONCURRENCY Fan-out / Fan-inDistribute work across multiple goroutines (fan-out) and merge their results back into one stream (fan-in).
CONCURRENCY Context & CancellationPropagate cancellation, deadlines, and request-scoped values across API boundaries and goroutine trees with context.Context.
Check your understanding
Score: 0 / 31. How does Pub/Sub differ from Observer?
Observer is the in-object, usually synchronous form; Pub/Sub adds a broker between producers and a dynamic set of subscribers, and in Go is naturally built on channels and goroutines.
2. Why does the broker use select-with-default (or buffered channels) when publishing?
A blocking send to a full subscriber would stall the whole Publish loop. A non-blocking send (drop, or rely on a buffer) keeps a slow consumer from freezing everyone.
3. What ends a subscriber's goroutine cleanly?
Closing the subscriber's channel (on unsubscribe/shutdown) terminates its range loop, letting the goroutine exit without leaking.