Analogy
A factory assembly line: one station stamps the part, the next paints it, the next boxes it. Each station does one job and hands the result to the next, all working at the same time on different items. If boxing slows down, the line naturally backs up: nobody piles parts on the floor. That's a pipeline.
The problem
You have a stream of data and several transformations to apply. Doing them in one big loop is simple but serial, and it mixes the steps together. A pipeline gives each stage its own goroutine, connected by channels, so stages run concurrently and stay decoupled. Back-pressure comes from the channels themselves, and a shared done channel gives every stage a clean cancellation path.
Structure
graph LR
G["generator<br/>goroutine"] -->|chan int| S1["square<br/>goroutine"]
S1 -->|chan int| S2["square<br/>goroutine"]
S2 -->|chan int| M["main<br/>range loop"]
D["done chan"] -.cancels.-> G
D -.cancels.-> S1
D -.cancels.-> S2
Each stage follows the same shape: take an inbound channel, return a new outbound channel, run a goroutine that reads → transforms → sends, and close the output when the input is exhausted.
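Because every stage has the same shape, it can be factored into one reusable helper. The following is a minimal sketch, assuming Go 1.18+ generics; stage and fromSlice are hypothetical names introduced here for illustration and are not part of the original example.

```go
package main

import "fmt"

// stage is a hypothetical helper capturing the shape described above:
// inbound channel in, new outbound channel out, one goroutine in between.
func stage[T, U any](done <-chan struct{}, in <-chan T, fn func(T) U) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out) // close the output once the input is exhausted
		for v := range in {
			select {
			case <-done:
				return
			case out <- fn(v):
			}
		}
	}()
	return out
}

// fromSlice is a hypothetical source that feeds a slice into a channel.
func fromSlice[T any](done <-chan struct{}, vals []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range vals {
			select {
			case <-done:
				return
			case out <- v:
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done)

	nums := fromSlice(done, []int{1, 2, 3})
	doubled := stage(done, nums, func(n int) int { return n * 2 })
	labels := stage(done, doubled, func(n int) string { return fmt.Sprintf("item-%d", n) })

	for s := range labels {
		fmt.Println(s) // item-2, item-4, item-6
	}
}
```

Each call to stage returns a receive-only channel, so stages compose by nesting or by chaining variables, and the element type may change from one stage to the next.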
Idiomatic Go
A generator turns values into a stream; each stage reads one channel and writes the next; main ranges over the final channel. The done channel lets everything cancel cleanly.
package main

import "fmt"

// generator turns a list of ints into a stream.
func generator(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // this stage owns out, so it closes out
		for _, n := range nums {
			select {
			case <-done:
				return
			case out <- n:
			}
		}
	}()
	return out
}

// square is a stage: read ints, emit their squares.
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-done:
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done) // cancels every stage on return

	// Compose stages: numbers -> square -> square
	stream := square(done, square(done, generator(done, 1, 2, 3, 4)))
	for n := range stream {
		fmt.Println(n) // 1, 16, 81, 256
	}
}
Generators & take
A more composable style, popularized by Concurrency in Go, uses a repeatFn that produces an infinite stream and a take that pulls just the first N values, both honoring done:
done := make(chan any)
defer close(done)

repeat := repeatFn(done, func() any { return rand.Int() }) // infinite
for n := range take(done, repeat, 10) {                    // bounded
	fmt.Println(n)
}
Because every stage selects on <-done, closing done unwinds even the infinite generator: no leaked goroutines.
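The snippet above calls repeatFn and take without defining them. One possible way to write them is sketched below; the bodies are an assumption in the spirit of the book's helpers, not a verbatim copy, and they assume Go 1.18+ for the any alias.

```go
package main

import (
	"fmt"
	"math/rand"
)

// repeatFn calls fn forever and streams the results until done is closed.
func repeatFn(done <-chan any, fn func() any) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case out <- fn():
			}
		}
	}()
	return out
}

// take forwards at most n values from in, then closes its output.
func take(done <-chan any, in <-chan any, n int) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case <-done:
				return
			case v, ok := <-in:
				if !ok {
					return // upstream finished early
				}
				select {
				case <-done:
					return
				case out <- v:
				}
			}
		}
	}()
	return out
}

func main() {
	done := make(chan any)
	defer close(done)

	repeat := repeatFn(done, func() any { return rand.Int() }) // infinite
	for n := range take(done, repeat, 10) {                    // bounded
		fmt.Println(n)
	}
}
```

After take has delivered its values, the repeatFn goroutine stays blocked on its next send until done is closed, which is why the deferred close(done) matters even though the loop in main ends on its own.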
The three rules that make pipelines safe
1. The stage that writes a channel closes it (defer close(out)), never the reader.
2. Every send and receive also selects on <-done (or ctx.Done()), so cancellation can't deadlock.
3. Let channels carry the back-pressure: unbuffered channels keep a fast stage from racing ahead of a slow one, so memory stays bounded without any manual throttling.
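The third rule can be observed directly. The sketch below is a measurement harness rather than a pipeline stage: maxLead is a hypothetical helper that runs a fast producer against a deliberately slow consumer and records how many items the producer has created beyond what the consumer has taken. Cancellation is left out only because the consumer always drains the channel.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// maxLead runs a fast producer against a slow consumer over a channel with
// the given buffer size and reports how far ahead the producer ever got.
func maxLead(buffer, items int) int64 {
	const delay = time.Millisecond // simulated cost of the slow stage

	ch := make(chan int, buffer)
	var made int64 // items created by the producer, updated atomically

	go func() {
		defer close(ch)
		for i := 0; i < items; i++ {
			atomic.AddInt64(&made, 1) // item created, about to be sent
			ch <- i                   // blocks until a receiver or buffer slot is free
		}
	}()

	var consumed, lead int64
	for range ch {
		consumed++
		if d := atomic.LoadInt64(&made) - consumed; d > lead {
			lead = d
		}
		time.Sleep(delay)
	}
	return lead
}

func main() {
	fmt.Println("unbuffered lead:", maxLead(0, 20)) // at most 1
	fmt.Println("buffer=8 lead:", maxLead(8, 20))   // at most 9
}
```

With an unbuffered channel the producer can hold at most one item it has not yet handed over; a buffer of size b raises that ceiling to b+1. Either way the bound comes from the channel, not from any throttling code.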
Where it leads
The pipeline is the foundation for the rest of Go's concurrency toolkit:
- Parallelize a slow stage with Fan-out / Fan-in.
- Bound how many items are in flight with a Worker Pool.
- Replace the done channel with context.Context for deadlines and request scope.
- Wrap a stream so it always stops on cancel with the Or-done channel.
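As an illustration of the context.Context item in the list above, here is a minimal sketch of the same generator and square stages with ctx.Done() in place of <-done. The one-second timeout and the squareAll helper are assumptions chosen for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// generate streams nums until they run out or ctx is cancelled.
func generate(ctx context.Context, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case <-ctx.Done(): // takes the place of <-done
				return
			case out <- n:
			}
		}
	}()
	return out
}

// square reads ints and emits their squares until in closes or ctx is cancelled.
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

// squareAll is a hypothetical helper that runs the pipeline under a deadline
// and collects whatever arrives before the context ends.
func squareAll(nums ...int) []int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel() // releases the timer and stops any stage still running

	var got []int
	for n := range square(ctx, generate(ctx, nums...)) {
		got = append(got, n)
	}
	return got
}

func main() {
	fmt.Println(squareAll(1, 2, 3, 4)) // [1 4 9 16]
}
```

The stage bodies are unchanged apart from the cancellation case, which is what makes the switch from a done channel to a context mostly mechanical.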
Pitfalls
Leaks happen when a consumer stops early
If main breaks out of the range loop before the stream is drained, upstream stages can block forever on a send nobody receives: a goroutine leak. The done channel is what saves you: once it is closed (here via defer, when main returns), every stage abandons its pending send and returns. Never start a pipeline stage without a cancellation path.
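The early-exit case can be checked with a sync.WaitGroup that tracks the producer goroutine. This is a sketch under assumptions: counter and firstN are hypothetical names, and the WaitGroup is there only to prove that the goroutine really exits after done is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// counter streams 0, 1, 2, ... until done is closed. wg tracks its goroutine.
func counter(done <-chan struct{}, wg *sync.WaitGroup) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-done:
				return
			case out <- i:
			}
		}
	}()
	return out
}

// firstN reads n values and then stops early, cancelling the producer.
func firstN(n int) []int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	stream := counter(done, &wg)

	var got []int
	for v := range stream {
		got = append(got, v)
		if len(got) >= n {
			break // consumer stops before the stream is drained
		}
	}

	close(done) // tell the producer to abandon its pending send
	wg.Wait()   // returns only once the producer goroutine has exited
	return got
}

func main() {
	fmt.Println(firstN(3)) // [0 1 2]
}
```

Without the close(done) call, wg.Wait() would block forever, because the producer would stay parked on a send that no one receives. That hang is the leak made visible.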
When to use it, and when not
Reach for it when
- You transform a stream in distinct steps (read → parse → filter → write) and want them to run concurrently.
- You want natural back-pressure and bounded memory, with stages decoupled from each other.
- You need clean cancellation so partial work shuts down without leaking goroutines.
Think twice when
- It's a single, cheap transformation: a plain loop is simpler and faster.
- Stages are so tiny that channel overhead dominates the actual work.
- You need strict global ordering after fanning a stage out to many workers.
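The ordering caveat in the last bullet can be made concrete. In the sketch below, squareFanOut, job, and result are hypothetical names; several workers share one jobs channel, results arrive in whatever order the workers finish, and each result therefore carries the index of its input so the original order can be rebuilt. Cancellation is omitted because every channel is fully drained.

```go
package main

import (
	"fmt"
	"sync"
)

// job is one input together with its position in the original stream.
type job struct {
	index, n int
}

// result is one output together with the position of the input it came from.
type result struct {
	index, value int
}

// squareFanOut squares nums using several workers and returns the squares
// in input order, even though results arrive in no particular order.
func squareFanOut(nums []int, workers int) []int {
	jobs := make(chan job)
	results := make(chan result)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- result{index: j.index, value: j.n * j.n}
			}
		}()
	}

	go func() {
		defer close(jobs) // the sender of jobs closes jobs
		for i, n := range nums {
			jobs <- job{index: i, n: n}
		}
	}()

	go func() {
		wg.Wait()
		close(results) // close only after every worker has stopped sending
	}()

	ordered := make([]int, len(nums))
	for r := range results {
		ordered[r.index] = r.value // the index restores input order
	}
	return ordered
}

func main() {
	fmt.Println(squareFanOut([]int{1, 2, 3, 4, 5}, 3)) // [1 4 9 16 25]
}
```

Restoring order this way means buffering results until they can be placed, which is the extra cost the bullet warns about. If downstream stages need items strictly in sequence as they stream, a single-goroutine stage may be the simpler choice.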
Related patterns
- Fan-out / Fan-in: Distribute work across multiple goroutines (fan-out) and merge their results back into one stream (fan-in).
- Worker Pool: Bound concurrency by feeding jobs to a fixed number of long-lived worker goroutines.
- Generator: Produce a stream of values from a goroutine over a channel, lazily and on demand.
- Context & Cancellation: Propagate cancellation, deadlines, and request-scoped values across API boundaries and goroutine trees with context.Context.
Check your understanding
1. In a Go pipeline, who closes a stage's output channel?
Each stage owns its output channel and closes it when done. If a receiver closed it instead, the stage's next send would panic. With the sender closing, a downstream range loop ends cleanly once the channel is closed.
2. What is the `done` channel for?
Each stage selects on <-done alongside its send/receive. Closing done unblocks them all so they return, preventing goroutines stuck forever on a channel nobody reads.
3. Where does back-pressure come from?
With unbuffered (or small-buffered) channels, a producer can't outrun its consumer by more than the buffer size: it blocks on send until the consumer receives, keeping memory bounded automatically.