SparkSession
SparkSession has been the single, unified entry point to Spark functionality since Spark 2.0. It replaced the fragmented entry points of earlier versions (SparkContext, SQLContext, HiveContext) with one object that reads data, runs SQL, creates streaming queries, and exposes runtime configuration. The underlying SparkContext still exists and is reachable from the session.
Creating a SparkSession
from pyspark.sql import SparkSession
# Minimal session
spark = SparkSession.builder \
    .appName("MyApplication") \
    .getOrCreate()
# Full production configuration
# (executor settings only take effect when the first session, and its SparkContext, is created)
spark = SparkSession.builder \
    .appName("DataPipeline") \
    .master("yarn") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .enableHiveSupport() \
    .getOrCreate()
# Check Spark version
print(spark.version)  # e.g. 3.5.1, depending on the installed release
getOrCreate() Behavior
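getOrCreate() is idempotent: if a session already exists in the application, it returns that session instead of creating a new one. When an existing session is returned, runtime SQL options passed to the builder are applied to it, while static settings and core settings such as spark.executor.memory have no effect. This is critical for notebooks and long-running applications where multiple cells or modules may each try to create a session, but it also means a second builder call cannot resize a running application.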
Reading Data with SparkSession
# CSV
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://bucket/sales.csv")
# Parquet (the schema is embedded in the files, so no options are needed)
df = spark.read.parquet("s3://bucket/transactions/")
# JSON (records that span multiple lines; by default Spark expects one JSON object per line)
df = spark.read \
    .option("multiLine", "true") \
    .json("s3://bucket/events/*.json")
# Delta Lake (requires the Delta Lake library, e.g. delta-spark, unless your platform bundles it)
df = spark.read.format("delta").load("s3://bucket/delta/employees/")
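The JDBC read that follows splits the table into `numPartitions` queries on `partitionColumn`. Per the Spark JDBC documentation, `lowerBound` and `upperBound` only set the partition stride and do not filter rows, so the first partition also covers smaller values (in Spark's implementation, NULLs too) and the last covers everything above `upperBound`. The pure-Python function below is a simplified model of that splitting for non-negative integer bounds, not Spark's own code; `jdbc_partition_predicates` is a hypothetical name, and exact boundary arithmetic may differ between Spark versions.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Hypothetical helper: simplified model of how a partitioned JDBC read
    turns (lowerBound, upperBound, numPartitions) into per-partition WHERE clauses.

    Assumes non-negative integer bounds. The bounds only set the stride:
    the first and last partitions are open-ended, so no rows are dropped.
    """
    if lower_bound > upper_bound:
        raise ValueError("lower_bound must not exceed upper_bound")
    # Never ask for more partitions than there are values in the range.
    num_partitions = min(num_partitions, upper_bound - lower_bound)
    if num_partitions <= 1:
        return [None]  # one partition, no WHERE clause: the whole table
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if lower is None:
            # First partition: everything below the first boundary, plus NULLs.
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            # Last partition: everything from the last boundary upward.
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


# Same values as the JDBC options in this section.
preds = jdbc_partition_predicates("customer_id", 1, 1000000, 16)
for p in preds[:2] + preds[-1:]:
    print(p)
```

In this model, a row with `customer_id = 5000000` still lands in the last partition, which is why the bounds should match the real data range to keep partitions balanced.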
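# JDBC (relational database; the PostgreSQL JDBC driver must be on the classpath)
# lowerBound/upperBound only set the partition stride: rows outside that range are still read.
# The password is a placeholder; load credentials from a secret store in real jobs.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "public.customers") \
    .option("user", "spark_user") \
    .option("password", "secret") \
    .option("numPartitions", 16) \
    .option("partitionColumn", "customer_id") \
    .option("lowerBound", 1) \
    .option("upperBound", 1000000) \
    .load()
SQL with SparkSession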
# Register a DataFrame as a temporary view
df.createOrReplaceTempView("sales")
# Run SQL
result = spark.sql("""
    SELECT region,
           product_category,
           SUM(revenue) AS total_revenue,
           COUNT(DISTINCT customer_id) AS unique_customers
    FROM sales
    WHERE sale_date >= '2025-01-01'
    GROUP BY region, product_category
    ORDER BY total_revenue DESC
""")
result.show(20)
# Global temp view (shared by all sessions in the same Spark application, under the global_temp database)
df.createOrReplaceGlobalTempView("global_sales")
spark.sql("SELECT COUNT(*) FROM global_temp.global_sales").show()
# List tables and views in the current database (temporary views included)
spark.catalog.listTables()
Accessing SparkContext
For low-level RDD operations, access SparkContext through the session:
# Get SparkContext from SparkSession
sc = spark.sparkContext
# Now use RDD APIs
rdd = sc.parallelize([1, 2, 3, 4, 5])
logs = sc.textFile("hdfs:///data/logs.txt")
# Access configuration: properties that were explicitly set, as (key, value) pairs (unset defaults are not listed)
sc.getConf().getAll()
Runtime Configuration
# Read config values
spark.conf.get("spark.sql.shuffle.partitions")  # "200"
spark.conf.get("spark.executor.memory")  # "8g"
# Set config at runtime (runtime SQL settings only; static and core settings such as
# spark.executor.memory cannot be changed once the session exists)
spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Settings worth knowing for Spark 3.x workloads
spark.conf.set("spark.sql.adaptive.enabled", "true")  # AQE; already the default since Spark 3.2
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # default: true
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728")  # 128 MB per input partition (the default)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")  # only affects Delta Lake writes
Multiple Sessions
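# Create a secondary session with its own SQL configuration (same SparkContext underneath).
# Note: a second SparkSession.builder...getOrCreate() call would return the existing session instead.
spark2 = spark.newSession()
spark2.conf.set("spark.sql.shuffle.partitions", "50")

# Each session has its own SQL configuration, temporary views, and registered UDFs
spark2.conf.get("spark.sql.shuffle.partitions")  # "50"
spark.conf.get("spark.sql.shuffle.partitions")  # unchanged in the original session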
# Stopping any session stops the shared SparkContext, which ends every session in the application
spark.stop()
Closing Down
# Always stop SparkSession when done (especially in scripts)
try:
    result = spark.read.parquet("data/").count()
    print(f"Row count: {result}")
finally:
    spark.stop()