Spark Tasks
A task is the smallest unit of execution in Spark — a single function applied to a single data partition on a single executor core. If a stage has 200 partitions, Spark creates exactly 200 tasks. Your job’s degree of parallelism equals the number of tasks that can run simultaneously across all executor cores.
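The link between task count and executor cores can be sketched in a few lines of plain Python. This is a back-of-the-envelope model rather than anything Spark exposes: the helper name `task_waves` is hypothetical, and it assumes each task occupies exactly one core (the default `spark.task.cpus = 1`).

```python
import math

def task_waves(num_tasks: int, num_executors: int, cores_per_executor: int) -> int:
    """Return how many rounds ("waves") a stage needs to run all of its tasks."""
    # Assumption: one task per core at a time.
    slots = num_executors * cores_per_executor
    # A partially filled last wave still costs a full round, hence the ceiling.
    return math.ceil(num_tasks / slots)

print(task_waves(200, 10, 4))  # 5
print(task_waves(201, 10, 4))  # 6
```

One extra task beyond a multiple of the slot count adds a whole wave, which is why partition counts are often chosen as a multiple of the total core count.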
Task Count and Parallelism
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Task Demo").getOrCreate()
df = spark.read.parquet("transactions.parquet")
print(df.rdd.getNumPartitions()) # e.g., 80 → 80 tasks in Stage 1
# After a shuffle, the task count is determined by spark.sql.shuffle.partitions
# (with AQE enabled, Spark may coalesce these into fewer tasks at runtime)
result = df.groupBy("region").sum("amount")
print(spark.conf.get("spark.sql.shuffle.partitions"))  # "200" → 200 tasks in Stage 2
# Max concurrent tasks = total executor cores
# Example: 10 executors × 4 cores = 40 concurrent tasks
# → 200 tasks run in 5 waves (200 / 40 = 5 rounds)
Task Lifecycle
Stage submitted
  ↓
TaskScheduler assigns one task per partition
  ↓
Task serialized and sent to executor
  ↓
Executor deserializes task + fetches partition data
  ↓
Executor runs the function on the partition
  ↓
Task writes output:
  - ShuffleMapTask → local shuffle files (for next stage)
  - ResultTask → result sent to driver, or written to output storage
  ↓
Core freed; next task assigned
Data Locality
Spark schedules tasks close to their data to minimize network I/O:
# Data locality levels (Spark prefers local first):
# PROCESS_LOCAL → data in the same JVM (in-memory cache)
# NODE_LOCAL    → data on the same machine (HDFS local block)
# RACK_LOCAL    → different machine, same network rack
# ANY           → data must cross the network
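# Scheduler settings: pass at session creation (or spark-submit --conf), not via spark.conf.set afterwards
spark = (SparkSession.builder
    .config("spark.locality.wait", "3s")       # base wait; also the PROCESS_LOCAL default
    .config("spark.locality.wait.node", "3s")  # wait for NODE_LOCAL
    .config("spark.locality.wait.rack", "3s")  # wait for RACK_LOCAL
    .getOrCreate())
Speculative Execution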
Stragglers — tasks far slower than their peers — hold up the entire stage. Speculative execution duplicates them on other executors; whichever finishes first wins.
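The decision of which tasks to duplicate can be approximated as follows. This is a simplified model, not Spark's scheduler code: the function name and arguments are hypothetical, with `multiplier` and `quantile` mirroring the `spark.speculation.multiplier` and `spark.speculation.quantile` settings, and the real scheduler applies additional conditions.

```python
from statistics import median

def speculation_candidates(finished, running, total_tasks,
                           multiplier=1.5, quantile=0.75):
    """Return IDs of running tasks slow enough to be duplicated.

    finished: durations in seconds of tasks that already completed
    running:  dict mapping task ID -> seconds elapsed so far
    """
    # Do nothing until enough of the stage has completed.
    if len(finished) < quantile * total_tasks:
        return []
    # A running task is a candidate once it exceeds multiplier × median duration.
    threshold = multiplier * median(finished)
    return sorted(tid for tid, elapsed in running.items() if elapsed > threshold)

finished = [10, 12, 11, 9, 10, 13, 11, 10]  # 8 of 10 tasks are done
running = {8: 14.0, 9: 40.0}
print(speculation_candidates(finished, running, total_tasks=10))  # [9]
```

Here the median is 10.5 s, so the cutoff is 15.75 s: task 8 is left alone while task 9 would get a speculative copy.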
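# Scheduler settings: pass at session creation (or spark-submit --conf), not via spark.conf.set afterwards
spark = (SparkSession.builder
    .config("spark.speculation", "true")
    .config("spark.speculation.multiplier", "1.5")      # speculate tasks slower than 1.5× the median
    .config("spark.speculation.quantile", "0.75")       # start checking once 75% of tasks are done
    .config("spark.speculation.minTaskRuntime", "60s")  # only speculate tasks running > 1 min
    .getOrCreate())
Task Metrics in Spark UI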
In the Spark UI (Stages → click stage → Tasks table):
| Metric | Meaning |
|---|---|
| Duration | Wall-clock time for the task |
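| GC Time | Time spent in JVM garbage collection; a large share of Duration suggests the executor needs more memory |
| Input Size | Bytes read from storage (shuffle input is reported separately as Shuffle Read) |
| Shuffle Write | Bytes written to shuffle files |
| Spill (Disk) | Bytes spilled to disk because the data did not fit in memory |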
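One way to read these metrics together is to flag tasks that stand out on any of them. The sketch below works on rows copied by hand from the Tasks table; the field names (`duration_s`, `gc_s`, `spill_bytes`) and the thresholds are illustrative assumptions, not values Spark defines.

```python
from statistics import median

def flag_tasks(tasks, skew_factor=3.0, gc_ratio=0.1):
    """Return {task_id: [reasons]} for tasks whose metrics look suspicious."""
    typical = median(t["duration_s"] for t in tasks)
    flags = {}
    for t in tasks:
        reasons = []
        if t["duration_s"] > skew_factor * typical:
            reasons.append("straggler")   # far slower than the median task
        if t["gc_s"] > gc_ratio * t["duration_s"]:
            reasons.append("high GC")     # GC takes a large share of the runtime
        if t["spill_bytes"] > 0:
            reasons.append("spilled")     # data did not fit in memory
        if reasons:
            flags[t["id"]] = reasons
    return flags

tasks = [
    {"id": 0, "duration_s": 10, "gc_s": 0.2, "spill_bytes": 0},
    {"id": 1, "duration_s": 12, "gc_s": 0.3, "spill_bytes": 0},
    {"id": 2, "duration_s": 11, "gc_s": 2.5, "spill_bytes": 0},
    {"id": 3, "duration_s": 95, "gc_s": 4.0, "spill_bytes": 512 * 1024 * 1024},
]
print(flag_tasks(tasks))  # {2: ['high GC'], 3: ['straggler', 'spilled']}
```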
Task Failures and Retries
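# Scheduler setting: pass at session creation (or spark-submit --conf)
# 4 = up to 4 attempts per task (the first run plus 3 retries) before the job fails
spark = SparkSession.builder.config("spark.task.maxFailures", "4").getOrCreate()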
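The retry limit can be pictured with a small stand-alone model. It is only a sketch: `run_with_attempts` is a hypothetical helper, and it assumes the limit counts failed attempts of one task, so a limit of 4 allows the first run plus three retries.

```python
def run_with_attempts(task, max_failures=4):
    """Run task() until it succeeds or has failed max_failures times."""
    failures = 0
    while True:
        try:
            return task()
        except Exception as exc:
            failures += 1
            # Once the limit is reached, give up on the whole job.
            if failures >= max_failures:
                raise RuntimeError(f"task failed {failures} times; aborting job") from exc

calls = {"n": 0}

def flaky():
    """Fails on the first two attempts, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("executor lost")
    return "ok"

print(run_with_attempts(flaky))  # ok
```

A transient fault such as a lost executor is absorbed by the retries, while a deterministic one (a malformed record, for example) fails every attempt and aborts the job.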
# Common task failure causes:
# - OOM: partition too large or memory too small
# - Serialization error: UDF contains unserializable object
# - Network timeout: executor lost mid-task
# - Malformed records: bad input data
Diagnosing Task Problems
from pyspark.sql import functions as F
# Task skew: one task 10× slower than others
(df.rdd
    .mapPartitionsWithIndex(lambda i, it: [(i, sum(1 for _ in it))])
    .toDF(["part_id", "rows"])
    .orderBy(F.col("rows").desc())
    .show(10))
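# Fix skew: salt the key or enable AQE skew handling (applies to joins)
spark.conf.set("spark.sql.adaptive.enabled", "true")  # AQE itself must be on
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")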
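# OOM in tasks: executor memory is fixed at launch, so set it at session creation or via spark-submit --conf
spark = (SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    .getOrCreate())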
# Too many small tasks (< 100ms each) → coalesce
df.coalesce(50).groupBy("region").sum("amount")
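The key-salting fix for skew mentioned above can be illustrated without a cluster. This is a plain-Python sketch of the idea, not PySpark code: `salted_sum` and its parameters are hypothetical, and in a real job the same two stages would typically be a random salt column plus two `groupBy` passes.

```python
import random
from collections import defaultdict

def salted_sum(records, num_salts=8, seed=0):
    """Sum amounts per key in two stages, spreading each key over num_salts groups."""
    rng = random.Random(seed)
    # Stage 1: aggregate on (key, salt) so a hot key is split across several groups.
    partial = defaultdict(int)
    for key, amount in records:
        partial[(key, rng.randrange(num_salts))] += amount
    # Stage 2: drop the salt and combine the much smaller partial results.
    totals = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        totals[key] += subtotal
    return dict(totals)

# "EU" is a hot key: 1000 rows versus 10 for "US".
records = [("EU", 1)] * 1000 + [("US", 2)] * 10
print(salted_sum(records))  # {'EU': 1000, 'US': 20}
```

The totals match an unsalted sum, but in stage 1 no single group has to process all 1000 "EU" rows, which is what relieves the one overloaded task.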