Spark
Spark is an Apache technology for distributed computing, designed to scale out across clusters of commodity hardware. It runs on the JVM and is written in Scala; PySpark is its Python interface.
It follows the map-reduce model: work is split into tasks, processed in parallel, and the results are combined. Designed for clusters and large datasets.
PySpark allows two main options for working with data:
- SQL queries (via temporary views).
- DataFrame API (Pythonic, lazy evaluation).
Comparison to other solutions
Spark has significant performance overhead from starting the runtime and splitting work into tasks, so the best results come from large clusters.
Simple rules:
- Polars is generally better than pandas (multithreaded, lazy evaluation).
- Polars and DuckDB are suitable for single-machine workloads, roughly up to a few gigabytes of data.
- PySpark excels on cluster infrastructure with terabytes of data.
- Single-machine PySpark is suitable for testing, but not practical for production.

SQL
Create a temporary view for SQL queries:
df: DataFrame = ...
# Register the DataFrame under a name so it can be referenced in SQL.
# A temporary view is just metadata, so this operation is cheap.
df.createOrReplaceTempView("my_table_name")
sql = """
SELECT * FROM my_table_name WHERE age > 30
"""
result = df.sparkSession.sql(sql)
Useful SQL commands:
DESCRIBE HISTORY my_table_name; -- show delta log
DROP TABLE IF EXISTS my_table_name;
SELECT count_if(email IS NULL) FROM users_dirty;
SELECT count(*) FROM users_dirty WHERE email IS NULL;
SELECT DISTINCT * FROM users_dirty;
Python alternatives:
usersDF.selectExpr("count_if(email IS NULL)")
usersDF.where(col("email").isNull()).count()
usersDF.distinct().count()
Useful functions:
Ingestion
Basic info:
SELECT current_catalog(), current_schema(); -- catalog/schema info
LIST <path>; -- list files
SELECT * FROM parquet.`<path>`; -- read Parquet file directly
Batch
Traditionally, a CTAS (CREATE TABLE AS SELECT; spark.read.load() in Python) approach is used for ingestion. It reads all rows (the entire data) each time, which is not efficient for large data that changes slowly.
CREATE TABLE new_table AS
SELECT *
FROM read_files(
  '<path_to_files>',
  format => '<file_type>', -- JSON, CSV, XML, TEXT, BINARYFILE, PARQUET, AVRO
  <other_format_specific_options>
);
Incremental batch
Only new data is ingested. The options are:
- COPY INTO
- spark.readStream (Auto Loader, optionally with a time trigger)
- Declarative pipeline: CREATE OR REFRESH STREAMING TABLE
CREATE TABLE new_table_2; -- create an empty table first
COPY INTO new_table_2
FROM '<dir_path>'
FILEFORMAT = <file_type>
FORMAT_OPTIONS(<options>) -- e.g., 'mergeSchema' = 'true'
COPY_OPTIONS(<options>)
--- better approach: ---
CREATE OR REFRESH STREAMING TABLE new_table_3
SCHEDULE EVERY 1 WEEK
AS SELECT *
FROM STREAM read_files(...);
Both approaches automatically skip already-ingested files based on tracked ingestion state (checkpointing), so the operation is idempotent.
Auto Loader is the more modern option than COPY INTO.
User defined functions (UDFs)
CREATE [OR REPLACE] FUNCTION function_name ( [ parameter_name data_type [, ...] ] )
RETURNS data_type
RETURN { expression | query }
Python
DataFrames are lazy: every transformation is recorded in a directed acyclic graph (DAG) and evaluated only when an action such as save, show, count, or display is called.
Load and save
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
# Create SparkSession
spark = SparkSession.builder \
.appName('SparkByExamples.com') \
.getOrCreate()
# Create dataframe
data=[
["1","2020-02-01"],
["2","2019-03-01"],
["3","2021-03-01"]
]
df=spark.createDataFrame(data, ["id","input"])
df.printSchema()
df.count()
df.show(10, truncate=False)
# Load dataframe (chained calls need parentheses or line-continuation backslashes)
df1: DataFrame = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("path-to-file.csv")
)
# mode("overwrite") replaces existing contents, like DROP TABLE IF EXISTS + recreate
df1.write.format("delta").mode("overwrite").save("path-to-delta-dir")
df1.write.mode("overwrite").saveAsTable("table_name")
df2 = spark.table("table_name") # load table
Select
from pyspark.sql.functions import col, split, to_timestamp, when
df.select(col("col1_name"))
df.where(col("age") > 30)
df.filter((df.age > 25) & (df.department == "Engineering"))
df.filter(col("department").like("MARKETING"))
df.filter(col("name").startswith("A"))
df.withColumn("price_sum", col("amount") * col("price_per_item"))
df.withColumn("start_at", to_timestamp(col("start_at_str")))
df.withColumnRenamed("name", "full_name")
# Select columns dynamically
columns_to_select = ["name", "department"]
df.select(*columns_to_select)
df.withColumn(
"salary_category",
when(
col("salary") < 60000, "Low"
).when(
(col("salary") >= 60000) & (col("salary") < 90000), "Medium"
).otherwise("High")
)
# Add multiple columns at once
df = df.withColumns({
"bonus": col("salary") * 0.1,
"net_salary": col("salary") - (col("salary") * 0.2)
})
df = df.drop('column1', 'column2', 'column3')
Arrays
SELECT exists(array(1, 2, 3), x -> x % 2 == 0); -- Result: true, check if any even number exists
SELECT filter(array(1, 2, 3), x -> x % 2 == 1); -- Result: [1,3], keep odd numbers
SELECT forall(array(1, 2, 3), x -> x % 2 == 0); -- Result: false, check if all elements are even
SELECT transform(array(1, 2, 3), x -> x * x); -- Result: [1,4,9], square each element
Joins
Testing
from pyspark.testing.utils import assertDataFrameEqual, assertSchemaEqual
# actual, expected
assertSchemaEqual(df1.schema, df2.schema)
assertDataFrameEqual(df1, df2)
Miscellaneous
The OPTIMIZE command compacts small files into larger ones for better access patterns. Z-Order indexing further sorts data based on specific columns to improve query pruning. Both operations require substantial computational resources for scanning and writing data. Therefore, compute-optimized resources provide the necessary CPU power and parallelism to efficiently process these tasks. While storage and memory are important, the main bottleneck during optimization is CPU-intensive compute operations. Operation is idempotent.
Running the VACUUM command on a Delta table deletes the unused data files older than a specified data retention period. As a result, you lose the ability to time travel back to any version older than that retention threshold.
The default retention period is 7 days; it can be changed per table (e.g., via the delta.deletedFileRetentionDuration table property).
JSON
There are 3 options for handling JSON in a column:
- Store it as a string: easy but not efficient. Query with path syntax:
SELECT json_col:address:city FROM table
- Use a struct type: better for a fixed schema. Derive the schema:
SELECT schema_of_json('sample-json-string')
Convert JSON to a struct:
SELECT from_json(json_col, 'json-struct-schema') AS struct_column FROM table
- Use the variant type: it combines the advantages of both, storing any JSON structure with a flexible schema. Parse:
SELECT parse_json(json_str)
Query with path syntax:
SELECT variant_col:address:city FROM table