pip install pyspark
PySpark provides the sc.addFile() method for loading files onto Spark. The SparkFiles class is then used for getting the path with SparkFiles.get(filename), which resolves the path to any file added through sc.addFile().

class pyspark.PickleSerializer
: This serializes objects using Python's pickle module and supports almost every Python object.

class pyspark.MarshalSerializer
: This serializes objects using Python's marshal module. It is faster than the PickleSerializer but supports only a limited set of types.

# --serializing.py----
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "Marshal Serialization", serializer = MarshalSerializer()) #Initialize spark context and serializer
print(sc.parallelize(list(range(1000))).map(lambda x: 3 * x).take(5))
sc.stop()

$SPARK_HOME/bin/spark-submit serializing.py

[0, 3, 6, 9, 12]
mllib.classification: This supports different methods for binary and multiclass classification and regression analysis, such as Random Forest, Decision Tree, Naive Bayes, etc.
mllib.clustering: This is used for solving clustering problems that aim to group subsets of entities based on similarity.
mllib.fpm: FPM stands for Frequent Pattern Mining. This library is used to mine frequent items, subsequences, or other structures that are used for analyzing large datasets.
mllib.linalg: This is used for solving problems on linear algebra.
mllib.recommendation: This is used for collaborative filtering and in recommender systems.
spark.mllib: This supports model-based collaborative filtering where small latent factors are identified using the Alternating Least Squares (ALS) algorithm, which is used for predicting missing entries.
mllib.regression: This is used for solving problems using regression algorithms that find relationships and variable dependencies.

class MarshalSerializer(FramedSerializer):
    def dumps(self, obj):
        return marshal.dumps(obj)

    def loads(self, obj):
        return marshal.loads(obj)
from pyspark.sql.types import StringType, ArrayType
arrayCol = ArrayType(StringType(), False)
The filter()
method applies a filter condition to a DataFrame. It takes a predicate function that evaluates to true or false for each row: rows for which the predicate returns true are retained, while rows for which it returns false are filtered out. For example, df.filter(df['age'] > 30)
retains rows where the 'age' column is greater than 30. On RDDs, the filter()
transformation creates a new RDD containing only the elements that satisfy a given predicate function; for example, rdd.filter(lambda x: x % 2 == 0)
keeps only the even elements.

from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder \
.appName("FilterExample") \
.getOrCreate()
# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])
# Apply a filter to retain rows where age is greater than 30
filtered_df = df.filter(df['age'] > 30)
# Show the filtered DataFrame
filtered_df.show()
# Stop SparkSession
spark.stop()
In this example, the DataFrame df
is filtered to retain rows where the 'age' column is greater than 30 using the filter()
method. The resulting DataFrame filtered_df
contains only the rows that satisfy the filter condition.

Transformations such as filter()
or map()
create new RDDs lazily, depending on the business logic. Only when an action such as first()
or count()
is present does Spark launch the computation in parallel.

class pyspark.SparkConf(
    loadDefaults = True,
    _jvm = None,
    _jconf = None
)
loadDefaults: This is of type boolean and indicates whether values should be loaded from Java System Properties. It is True by default.
_jvm: This belongs to the class py4j.java_gateway.JVMView and is an internal parameter used for passing the handle to the JVM. It need not be set by users.
_jconf: This belongs to the class py4j.java_gateway.JavaObject. This parameter is optional and can be used for passing an existing SparkConf handle whose parameters should be used.

Data can be partitioned with partitionBy() by passing the columns you want to partition on as arguments to this method. Its signature is: partitionBy(self, *cols)
join(self, other, on=None, how=None)
| Join String | Equivalent SQL Join |
|---|---|
| inner | INNER JOIN |
| outer, full, fullouter, full_outer | FULL OUTER JOIN |
| left, leftouter, left_outer | LEFT JOIN |
| right, rightouter, right_outer | RIGHT JOIN |
| cross | |
| anti, leftanti, left_anti | |
| semi, leftsemi, left_semi | |
createDataFrame()
method of the SparkSession.

data = [('Harry', 20),
        ('Ron', 20),
        ('Hermoine', 20)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data=data, schema=columns)
+-----------+----------+
| Name | Age |
+-----------+----------+
| Harry | 20 |
| Ron | 20 |
| Hermoine | 20 |
+-----------+----------+
df.printSchema()
>> df.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: long (nullable = true)
df = spark.read.csv("/path/to/file.csv")
The pyspark.sql
library has the getOrCreate()
method, which creates a new SparkSession if none exists, or else returns the existing SparkSession object. The following code is an example of creating a SparkSession:

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName('InterviewBitSparkSession') \
    .getOrCreate()
master(): This is used for setting up the mode in which the application has to run - cluster mode (use the master name) or standalone mode. For standalone mode, we pass the local[x]
value to the function, where x represents the number of partitions to be created in RDD, DataFrame and DataSet. Ideally, x equals the number of CPU cores available.
appName(): Used for setting the application name.
getOrCreate(): Returns a SparkSession object. This creates a new object if it does not exist; if one already exists, it simply returns that.

To always create a fresh session, call newSession() on an existing SparkSession object:

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName('InterviewBitSparkSession').getOrCreate()
spark_session = spark.newSession()
The difference between get(filename) and getRootDirectory() is that get(filename) is used to obtain the correct path of a file added through SparkContext.addFile(), whereas getRootDirectory() is used to get the root directory that contains the files added through SparkContext.addFile().
The startswith() and endswith() methods belong to the Column class. These methods are used to search DataFrame rows by checking whether the column value starts or ends with a specific value, and both are used to filter data in an application.
startswith(): This method returns a boolean value. It indicates TRUE if the value in the column starts with the specified string, and FALSE if the value in that column does not match.
endswith(): This method returns a boolean value. It indicates TRUE if the column value ends with the specified string, and FALSE otherwise.
Both methods are case sensitive.

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
Temporary views are created using the createOrReplaceTempView()
function. Post creation, the table is accessible throughout the SparkSession by using the sql()
method. When the SparkSession gets terminated, the temporary table is dropped. Consider the DataFrame df
:

+-----------+----------+----------+
| Name | Age | Gender |
+-----------+----------+----------+
| Harry | 20 | M |
| Ron | 20 | M |
| Hermoine | 20 | F |
+-----------+----------+----------+
df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * from STUDENTS")
df_new.printSchema()
The schema will be displayed as shown below:

>> df.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: long (nullable = true)
|-- Gender: string (nullable = true)
groupByGender = spark.sql("SELECT Gender, count(*) as Gender_Count from STUDENTS group by Gender")
groupByGender.show()
+------+------------+
|Gender|Gender_Count|
+------+------------+
| F| 1 |
| M| 2 |
+------+------------+
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
sc = SparkContext()
ssc = StreamingContext(sc, 10)
sqlContext = SQLContext(sc)
socket_stream = ssc.socketTextStream("127.0.0.1", 5555)
lines = socket_stream.window(20)
For the resulting streaming DataFrame df, calling df.printSchema() prints:

root
 |-- value: string (nullable = true)
class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):

class SparkStageInfo(namedtuple("SparkStageInfo", "stageId currentAttemptId name numTasks numActiveTasks numCompletedTasks numFailedTasks")):
set(key, value): This attribute is used for setting a configuration property.
setSparkHome(value): This attribute enables setting the Spark installation path on worker nodes.
setAppName(value): This attribute is used for setting the application name.
setMaster(value): This attribute is used to set the master URL.
get(key, defaultValue=None): This attribute supports getting the configuration value of a key.

Broadcast variables are created using the SparkContext.broadcast() method and can be accessed using the value attribute.

# Broadcast a read-only lookup table to all nodes in the cluster
lookup_table = {'key1': 'value1', 'key2': 'value2'}
broadcast_var = sc.broadcast(lookup_table)
# Access the broadcast variable value in tasks
def map_function(element):
    value = broadcast_var.value.get(element, None)
    return value
Accumulators are created using the SparkContext.accumulator() method and are updated using the add() method within tasks.

# Create an accumulator to count occurrences of a certain event
accumulator_var = sc.accumulator(0)

# Increment the accumulator value within tasks
def map_function(element):
    if element % 2 == 0:  # placeholder condition for the tracked event
        accumulator_var.add(1)
    return element
Joins are performed with the join()
method that is present in PySpark SQL. The syntax of the method looks like: join(self, other, on=None, how=None)

The where()
and filter()
methods can be attached to the join expression to filter rows, and multiple joins can be performed by chaining join() calls.

emp_dept_df = empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner")
emp_dept_df.show(truncate=False)
PySpark provides the pipe()
function on RDDs, which allows us to assemble different parts of jobs that can use any language. Each RDD element is passed to the external process as a string, the strings can be transformed as needed, and the results are returned as string elements of the new RDD.