
Spark

Apache Spark is a distributed computing engine that runs on the JVM and is written mainly in Scala; it works even on modest hardware. PySpark is its Python interface.

It uses the map-reduce model: a job is split into tasks, the tasks are processed in parallel, and the partial results are combined. Spark is designed for clusters and large datasets.
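The split/process/combine flow can be sketched in plain Python (this illustrates the model only, it is not Spark API; the chunk data and the names map_phase/reduce_phase are made up):

```python
from collections import Counter
from functools import reduce

def map_phase(chunk: str) -> Counter:
    # "Map": process one chunk independently (parallelizable)
    return Counter(chunk.split())

def reduce_phase(a: Counter, b: Counter) -> Counter:
    # "Reduce": combine two partial results into one
    a.update(b)
    return a

chunks = ["spark splits work", "work runs in parallel", "results are combined"]
partials = [map_phase(c) for c in chunks]           # would run in parallel on a cluster
totals = reduce(reduce_phase, partials, Counter())  # merge the partial word counts
```

On a cluster, the map phase runs on many executors at once; only the small partial results travel to the reducer.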

PySpark allows two main options for working with data:

  • SQL queries (via temporary views).
  • DataFrame API (Pythonic, lazy evaluation).

Comparison to other solutions

Spark has significant overhead for starting the runtime and splitting the main task into subtasks, so it performs best on large clusters.

Simple rules:

  • Polars is usually faster than pandas (multithreaded, lazy evaluation).
  • Polars and DuckDB are suitable for single machine workloads—data size up to a few gigabytes max.
  • PySpark excels in cluster infrastructure with terabytes of data.
  • Single-machine PySpark is suitable for testing, but not practical for production.

Data processing technology comparison

SQL

Create a temporary view for SQL queries:

df: DataFrame = ...
# Register the DataFrame under a name so SQL can reference it.
# This only creates a reference; no data is copied, so it is quick.
df.createOrReplaceTempView("my_table_name")
sql = """
SELECT * FROM my_table_name WHERE age > 30
"""
result = df.sparkSession.sql(sql)

Useful SQL commands:

DESCRIBE HISTORY my_table_name; -- show delta log

DROP TABLE IF EXISTS my_table_name;

SELECT count_if(email IS NULL) FROM users_dirty;
SELECT count(*) FROM users_dirty WHERE email IS NULL;

SELECT DISTINCT * FROM users_dirty;

Python alternatives:

usersDF.selectExpr("count_if(email IS NULL)")
usersDF.where(col("email").isNull()).count()

usersDF.distinct().count()

Useful functions:

date_format(col, "HH:mm:ss")          -- format a timestamp as a string
regexp_extract(email, "(?<=@).+", 0)  -- extract the domain after '@'
CAST(col AS STRING)                   -- cast to another type

Ingestion

Basic info:

SELECT current_catalog(), current_schema(); -- catalog/schema info
LIST <path>;                    -- list files
SELECT * FROM parquet.`<path>`; -- read Parquet file directly

Batch

Traditionally, a CTAS approach (CREATE TABLE AS SELECT, or spark.read.load()) is used for ingestion. It reads all rows (the entire dataset) on every run, which is inefficient for large data that changes slowly.

CREATE TABLE new_table AS
SELECT *
FROM read_files(
    <path_to_files>,
    format => '<file_type>',    -- JSON, CSV, XML, TEXT, BINARYFILE, PARQUET, AVRO
    <other_format_specific_options>
);

Incremental batch

Only new data is ingested. The options for this are:

  • COPY INTO
  • spark.readStream (Auto loader with time trigger)
  • Declarative pipeline: CREATE OR REFRESH STREAMING TABLE

CREATE TABLE new_table_2;   -- create an empty target table first

COPY INTO new_table_2
    FROM '<dir_path>'
    FILEFORMAT = <file_type>
    FORMAT_OPTIONS(<options>)
    COPY_OPTIONS(<options>)

--- better approach: ---

CREATE OR REFRESH STREAMING TABLE new_table_3
SCHEDULE EVERY 1 WEEK
AS SELECT *
FROM STREAM read_files(...);  -- options e.g. 'mergeSchema' = 'true'

Already ingested files are skipped automatically based on checkpointing, so the operation is idempotent.

Auto Loader is the more modern replacement for COPY INTO.

User defined functions (UDFs)

CREATE [OR REPLACE] FUNCTION function_name ( [ parameter_name data_type [, ...] ] )
RETURNS data_type
RETURN { expression | query }

CREATE FUNCTION plus_one(value INTEGER)
RETURNS INTEGER
RETURN value + 1;

Python

DataFrames are lazy: every transformation is recorded in a directed acyclic graph (DAG) and evaluated only when an action such as save, show, count, or display is called.

Load and save

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *

# Create SparkSession
spark = SparkSession.builder \
            .appName('SparkByExamples.com') \
            .getOrCreate()

# Create dataframe
data = [
    ["1", "2020-02-01"],
    ["2", "2019-03-01"],
    ["3", "2021-03-01"],
]
df = spark.createDataFrame(data, ["id", "input"])
df.printSchema()
df.count()
df.show(10, truncate=False)

# Load dataframe
df1: DataFrame = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("path-to-file.csv")
)
# overwrite mode replaces existing data, like DROP TABLE IF EXISTS + recreate
df1.write.format("delta").mode("overwrite").save("path-to-delta-dir")
df1.write.mode("overwrite").saveAsTable("table_name")

df2 = spark.table("table_name") # load table

Select

from pyspark.sql.functions import col, split, to_timestamp, when

df.select(col("col1_name"))
df.where(col("age") > 30)
df.filter((df.age > 25) & (df.department == "Engineering"))
df.filter(col("department").like("MARKETING"))
df.filter(col("name").startswith("A"))
df.withColumn("price_sum", col("amount") * col("price_per_item"))
df.withColumn("start_at", to_timestamp(col("start_at_str")))
df.withColumnRenamed("name", "full_name")

# Select columns dynamically
columns_to_select = ["name", "department"]
df.select(*columns_to_select)

df.withColumn(
    "salary_category",
    when(
        col("salary") < 60000, "Low"
    ).when(
        (col("salary") >= 60000) & (col("salary") < 90000), "Medium"
    ).otherwise("High")
)

# Add multiple columns at once
df = df.withColumns({
    "bonus": col("salary") * 0.1,
    "net_salary": col("salary") - (col("salary") * 0.2)
})
df = df.drop('column1', 'column2', 'column3')

Arrays

SELECT exists(array(1, 2, 3), x -> x % 2 == 0); -- Result: true, check if any even number exists
SELECT filter(array(1, 2, 3), x -> x % 2 == 1); -- Result: [1,3], keep odd numbers
SELECT forall(array(1, 2, 3), x -> x % 2 == 0); -- Result: false, check if the predicate holds for all elements
SELECT transform(array(1, 2, 3), x -> x * x); -- Result: [1,4,9], square each element

Joins

df_joined = df1.join(df2, on=df1["col1_id"] == df2["col_link_id"], how="inner")

Testing

from pyspark.testing.utils import assertDataFrameEqual, assertSchemaEqual

# actual, expected
assertSchemaEqual(df1.schema, df2.schema)
assertDataFrameEqual(df1, df2)

Miscellaneous

The OPTIMIZE command compacts small files into larger ones for better access patterns; Z-Order indexing additionally sorts data by the given columns to improve query pruning. Both operations are CPU-intensive (scanning and rewriting data), so compute-optimized resources are the best fit; storage and memory matter less here. The operation is idempotent.

OPTIMIZE table_name [FULL] [WHERE predicate] [ZORDER BY (col_name1 [, ...] ) ]

Running the VACUUM command on a Delta table deletes the unused data files older than a specified data retention period. As a result, you lose the ability to time travel back to any version older than that retention threshold.

VACUUM table_name [FULL | LITE] [RETAIN num HOURS] [DRY RUN]

The retention period defaults to 7 days; it can be changed with the table property delta.deletedFileRetentionDuration.

JSON

There are three options for handling JSON stored in a column:

  1. Store it as a string. Easy but not efficient. Query with path syntax: SELECT json_col:address:city FROM table
  2. Use a struct type. Better for a fixed schema. Derive the schema: SELECT schema_of_json('sample-json-string') Convert JSON to a struct: SELECT from_json(json_col, 'json-struct-schema') AS struct_column FROM table
  3. Use the variant type. It combines the advantages of both: stores any JSON structure with a flexible schema. Parse: parse_json(jsonStr) Query with path syntax: SELECT variant_col:address:city FROM table