Dataflow Explained: How Apache Beam Pipelines Actually Execute on GCP
Writing an Apache Beam pipeline is straightforward. Understanding why it runs slowly, costs more than expected, or produces incorrect streaming results requires understanding what Dataflow does when you press run. This guide covers the internal execution model — how workers are allocated, how shuffling works, what watermarks actually mean, and how to read the monitoring dashboard to find bottlenecks.
What Happens When You Submit a Job
When you submit a Beam pipeline with DataflowRunner, this sequence occurs:
1. Local machine
   ├── Beam SDK compiles pipeline graph (DAG of transforms)
   ├── Optimizer analyzes and fuses compatible steps
   └── Job submitted to Dataflow service API

2. Dataflow service
   ├── Validates job parameters and resource limits
   ├── Creates job graph: splits pipeline into stages
   └── Allocates initial worker pool (VMs)

3. Workers (Compute Engine VMs)
   ├── Download pipeline code / container image
   ├── Connect to Dataflow service for work item distribution
   ├── Execute assigned work items in parallel
   └── Report progress and output to Dataflow service

4. Shuffle service (separate from workers)
   └── Handles GroupByKey and CoGroupByKey data movement between pipeline stages

With the Shuffle service enabled, the workers never exchange data directly with each other. All coordination (work distribution, shuffle, output) goes through Dataflow service APIs and the Shuffle service.
Pipeline Fusion: Why Some Steps Disappear
Dataflow’s optimizer fuses compatible transforms into a single “stage” to minimize data serialization and network hops. A map followed immediately by a filter followed by another map might become a single fused step:
Original pipeline graph:
    ReadFromText → ParseJSON → FilterRecords → FormatOutput → WriteToBigQuery

After fusion:
    ReadFromText
        ↓
    [ParseJSON + FilterRecords + FormatOutput]   ← single stage, runs in-memory
        ↓
    WriteToBigQuery

Fusion breaks at boundaries where data must be shuffled across workers: GroupByKey, CoGroupByKey, and windowing operations that require data from multiple workers to be combined. Understanding fusion helps you predict where Dataflow adds serialization overhead.
You can observe fusion by examining the execution graph in the Dataflow monitoring console. Each box in the visualization is a stage; fused transforms appear as a single stage.
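The grouping rule can be sketched in plain Python. This is a toy model, not Dataflow's optimizer: the `fuse` function, the `(name, kind)` step format, and the step names `KeyByRegion` and `GroupByRegion` are all made up for illustration. It only shows how a run of element-wise steps collapses into one stage and how a shuffle step ends that stage.

```python
from typing import List, Tuple

# Transform kinds that force a shuffle and therefore end a fused stage.
# (Only the two grouping transforms named in the text are modelled here.)
SHUFFLE_BOUNDARIES = {"GroupByKey", "CoGroupByKey"}


def fuse(steps: List[Tuple[str, str]]) -> List[List[str]]:
    """Group consecutive element-wise steps into stages.

    `steps` is a list of (step_name, kind) pairs. A shuffle step closes the
    stage before it and is placed in a stage of its own.
    """
    stages: List[List[str]] = []
    current: List[str] = []
    for name, kind in steps:
        if kind in SHUFFLE_BOUNDARIES:
            if current:
                stages.append(current)
                current = []
            stages.append([name])
        else:
            current.append(name)
    if current:
        stages.append(current)
    return stages


# Hypothetical pipeline: three element-wise steps, a shuffle, one more step.
pipeline = [
    ("ParseJSON", "Map"),
    ("FilterRecords", "Filter"),
    ("KeyByRegion", "Map"),
    ("GroupByRegion", "GroupByKey"),
    ("FormatOutput", "Map"),
]
stages = fuse(pipeline)
print(stages)
```

The three steps before the grouping end up in one stage, the grouping stands alone, and the step after it starts a new stage.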
Worker Allocation and Autoscaling
Dataflow starts a batch job with a default number of workers (or the number you specify) and adjusts the count based on throughput and remaining work.
Job progress over time:
    Workers:  5  5  5  8  8  8  12  12  8  5  3   (autoscaling up then down)
    Progress: ──────────────────────────────────► 100%

Autoscaler considers:
    - Current throughput (elements/second per worker)
    - Estimated remaining work
    - Worker utilization (are workers idle or fully loaded?)
    - Time remaining on committed deadline

For streaming jobs, autoscaling responds to Pub/Sub backlog: if unprocessed messages accumulate, Dataflow adds workers; if the backlog clears, it removes workers.
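As a rough illustration of backlog-driven scaling, the sketch below computes how many workers would be needed to drain a backlog within a target time, clamped to a minimum and maximum. The formula, the `target_workers` function, and every parameter name are assumptions made for this example; Dataflow's real autoscaler is not public in this form and weighs more signals.

```python
import math


def target_workers(backlog_elements: int,
                   per_worker_throughput: float,
                   drain_seconds: float,
                   min_workers: int,
                   max_workers: int) -> int:
    """Workers needed to drain `backlog_elements` within `drain_seconds`.

    per_worker_throughput is in elements per second per worker.
    The result is clamped to [min_workers, max_workers].
    """
    if per_worker_throughput <= 0 or drain_seconds <= 0:
        raise ValueError("throughput and drain_seconds must be positive")
    needed = math.ceil(backlog_elements / (per_worker_throughput * drain_seconds))
    return max(min_workers, min(max_workers, needed))


# Hypothetical numbers: 500 elements/s per worker, drain within 2 minutes.
idle = target_workers(0, 500.0, 120, min_workers=1, max_workers=20)
busy = target_workers(600_000, 500.0, 120, min_workers=1, max_workers=20)
swamped = target_workers(6_000_000, 500.0, 120, min_workers=1, max_workers=20)
print(idle, busy, swamped)
```

An empty backlog falls to the minimum, a moderate backlog lands in between, and a very large one is capped at the maximum. That cap is why a growing backlog with a flat worker count usually means the maximum worker setting has been reached.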
Control autoscaling with these flags:
    # Other required flags (for example --template-file-gcs-location) are omitted here.

    # Batch: set initial and maximum worker counts
    gcloud dataflow flex-template run my-batch-job \
        --parameters=num_workers=5,max_num_workers=50

    # Streaming: enable autoscaling (default) or fix worker count
    gcloud dataflow flex-template run my-stream-job \
        --parameters=autoscaling_algorithm=THROUGHPUT_BASED,max_num_workers=20

The Shuffle Service: Moving Data Between Stages
When a GroupByKey or CoGroupByKey occurs, Dataflow must redistribute all data so that all records with the same key end up on the same worker. This redistribution is called a shuffle.
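A minimal sketch of that redistribution, assuming simple hash partitioning: each key is mapped to one owning worker, so every value for that key lands in the same place. The `partition_for` and `shuffle` helpers are hypothetical, and the real service assigns key ranges rather than using this exact scheme.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_workers: int) -> int:
    """Pick the worker that owns `key`.

    crc32 is used because Python's built-in hash() for strings is salted
    per process and would not give a stable assignment.
    """
    return zlib.crc32(key.encode("utf-8")) % num_workers


def shuffle(emitted, num_workers):
    """Route every (key, value) pair to the worker that owns the key."""
    per_worker = [defaultdict(list) for _ in range(num_workers)]
    for pairs in emitted:
        for key, value in pairs:
            per_worker[partition_for(key, num_workers)][key].append(value)
    return [dict(d) for d in per_worker]


emitted = [
    [("key_A", "val1"), ("key_B", "val2")],  # output of worker 1
    [("key_A", "val3"), ("key_C", "val4")],  # output of worker 2
]
grouped = shuffle(emitted, num_workers=2)
for worker_id, keys in enumerate(grouped):
    print(worker_id, keys)
```

Which worker ends up owning which key depends on the hash, but both `key_A` values always end up together.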
In standard mode, shuffle happens on disk within workers — slow and memory-intensive. The Shuffle Service moves this to a managed, off-worker service:
Without Shuffle Service:
    Worker 1: emits (key_A: val1), (key_B: val2)
    Worker 2: emits (key_A: val3), (key_C: val4)

    Shuffle: workers communicate directly (slow, uses worker disk/RAM)

    Worker 1 now has: all key_A records
    Worker 2 now has: all key_B, key_C records

With Shuffle Service:
    Workers emit to managed shuffle storage
    Dataflow service handles sorting and routing
    Workers pull their assigned key ranges
    (Workers use less RAM, faster, more predictable cost)

Enable the Shuffle Service:

    --experiments=shuffle_mode=service

This is enabled by default for most job types. You’ll see lower worker memory usage and faster shuffle operations in the monitoring console.
Watermarks: The Heart of Streaming Correctness
In a streaming pipeline, data arrives at different times due to network delays. An event that happened at 10:00 AM might arrive at 10:03 AM. A watermark is a bound that says: “I am confident that all events before time T have arrived.”
Event time (when event actually occurred):
    ──────────────────────────────────────────────────►
    10:00   10:01   10:02   10:03   10:04   10:05

Processing time (when events arrive at Dataflow):
    ──────────────────────────────────────────────────►
    10:00   10:01   10:02   10:03   10:04   10:05

Actual arrival pattern:
    Event at 10:00 arrives at 10:01 (1 min late)
    Event at 10:00 arrives at 10:03 (3 min late, network issue)
    Event at 10:01 arrives at 10:02 (1 min late)

Watermark at processing time 10:03:
    "I'm confident all events before 10:01 have arrived"
    Windows ending before 10:01 can now be closed and emitted

When the watermark passes a window boundary, Dataflow closes that window and fires any triggers. Events that arrive after the watermark for their window are “late data.”
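The on-time versus late distinction can be sketched with the three events from the arrival pattern. Two simplifications are assumed here and are not how Dataflow computes watermarks: the watermark is modelled as trailing processing time by a fixed two minutes, and a one-minute window counts as closed once the watermark reaches its end. Times are minutes since midnight, so 600 is 10:00.

```python
WINDOW_MINUTES = 1
# Assumption for this sketch only: the watermark trails processing time by a
# fixed 2 minutes. Real watermarks are estimated from the source, not fixed.
ASSUMED_SKEW_MINUTES = 2


def window_end(event_time: int) -> int:
    """End of the fixed window that contains event_time."""
    return event_time - event_time % WINDOW_MINUTES + WINDOW_MINUTES


def watermark_at(processing_time: int) -> int:
    """Modelled watermark at a given processing time."""
    return processing_time - ASSUMED_SKEW_MINUTES


def is_late(event_time: int, arrival_time: int) -> bool:
    """True if the event's window was already closed when it arrived."""
    return watermark_at(arrival_time) >= window_end(event_time)


# (event_time, arrival_time) pairs in minutes since midnight.
events = [(600, 601), (600, 603), (601, 602)]
late_flags = [is_late(event_time, arrival) for event_time, arrival in events]
print(late_flags)
```

Under these assumptions only the event that took three minutes to arrive is late, because the watermark had already reached the end of its window.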
A stuck watermark is a common production issue: the watermark stops advancing, windows never close, and data accumulates. Common causes:
- A single slow or stuck worker holding back the watermark
- One Pub/Sub subscription with a very old unacknowledged message
- Incorrect timestamp extraction: events assigned very old timestamps hold the watermark back (it never moves backward, but it cannot advance past them)
Monitor watermark progress in the Dataflow console under the “Timeline” tab for each streaming pipeline stage.
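Why one stale input is enough to stall everything can be shown with a toy model in which the pipeline watermark is the minimum over its inputs. The `pipeline_watermark` function and the subscription names are hypothetical, and times are minutes since midnight.

```python
def pipeline_watermark(oldest_pending_by_source: dict) -> int:
    """The overall watermark can be no later than the slowest input."""
    return min(oldest_pending_by_source.values())


# Both inputs start at 10:00 (600 minutes since midnight).
sources = {"subscription_a": 600, "subscription_b": 600}

history = []
for minute in range(1, 6):
    # subscription_a keeps up with real time.
    sources["subscription_a"] = 600 + minute
    # subscription_b still holds one unacknowledged message from 10:00,
    # so its entry never changes and the overall watermark stays put.
    history.append(pipeline_watermark(sources))

# Once the stale message is acknowledged, the watermark can jump forward.
sources["subscription_b"] = 605
recovered = pipeline_watermark(sources)
print(history, recovered)
```

The healthy input advances five minutes while the overall watermark does not move at all, which is the pattern to look for when windows stop closing.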
Reading the Dataflow Monitoring Console
Job graph view:
    Each box = a pipeline stage

Metrics shown per stage:
    Elements added   (input throughput)
    Elements output  (output throughput)
    Wall time        (approximate time summed across all worker threads, not elapsed wall-clock time)

A stage with high "Elements added" but low "Elements output":
    → Data is being filtered or aggregated (expected)
    → Or a transform is slow/stuck (investigate with logs)

System Lag (streaming jobs):
    Time difference between when a message was published to Pub/Sub and when Dataflow processed it.

    Low system lag (< 1 minute): pipeline keeping up
    Growing system lag: pipeline falling behind, may need more workers

Worker utilization:
    CPU usage across workers
    Near 100%: add workers (increase max_num_workers)
    Near 0%: pipeline is I/O bound or shuffle bound, not CPU bound

Common Performance Issues and Fixes
Hot keys
When one key receives dramatically more data than others, a single worker handles all that data — becoming a bottleneck.
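The imbalance can be measured before it hurts. The sketch below is plain Python on made-up data, not Beam code: it computes each key's share of the records, then shows that spreading a hot key over ten salted sub-keys shrinks the largest bucket while the merged totals stay the same. Round-robin salting is used here only to keep the result deterministic.

```python
from collections import Counter, defaultdict


def key_shares(records, key_fn):
    """Fraction of all records that falls on each key."""
    counts = Counter(key_fn(r) for r in records)
    total = sum(counts.values())
    return {key: count / total for key, count in counts.items()}


# Hypothetical skewed input: 90 "US" records and 10 "EU" records.
records = [{"region": "US", "amount": 1} for _ in range(90)]
records += [{"region": "EU", "amount": 1} for _ in range(10)]

shares = key_shares(records, lambda r: r["region"])
hot_keys = [key for key, share in shares.items() if share > 0.5]

# Salting: spread each region over 10 sub-keys, sum per sub-key,
# then merge the partial sums back per original region.
NUM_SALTS = 10
partial = defaultdict(int)
for i, r in enumerate(records):
    partial[f"{r['region']}_{i % NUM_SALTS}"] += r["amount"]

merged = defaultdict(int)
for salted_key, subtotal in partial.items():
    merged[salted_key.rsplit("_", 1)[0]] += subtotal

largest_bucket = max(partial.values())
print(hot_keys, dict(merged), largest_bucket)
```

Without salting, one bucket holds 90 of the 100 records; with it, the largest bucket holds 9, and the per-region totals are unchanged after the merge.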
    import random

    import apache_beam as beam

    # To detect hot keys, count elements per key before the GroupByKey.
    # If key="US" gets 90% of all traffic, the "US" worker is overloaded.
    grouped = (
        records
        | "Map to key" >> beam.Map(lambda r: (r["region"], r))
        | "Group" >> beam.GroupByKey()
    )

    # Fix: use CombinePerKey instead of GroupByKey when possible.
    # Or introduce a composite key: region + random suffix.
    salted = (
        records
        | "Add suffix" >> beam.Map(lambda r: (f"{r['region']}_{random.randint(0, 9)}", r))
        | "Group salted" >> beam.GroupByKey()
        # Then re-group by original region to merge the splits
    )

Excessive serialization
Large objects passing through many transforms incur serialization overhead.
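To get a feel for the difference, the sketch below compares the serialized size of a full record with a projected one. `pickle` is only a stand-in for whatever coder the pipeline actually uses, and the order structure is invented for the example, so the absolute numbers mean little; the ratio is the point.

```python
import pickle

# Hypothetical order record with nested data that later steps never read.
order = {
    "id": 1234,
    "amount": 99.5,
    "customer": {"name": "example", "address": "x" * 500},
    "line_items": [{"sku": f"sku-{i}", "qty": 1} for i in range(50)],
}

# Projection: keep only the fields that later steps need.
projected = {"order_id": order["id"], "amount": order["amount"]}

full_bytes = len(pickle.dumps(order))
projected_bytes = len(pickle.dumps(projected))
print(full_bytes, projected_bytes)
```

Every stage boundary pays this cost per element, so projecting before the first shuffle is where the saving matters most.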
    # Inefficient: passes entire order object through many transforms
    records | beam.Map(lambda o: process_order(o))

    # Better: project only needed fields early
    (
        records
        | beam.Map(lambda o: {"order_id": o["id"], "amount": o["amount"]})
        | beam.Map(lambda r: ...)
    )

Slow ParDo with I/O inside
    # Bad: opens a DB connection per element
    class LookupCustomer(beam.DoFn):
        def process(self, element):
            conn = open_db_connection()  # called once per element!
            ...

    # Good: use setup() and teardown()
    class LookupCustomer(beam.DoFn):
        def setup(self):
            # Called once per DoFn instance, not once per element
            self.conn = open_db_connection()

        def teardown(self):
            self.conn.close()

        def process(self, element):
            # reuse self.conn
            ...

Streaming vs Batch: The Same Code, Different Behavior
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    # This pipeline runs as batch when the input is bounded (file)
    # and as streaming when unbounded (Pub/Sub)
    options = PipelineOptions(streaming=False)  # batch

    # Switch to streaming by changing the source and flag
    options.view_as(StandardOptions).streaming = True
    source = beam.io.ReadFromPubSub(topic="...")  # unbounded source

The same transform code runs in both modes. Windows behave differently: in batch, a fixed window fires once when all data is processed; in streaming, the same window fires when the watermark passes the boundary.
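The difference in firing behavior can be sketched without Beam. In this toy model, `batch_panes` emits every window once after reading all input, while `streaming_panes` emits a window as soon as the supplied watermark reaches its end. Both function names, the input format, and the sample data are assumptions for illustration, and late data is not handled.

```python
from collections import defaultdict


def fixed_window(event_time: int, size: int):
    """(start, end) of the fixed window containing event_time."""
    start = event_time - event_time % size
    return (start, start + size)


def batch_panes(events, size):
    """Bounded input: each window is emitted once, after all data is read."""
    windows = defaultdict(list)
    for event_time, value in events:
        windows[fixed_window(event_time, size)].append(value)
    return dict(windows)


def streaming_panes(arrivals, size):
    """Unbounded input: emit a window when the watermark reaches its end.

    arrivals is a list of (watermark_after_arrival, event_time, value).
    Windows still open when the input stops are not emitted.
    """
    open_windows = defaultdict(list)
    fired = []
    for watermark, event_time, value in arrivals:
        open_windows[fixed_window(event_time, size)].append(value)
        ready = sorted(w for w in open_windows if w[1] <= watermark)
        for window in ready:
            fired.append((window, open_windows.pop(window)))
    return fired


events = [(5, "a"), (7, "b"), (12, "c"), (15, "d"), (25, "e")]
arrivals = [(0, 5, "a"), (0, 7, "b"), (10, 12, "c"), (20, 15, "d"), (30, 25, "e")]

batch_result = batch_panes(events, size=10)
stream_result = streaming_panes(arrivals, size=10)
print(batch_result)
print(stream_result)
```

Both modes produce the same window contents here; what changes is when each window is emitted.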
Summary
Dataflow’s execution model — job graph compilation, fusion, worker allocation, Shuffle Service, watermarks — explains both its performance characteristics and its failure modes. Fusion reduces overhead within stages. The Shuffle Service keeps workers lean during GroupByKey operations. Watermarks determine when streaming windows close and when results are emitted. Autoscaling adjusts capacity in response to throughput and backlog. The monitoring console’s stage-level metrics reveal where a pipeline is bottlenecked. Hot keys, excessive serialization, and per-element I/O connections are the most common sources of underperformance. Understanding these internals makes the difference between a pipeline that just runs and one that runs efficiently at production scale.