In PySpark, shared variables are special variables that can be shared across tasks in a distributed computation. They efficiently distribute data or state information to all the tasks running on different nodes in a Spark cluster, and are used when each task needs access to a common data structure or shared state.
There are two main types of shared variables in PySpark:
Broadcast Variables:
* Broadcast variables are read-only variables that are distributed to all the nodes in a Spark cluster and cached in memory to be reused across multiple tasks.
* Broadcast variables are commonly used to efficiently distribute large read-only datasets or lookup tables to all the tasks in a Spark job, avoiding the need to transfer the data over the network multiple times.
* Broadcast variables are created with the SparkContext.broadcast() method and accessed through the value attribute.
Example:
# Broadcast a read-only lookup table to all nodes in the cluster
lookup_table = {'key1': 'value1', 'key2': 'value2'}
broadcast_var = sc.broadcast(lookup_table)

# Access the broadcast variable's value inside tasks
def map_function(element):
    value = broadcast_var.value.get(element, None)
    return value
Accumulators:
* Accumulators are variables that are only "added" to through an associative and commutative operation and are used primarily for aggregating values across tasks in a Spark job.
* Accumulators are typically used for collecting statistics, counting occurrences, or performing other types of aggregations across the data processed by different tasks.
* Accumulators are created with the SparkContext.accumulator() method and updated from within tasks using the add() method.
* Within tasks, accumulators are effectively write-only: tasks can add to them but cannot reliably read them. The values accumulated by different tasks are aggregated and accessible only on the driver through the value attribute.
Example:
# Create an accumulator to count occurrences of a certain event
accumulator_var = sc.accumulator(0)

# Increment the accumulator value within tasks
def map_function(element):
    if element is not None:  # example condition
        accumulator_var.add(1)
    return element