
Apache Spark - Joins and Optimizing them - Part 1

As Part 1 of this blog series on joins in Apache Spark and how to optimize them, we will cover the following topics:

- Broadcast Hash Join

- Shuffle Hash Join

- Sort Merge Join

- Repartitioning

- Bucketing

Broadcast Hash Join

A Broadcast Hash Join is a highly efficient join strategy in Spark where the smaller DataFrame (or table) is "broadcast" to all the executors in the cluster. This eliminates the need for shuffling the larger DataFrame, significantly reducing network traffic and improving performance. It's ideal when one of the DataFrames involved in the join is considerably smaller than the other.

Real-world example:

Imagine you have a large dataset of customer orders and a small lookup table of product information. To enrich the order data with product details, you can use a broadcast hash join.

Standard Approach (letting Spark choose the join strategy)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample DataFrames
orders_df = spark.createDataFrame(
    [(1, 101, 2), (2, 102, 1), (3, 101, 3)],
    ["order_id", "product_id", "quantity"],
)

products_df = spark.createDataFrame(
    [(101, "Laptop", 1200), (102, "Mouse", 25), (103, "Keyboard", 75)],
    ["product_id", "name", "price"],
)

# Standard join: the optimizer picks the strategy
result_df = orders_df.join(products_df, on="product_id", how="inner")
result_df.show()
```

Efficient Approach (Broadcast Hash Join)

```python
from pyspark.sql.functions import broadcast

# Broadcast the smaller DataFrame so the larger one is never shuffled
result_df = orders_df.join(broadcast(products_df), on="product_id", how="inner")
result_df.show()
```

In the efficient approach, broadcast(products_df) explicitly instructs Spark to use a broadcast hash join. This can lead to a significant performance improvement, especially when products_df is much smaller than orders_df.

When to use

- One DataFrame is significantly smaller than the other. Spark broadcasts automatically only when the smaller side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); for somewhat larger lookup tables you can raise the threshold or use an explicit broadcast() hint (see the sketch after this list).

- The smaller DataFrame fits comfortably in the memory of the driver and of each executor.
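
If you prefer to let Spark broadcast automatically instead of sprinkling broadcast() calls through your code, you can adjust the threshold and confirm the chosen strategy in the physical plan. A minimal sketch, reusing orders_df and products_df from above (the 100 MB value is just an example):

```python
# Raise the automatic broadcast threshold (value is in bytes; default is 10 MB).
# DataFrames estimated to be smaller than this are broadcast without a hint.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Check which strategy the optimizer picked: the physical plan should
# contain a BroadcastHashJoin node when the build side is small enough.
orders_df.join(products_df, on="product_id", how="inner").explain()

# Setting the threshold to -1 disables automatic broadcasting entirely.
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```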

Shuffle Hash Join

In a shuffle hash join, both tables are shuffled and partitioned based on the join key, so matching rows end up on the same executor; Spark then builds an in-memory hash table from the smaller side within each partition and probes it with the other side. It's like having everyone with the same birthday gather in the same corner of the room. Note that this is not Spark's default strategy for large tables: the optimizer prefers a sort merge join (spark.sql.join.preferSortMergeJoin defaults to true) and only picks a shuffle hash join when it estimates one side's partitions are small enough to hash, or when you ask for it with a hint.

When to use it

- Both tables are too large to broadcast, but each partition of the smaller side still fits in executor memory.

- You want to skip the sorting that a sort merge join performs, and can tell Spark so with a join hint (see the sketch under "Efficient Approach" below).

Real-world example

Imagine joining two large tables of user activity (user_activity_1 and user_activity_2) to analyze user behavior over time.

Standard Approach (letting Spark choose the strategy)

```python

user_activity_1 = spark.read.csv("user_activity_1.csv", header=True, inferSchema=True)

user_activity_2 = spark.read.csv("user_activity_2.csv", header=True, inferSchema=True)

user_activity_1.join(user_activity_2, "user_id", "inner").show()



```

Efficient Approach

- Optimize the shuffle: tune spark.sql.shuffle.partitions (200 by default) so that shuffle partitions are neither so few that individual partitions become huge, nor so many that tasks are tiny; see the sketch after this list.

- Partition on the join key: data that is already hash-partitioned on the join key lets Spark skip re-shuffling it during the join.
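
A minimal sketch of both ideas, assuming Spark 3.x (where the shuffle_hash join hint is available) and reusing the user activity DataFrames above; the partition count of 400 is just an example:

```python
# More shuffle partitions spread the join across more tasks (default is 200);
# tune this to your cluster size and data volume.
spark.conf.set("spark.sql.shuffle.partitions", 400)

# Spark 3.x join hint asking the optimizer for a shuffle hash join
# instead of the sort merge join it would normally prefer.
result = user_activity_1.join(
    user_activity_2.hint("shuffle_hash"),
    "user_id",
    "inner",
)
result.explain()  # the physical plan should typically show ShuffledHashJoin
```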

Sort Merge Join

In a sort merge join, both tables are shuffled on the join key, sorted within each partition, and then merged to find matching rows. Imagine two lines of people sorted by height; it's easy to pair them up! This is Spark's default strategy for equi-joins between large tables.

When to use it

- Joining large datasets where shuffle hash join might be inefficient.

- The data can be sorted efficiently on the join key (Spark performs this shuffle and sort itself as part of the join).

Real-world example

Let's say you have two large datasets of customer transactions (transactions_1 and transactions_2) that you need to join on transaction_id.

Standard Approach (letting Spark choose the strategy)

```python



transactions_1 = spark.read.csv("transactions_1.csv", header=True, inferSchema=True)

transactions_2 = spark.read.csv("transactions_2.csv", header=True, inferSchema=True)



transactions_1.join(transactions_2, "transaction_id", "inner").show()

```

Efficient Approach (Sort Merge Join)

```python
# Spark shuffles and sorts both sides on the join key (transaction_id) as part
# of a sort merge join, so there is no need to pre-sort on an unrelated column
# such as transaction_time. The "merge" join hint (Spark 3.0+) asks the
# optimizer to use a sort merge join explicitly.
transactions_1.hint("merge").join(transactions_2, "transaction_id", "inner").show()
```

Spark chooses a sort merge join on its own for most large equi-joins, even without a hint. Hints are useful for experiments and edge cases, but it's generally best to let the optimizer decide; you can check what it decided by inspecting the physical plan, as sketched below.
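
A small sketch of that check, reusing the transaction DataFrames above:

```python
joined = transactions_1.join(transactions_2, "transaction_id", "inner")

# The physical plan names the strategy Spark chose:
# SortMergeJoin, ShuffledHashJoin, or BroadcastHashJoin.
joined.explain()
```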

Repartitioning

Repartitioning changes the number of partitions in your DataFrame. This can be useful to:

- Increase parallelism: More partitions can lead to faster processing if you have enough executors.

- Reduce shuffle: repartitioning both DataFrames on the join key before the join lets Spark reuse that partitioning instead of shuffling them again inside the join (see the sketch after this list).

- Balance data: Ensure data is evenly distributed across partitions, preventing some executors from being overloaded.
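
A minimal sketch of the shuffle-reduction idea. The sales_df and customers_df DataFrames and the partition count of 8 are hypothetical, chosen only to make the example self-contained:

```python
# Hypothetical DataFrames for illustration
sales_df = spark.createDataFrame(
    [(1, "C1", 99.0), (2, "C2", 15.5), (3, "C1", 42.0)],
    ["sale_id", "customer_id", "amount"],
)
customers_df = spark.createDataFrame(
    [("C1", "Alice"), ("C2", "Bob")],
    ["customer_id", "name"],
)

# Hash-partition both sides on the join key with the same partition count.
# On large data (with broadcasting out of the picture), Spark can reuse this
# partitioning for the join instead of shuffling both DataFrames again.
sales_by_cust = sales_df.repartition(8, "customer_id")
customers_by_cust = customers_df.repartition(8, "customer_id")

joined = sales_by_cust.join(customers_by_cust, "customer_id", "inner")
joined.explain()  # ideally no extra Exchange appears directly above the join
```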

Real-world example

You have a DataFrame (customer_data) with an uneven distribution of data across partitions, causing slow joins.

Standard Approach (Unbalanced partitions)

```python
customer_data = spark.read.csv("customer_data.csv", header=True, inferSchema=True)

# ... perform join operations ...
```

Efficient Approach (Repartitioning)

```python
# Repartition by customer ID so rows with the same key end up together
customer_data = customer_data.repartition("customer_id")

# ... perform join operations ...

# To change the number of partitions instead:
customer_data = customer_data.repartition(20)  # repartition into 20 partitions

# ... perform join operations ...
```

Repartitioning involves shuffling data, so use it strategically.

Bucketing

Bucketing pre-shuffles data into a fixed number of buckets based on a column (or columns), and can optionally pre-sort the data within each bucket. Because this layout is persisted with the table, joins, aggregations, and lookups on the bucketing column can skip work at read time. Think of it like organizing your bookshelf by genre: it's much faster to find a specific book.

When to use it

- You frequently join or filter data based on a specific column.

- You want to improve the performance of operations like aggregations and lookups.

Real-world example

You have a large dataset of website logs (web_logs) and you frequently need to analyze logs by user_id.

Standard Approach (No bucketing)

```python

web_logs = spark.read.csv("web_logs.csv", header=True, inferSchema=True)



# ... perform frequent filtering or joins on 'user_id' ...


```

Efficient Approach (Bucketing)

```python
# Write the data bucketed (and sorted within each bucket) by 'user_id'
# into 10 buckets, persisted as a table
web_logs.write.bucketBy(10, "user_id").sortBy("user_id").saveAsTable("bucketed_web_logs")

# Read the bucketed table back
bucketed_web_logs = spark.table("bucketed_web_logs")

# ... perform frequent filtering or joins on 'user_id' ...
```

By bucketing the data by user_id, Spark can quickly locate relevant data within partitions, making joins and filters more efficient.
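
The payoff is largest when both sides of a join are bucketed the same way. As a sketch, assuming a second, hypothetical user_profiles.csv dataset bucketed by user_id into the same number of buckets, the join can skip the shuffle entirely:

```python
# Hypothetical second dataset, bucketed on the same key with the same bucket count
user_profiles = spark.read.csv("user_profiles.csv", header=True, inferSchema=True)
user_profiles.write.bucketBy(10, "user_id").sortBy("user_id").saveAsTable("bucketed_user_profiles")

bucketed_web_logs = spark.table("bucketed_web_logs")
bucketed_user_profiles = spark.table("bucketed_user_profiles")

# Both tables are bucketed by user_id with the same bucket count, so Spark
# can join matching buckets directly; the plan should show no Exchange.
joined = bucketed_web_logs.join(bucketed_user_profiles, "user_id", "inner")
joined.explain()
```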
