Spark DataFrames
A DataFrame is Spark’s primary high-level API for structured data — a distributed table with named columns and a schema. Unlike RDDs, DataFrames benefit from the Catalyst optimizer, which generates efficient execution plans from your code regardless of the programming language.
Creating a DataFrame
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("DataFrame Guide").getOrCreate()
# From a Python list
data = [("Alice", "Engineering", 95000), ("Bob", "Marketing", 72000)]
df = spark.createDataFrame(data, ["name", "department", "salary"])
# With explicit schema
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("department", StringType()),
    StructField("salary", IntegerType()),
])
df = spark.createDataFrame(data, schema)
# From files
df = spark.read.option("header", True).csv("employees.csv")
df = spark.read.parquet("s3://bucket/transactions/")
df = spark.read.format("delta").load("s3://bucket/delta/")

Schema and Inspection
df.printSchema()
df.show(5, truncate=False)
df.dtypes             # [('name', 'string'), ('department', 'string'), ('salary', 'int')]
df.columns            # ['name', 'department', 'salary']
df.count()            # row count
df.describe().show()  # summary statistics

Common Transformations
# Column selection
df.select("name", "salary")
df.select(F.col("salary") * 1.1)
# Filtering
df.filter(F.col("salary") > 80000)
df.where("department = 'Engineering'")
# Adding / modifying columns
df.withColumn("bonus", F.col("salary") * 0.1)
df.withColumnRenamed("salary", "annual_salary")
# Sorting
df.orderBy(F.col("salary").desc())
# Deduplication
df.distinct()
df.dropDuplicates(["department"])

Aggregations
df.groupBy("department").agg(
    F.count("*").alias("headcount"),
    F.avg("salary").alias("avg_salary"),
    F.max("salary").alias("max_salary"),
)

Window Functions
from pyspark.sql.window import Window
window = Window.partitionBy("department").orderBy(F.col("salary").desc())
df.withColumn("rank", F.rank().over(window)) \
    .withColumn("running_sum", F.sum("salary").over(window)) \
    .show()

Joins
# Standard join types: inner, left, right, full, semi, anti
df.join(df2, "department", "inner")
df.join(df2, df.department == df2.dept_name, "left")
# Broadcast join (small table)
from pyspark.sql.functions import broadcast
df.join(broadcast(small_df), "department")

Writing Data
df.write.mode("overwrite").parquet("s3://bucket/output/")
df.write.mode("append").format("delta").save("s3://bucket/delta/")
df.write.partitionBy("year", "month").mode("overwrite").parquet("output/")

Performance Tips
# Cache before multiple uses
df.cache()
# Avoid UDFs when built-in functions exist
# BAD:  df.withColumn("upper", F.udf(str.upper)(F.col("name")))
# GOOD: df.withColumn("upper", F.upper(F.col("name")))
# Enable AQE for automatic optimization
spark.conf.set("spark.sql.adaptive.enabled", "true")