Concurrency pattern · Intermediate

Pipeline

Process a stream of data through a series of stages connected by channels, where each stage is a goroutine.

🏭 Analogy

A factory assembly line: one station stamps the part, the next paints it, the next boxes it. Each station does one job and hands the result to the next, all working at the same time on different items. If boxing slows down, the line naturally backs up; nobody piles parts on the floor. That’s a pipeline.

The problem

You have a stream of data and several transformations to apply. Doing them in one big loop is simple but serial, and it mixes the steps together. A pipeline gives each stage its own goroutine, connected by channels, so stages run concurrently and stay decoupled. The channels supply back-pressure, and a shared done channel gives every stage a clean cancellation path.

Structure

graph LR
  G["generator<br/>goroutine"] -->|chan int| S1["square<br/>goroutine"]
  S1 -->|chan int| S2["square<br/>goroutine"]
  S2 -->|chan int| M["main<br/>range loop"]
  D["done chan"] -.cancels.-> G
  D -.cancels.-> S1
  D -.cancels.-> S2

Each stage follows the same shape: take an inbound channel, return a new outbound channel, run a goroutine that reads → transforms → sends, and close the output when the input is exhausted.

Idiomatic Go

A generator turns values into a stream; each stage reads one channel and writes the next; main ranges the final channel. The done channel lets everything cancel cleanly:

package main

import "fmt"

// generator turns a list of ints into a stream.
func generator(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // this stage owns out, so it closes out
		for _, n := range nums {
			select {
			case <-done:
				return
			case out <- n:
			}
		}
	}()
	return out
}

// square is a stage: read ints, emit their squares.
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-done:
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done) // cancels every stage on return

	// Compose stages: numbers -> square -> square
	stream := square(done, square(done, generator(done, 1, 2, 3, 4)))

	for n := range stream {
		fmt.Println(n) // 1, 16, 81, 256
	}
}

Generators & take

A more composable style, popularized by Concurrency in Go, uses a repeatFn that produces an infinite stream and a take that pulls just the first N, both honoring done:

done := make(chan any)
defer close(done)

repeat := repeatFn(done, func() any { return rand.Int() }) // infinite
for n := range take(done, repeat, 10) {                    // bounded
	fmt.Println(n)
}

Because every stage selects on <-done, closing done unwinds even the infinite generator, so no goroutines leak.
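The snippet above calls repeatFn and take without defining them. The following is one possible sketch of both, written to match the signatures the snippet implies; it is not the book's exact code, and firstN is a hypothetical helper added only so the result is easy to inspect.

```go
package main

import (
	"fmt"
	"math/rand"
)

// repeatFn calls fn over and over and streams each result until done is closed.
func repeatFn(done <-chan any, fn func() any) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case out <- fn():
			}
		}
	}()
	return out
}

// take forwards at most n values from in, then closes its output.
func take(done <-chan any, in <-chan any, n int) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case <-done:
				return
			case v, ok := <-in:
				if !ok {
					return // upstream closed early
				}
				select {
				case <-done:
					return
				case out <- v:
				}
			}
		}
	}()
	return out
}

// firstN (hypothetical helper) bounds an infinite random stream to n values.
func firstN(n int) []any {
	done := make(chan any)
	defer close(done) // unwinds repeatFn once we return

	var got []any
	for v := range take(done, repeatFn(done, func() any { return rand.Int() }), n) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(len(firstN(10))) // 10
}
```

Note that take guards both its receive and its send with done, so it can be cancelled whether it is waiting on the upstream stage or on its own consumer.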

🐹 The three rules that make pipelines safe

1. The stage that writes a channel closes it (defer close(out)), never the reader.
2. Every send and receive also selects on <-done (or ctx.Done()), so cancellation can’t deadlock.
3. Let channels carry the back-pressure: unbuffered channels keep a fast stage from racing ahead of a slow one, so memory stays bounded without any manual throttling.

Where it leads

The pipeline is the foundation for the rest of Go’s concurrency toolkit.

Pitfalls

⚠️ Leaks happen when a consumer stops early

If main breaks out of the range loop before the stream is drained, upstream stages can block forever on a send nobody receives: a goroutine leak. The done channel (closed via defer) is what saves you: it signals every stage to abandon its pending send and return. Never start a pipeline stage without a cancellation path.
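The sketch below shows a consumer that stops early and then confirms that every stage has exited. It uses context.Context in place of a done channel, which is an equivalent cancellation path, and a sync.WaitGroup purely for verification; the names counter, double, and firstThree are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// counter streams 0, 1, 2, ... until ctx is cancelled.
func counter(ctx context.Context, wg *sync.WaitGroup) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}

// double is a stage: read ints, emit each one multiplied by two.
func double(ctx context.Context, wg *sync.WaitGroup, in <-chan int) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				return
			case out <- n * 2:
			}
		}
	}()
	return out
}

// firstThree reads three values, stops early, cancels, and waits for the stages.
func firstThree() []int {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	var got []int
	for n := range double(ctx, &wg, counter(ctx, &wg)) {
		got = append(got, n)
		if len(got) == 3 {
			break // the consumer stops before the stream is drained
		}
	}

	cancel()  // without this, both stages would stay blocked on a send
	wg.Wait() // returns only after every stage goroutine has exited
	return got
}

func main() {
	fmt.Println(firstThree()) // [0 2 4]
}
```

If the cancel() call is removed, wg.Wait() never returns, which is the leak described above made visible.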

When to use it, and when not

✅ Reach for it when

  • You transform a stream in distinct steps (read → parse → filter → write) and want them to run concurrently.
  • You want natural back-pressure and bounded memory, with stages decoupled from each other.
  • You need clean cancellation so partial work shuts down without leaking goroutines.

⛔ Think twice when

  • It's a single, cheap transformation; a plain loop is simpler and faster.
  • Stages are so tiny that channel overhead dominates the actual work.
  • You need strict global ordering after fanning a stage out to many workers.

Check your understanding

1. In a Go pipeline, who closes a stage's output channel?

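Each stage owns its output channel and closes it when done. If the receiver closed it instead, the stage's next send (or its own deferred close) would panic, and a receive-only <-chan int cannot be closed at all: the compiler rejects it. Downstream, ranging over the channel ends cleanly once it's closed.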
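As a small illustration of why the close belongs to the writer, this sketch closes a channel from the "wrong" side and then attempts the send a producer would make next. The helper sendAfterClose is hypothetical and recovers the panic only so the message can be printed.

```go
package main

import "fmt"

// sendAfterClose closes ch the way a misbehaving receiver might, then performs
// the producer's next send and returns the resulting panic message.
func sendAfterClose() (msg string) {
	ch := make(chan int, 1)

	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	close(ch) // closed by someone other than the sender
	ch <- 1   // the sender's next send panics
	return "no panic"
}

func main() {
	fmt.Println(sendAfterClose()) // send on closed channel
}
```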

2. What is the `done` channel for?

Each stage selects on <-done alongside its send/receive. Closing done unblocks them all so they return, preventing goroutines stuck forever on a channel nobody reads.

3. Where does back-pressure come from?

With unbuffered (or small-buffered) channels, a producer can't outrun its consumer: it blocks on send until the consumer receives, keeping memory bounded automatically.