PySpark Interview Questions
PySpark is the Python API for Apache Spark, a powerful open-source distributed computing system designed for big data processing and analytics. Apache Spark provides a unified engine for large-scale data processing, with support for various programming languages such as Python, Java, Scala, and R.

PySpark allows developers to write Spark applications using the Python programming language. It provides a high-level API that simplifies the process of building parallel applications and performing distributed data processing tasks, such as data transformations, aggregations, machine learning, and streaming analytics.

PySpark can be installed from PyPI with the following command :
pip install pyspark
PySpark, the Python API for Apache Spark, has several key features that make it a popular choice for big data processing and analytics:

Ease of Use : PySpark provides an intuitive and easy-to-use API for Python developers, allowing them to leverage the power of Apache Spark without having to learn a new programming language or complex distributed computing concepts. Python developers can write Spark applications using familiar Python syntax, making development faster and more straightforward.

Integration with Python Ecosystem : PySpark seamlessly integrates with the rich ecosystem of Python libraries and tools, including popular data science libraries such as Pandas, NumPy, and scikit-learn. This integration enables users to perform complex data manipulation, analysis, and machine learning tasks using familiar Python libraries within Spark applications.

Distributed Data Processing : PySpark enables distributed processing of large datasets across a cluster of machines, leveraging the scalability and fault tolerance features of Apache Spark. It allows users to parallelize data processing tasks, making it possible to analyze massive datasets efficiently and quickly.

Resilient Distributed Datasets (RDDs) : PySpark exposes Spark's Resilient Distributed Datasets (RDDs), which are fault-tolerant, immutable collections of data distributed across a cluster. RDDs allow users to perform parallel operations on large datasets, with built-in fault tolerance and automatic recovery in case of node failures.

Lazy Evaluation : PySpark uses lazy evaluation, meaning that transformations on RDDs are not executed immediately but are instead queued up and executed only when an action is called. This optimization technique improves performance by allowing Spark to optimize the execution plan and minimize unnecessary computations.

Support for Various Data Formats : PySpark supports a wide range of data formats, including structured data (e.g., CSV, JSON, Parquet), semi-structured data (e.g., XML), and unstructured data (e.g., text files). It provides built-in APIs for reading, writing, and manipulating data in these formats, making it easy to work with diverse data sources.

Rich Set of Libraries : PySpark comes with a rich set of libraries for various data processing tasks, including Spark SQL for structured data processing, MLlib for machine learning, and Spark Streaming for real-time data processing. Spark's GraphX library for graph processing has no Python API, so Python users typically rely on the separate GraphFrames package instead. These libraries provide high-level APIs for common data processing tasks, enabling users to build sophisticated analytics applications with ease.
There are 4 characteristics of PySpark :

* Abstracted Nodes : This means that the individual worker nodes cannot be addressed.

* Spark API : PySpark provides APIs for utilizing Spark features.

* Map-Reduce Model : PySpark is based on Hadoop's Map-Reduce model, which means that the programmer provides the map and the reduce functions.

* Abstracted Network : Networks are abstracted in PySpark which means that the only possible communication is implicit communication.
Disadvantages of PySpark :

Performance Overhead : While PySpark offers ease of use and integration with Python, it may incur a performance overhead compared to writing Spark applications in Scala, the native language of Apache Spark. This overhead is primarily due to the dynamic nature of Python and the additional serialization/deserialization required when exchanging data between Python and Java Virtual Machine (JVM) processes.

Limited Type Safety : Python's dynamic typing means that type errors are not caught until runtime, unlike in statically typed languages like Scala. This lack of type safety can make it more challenging to debug and maintain PySpark applications, especially for larger codebases.

Limited Development Tools : While PySpark benefits from Python's extensive ecosystem of libraries and tools, it may lack some of the development tools and IDE support available for other languages like Scala or Java. However, this gap is gradually narrowing as the PySpark community continues to grow and develop more robust tools and integrations.

Dependency Management : Managing dependencies and ensuring compatibility between different Python libraries and Spark versions can be challenging, especially in larger projects with complex dependency graphs. Users may encounter issues with version conflicts or incompatible libraries, requiring careful management and testing of dependencies.
RDD :
* It is Spark's fundamental building block. DataFrames and Datasets are both built on top of RDDs.
* If the same set of data needs to be computed again, RDDs can be efficiently cached.
* It's useful when you need to do low-level transformations, operations, and control on a dataset.
* It's more commonly used to manipulate data with functional programming constructs than with domain-specific expressions.

DataFrame :
* It exposes the structure of the data, i.e., rows and columns. You can think of it as a database table.
* Optimized Execution Plan : The Catalyst optimizer is used to create query plans.
* One of the limitations of DataFrames is the lack of compile-time type safety, i.e., when the structure of the data is not known, mistakes in manipulating it are only caught at runtime.
* Also, if you're working in Python, start with DataFrames and then switch to RDDs if you need more flexibility.

Dataset (a typed extension of the DataFrame API) :
* It has an efficient encoder component and, unlike DataFrames, it provides compile-time type safety in a structured manner.
* If you want a greater level of type safety at compile-time, or if you want typed JVM objects, Dataset is the way to go. Note that the typed Dataset API is available only in Scala and Java, not in Python.
* Also, you can use Datasets when you want to take advantage of Catalyst optimization or benefit from Tungsten's fast code generation.
In PySpark, RDD is an acronym that stands for Resilient Distributed Dataset. It is the core data structure of PySpark: a low-level object that is highly efficient at performing distributed tasks.

PySpark's RDDs are collections of elements that can be operated on across multiple nodes to do parallel processing on a cluster. RDDs are immutable, which means that once you create an RDD, you cannot change it. RDDs are also fault-tolerant: in the case of a failure, lost data is recovered automatically. We can apply multiple operations on RDDs to achieve a certain task.
PySpark SparkContext is an initial entry point of the spark functionality. It also represents Spark Cluster Connection and can be used for creating the Spark RDDs (Resilient Distributed Datasets) and broadcasting the variables on the cluster.

The following diagram represents the architecture of PySpark's SparkContext:

[Figure: PySpark SparkContext architecture]

When we want to run a Spark application, a driver program that has the main function is started. From this point, the SparkContext that we defined gets initiated. Later on, the driver program runs operations inside the executors on the worker nodes. Additionally, a JVM is launched using Py4J, which in turn creates a JavaSparkContext. In the PySpark shell, a default SparkContext is already available as "sc", so a new SparkContext does not need to be created there.
PySpark is easy to learn and implement. It doesn't require expertise in many programming languages or databases. You can learn it easily if you already know a programming language and a framework. Before learning PySpark, you should have some knowledge of Apache Spark and Python, which will be very helpful for learning the advanced concepts of PySpark.
In PySpark, partitions are immutable primarily for reasons related to fault tolerance and consistency in distributed computing. Here's why partitions are kept immutable:

* Fault Tolerance : By making partitions immutable, PySpark ensures fault tolerance in distributed data processing. Since partitions are immutable, once created, their contents cannot be modified. This immutability simplifies fault recovery mechanisms because if a node fails during computation, PySpark can easily reconstruct the lost partitions from the original immutable data source or lineage, rather than trying to recover mutable data that may have been corrupted due to the failure.

* Consistency : Immutability ensures consistency in distributed data processing. When multiple tasks are executed in parallel across different partitions, each task operates on its partition of data independently without worrying about other tasks modifying the same data concurrently. This isolation prevents data inconsistency issues that could arise if partitions were mutable and multiple tasks attempted to modify the same partition simultaneously.

* Simplicity and Predictability : Immutability simplifies the programming model and makes distributed data processing more predictable. Developers can reason about the state of data in partitions more easily because they don't have to consider concurrent modifications by other tasks. This simplicity leads to more reliable and maintainable PySpark applications.

* Optimizations : Immutable partitions enable various optimizations in PySpark, such as pipelining transformations and caching intermediate results. Since partitions are immutable, PySpark can optimize transformations by chaining them together and executing them in a lazy manner, without actually materializing intermediate results until necessary. This optimization improves performance by reducing unnecessary data shuffling and materialization.
MLlib can be used to implement machine learning in Spark. MLlib is the scalable machine learning library that Spark provides. It is primarily used to make machine learning scalable and easy, with common learning algorithms and use cases such as clustering, regression, collaborative filtering, and dimensionality reduction.
There are some basic differences between PySpark and general-purpose programming languages used on their own. PySpark has built-in APIs for distributed data processing, whereas most other languages need such APIs to be integrated from third-party libraries. Communication between nodes is implicit in PySpark, whereas in most other languages it has to be programmed explicitly. PySpark follows the map-reduce model, so developers express their logic as map and reduce functions. PySpark also distributes work across multiple nodes out of the box, which most other languages do not do without additional frameworks.
PySpark's SparkFiles is used for loading files onto the Spark application. Files are added through SparkContext by calling the sc.addFile() method. The SparkFiles.get() method can then be used to resolve the path to a file that was added using sc.addFile().
Serialization is an important part of performance tuning on Spark. Any data that is sent over the network, written to disk, or persisted in memory has to be serialized. PySpark supports custom serializers for this purpose. It supports two types of serializers:

PickleSerializer : This serializes objects using Python's pickle module (class pyspark.PickleSerializer). This supports almost every Python object.
MarshalSerializer : This serializes objects using Python's marshal module (class pyspark.MarshalSerializer). This serializer is faster than the PickleSerializer but it supports only limited types.

Consider an example of serialization which makes use of MarshalSerializer:
# --serializing.py----
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer

# Initialize the Spark context with MarshalSerializer as the data serializer
sc = SparkContext("local", "Marshal Serialization", serializer=MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 3 * x).take(5))
sc.stop()

When we run the file using the command :
$SPARK_HOME/bin/spark-submit serializing.py

The output of the code would be the list of size 5 of numbers multiplied by 3:
[0, 3, 6, 9, 12]
Similar to Spark, PySpark provides a machine learning API which is known as MLlib that supports various ML algorithms like:

* mllib.classification : This supports different methods for binary or multiclass classification and regression analysis like Random Forest, Decision Tree, Naive Bayes etc.

* mllib.clustering : This is used for solving clustering problems, which aim to group subsets of entities with one another depending on similarity.

* mllib.fpm : FPM stands for Frequent Pattern Mining. This library is used to mine frequent items, subsequences or other structures that are used for analyzing large datasets.

* mllib.linalg : This is used for solving problems on linear algebra.

* mllib.recommendation : This is used for collaborative filtering and in recommender systems.

* spark.mllib : This is used for supporting model-based collaborative filtering where small latent factors are identified using the Alternating Least Squares (ALS) algorithm which is used for predicting missing entries.

* mllib.regression : This is used for solving problems using regression algorithms that find relationships and variable dependencies.
MarshalSerializer serializes objects using Python's marshal module. Although it supports fewer data types, it is faster than PickleSerializer.
import marshal
from pyspark.serializers import FramedSerializer

class MarshalSerializer(FramedSerializer):
    def dumps(self, obj):
        return marshal.dumps(obj)

    def loads(self, obj):
        return marshal.loads(obj)
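The trade-off between the two underlying modules can be sketched with the Python standard library alone, without Spark. This is only an illustration of why a marshal-based serializer supports fewer types; the sample values are arbitrary.

```python
import datetime
import marshal
import pickle

# Simple built-in values round-trip through both modules.
numbers = [0, 3, 6, 9, 12]
marshal_round_trip = marshal.loads(marshal.dumps(numbers))
pickle_round_trip = pickle.loads(pickle.dumps(numbers))

# A date object is fine for pickle...
sample_date = datetime.date(2024, 1, 1)
pickled_date = pickle.loads(pickle.dumps(sample_date))

# ...but marshal only handles a limited set of core types and rejects it.
try:
    marshal.dumps(sample_date)
    marshal_handles_date = True
except ValueError:
    marshal_handles_date = False

print(marshal_round_trip, pickled_date, marshal_handles_date)
```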
PySpark follows a master-worker architecture, just like Apache Spark. Here, the controlling node is the driver, and the others are worker nodes. The Spark driver builds a SparkContext during the execution of the application, which serves as the entry point. The worker nodes execute the operations. Cluster managers administer the resources that the worker nodes need for these operations.
SparkCore is a general execution engine that supports all the functions of the Spark platform. It includes Java, Scala and Python APIs that simplify development, in-memory processing capabilities to deliver a better speed, and a generalised execution paradigm to accommodate a variety of applications. The primary functions of SparkCore include all fundamental input and output (I/O) operations, storage scheduling and monitoring. Additionally, it is in charge of efficient memory management and fault recovery.
Real-time media streaming, financial analysis, e-commerce recommendations and telecommunication services are just a few of PySpark's industrial applications. For example, healthcare providers can use Spark to analyse the patient's prior medical records to determine the health difficulties patients may experience after discharge. They might also utilise Spark to undertake genome sequencing to speed up the processing of genome data. Travel companies may employ Spark to compare information and reviews from different websites about the location, hotels and other travel-related topics to help customers plan the ideal vacation and offer personalised recommendations to travellers.
In PySpark, SparkSession is the entry point to the application. Before version 2.0, SparkContext was used as the entry point. Since PySpark version 2.0, SparkSession acts as the starting point to access all of the PySpark functionality related to RDDs, DataFrames, Datasets, etc. It is also a unified API that replaces SQLContext and HiveContext and gives access to Structured Streaming, so separate context objects are rarely needed.

The SparkSession internally creates a SparkContext and SparkConf according to the details provided to the SparkSession. You can create a SparkSession by using the builder pattern.
PySpark ArrayType is a collection data type that extends PySpark's DataType class, which is the superclass of all types. A PySpark ArrayType column contains only elements of the same type. The ArrayType() constructor is used to create an instance of an ArrayType.

It accepts two arguments:

elementType : The type of the array elements. It should extend the DataType class in PySpark.
containsNull : It is an optional argument. It specifies whether the array can contain null values and is set to True by default.

Example :
from pyspark.sql.types import StringType, ArrayType
arrayCol = ArrayType(StringType(), False)
PySpark RDDs have the following advantages :

In-Memory Processing : PySpark’s RDD helps in loading data from the disk to the memory. The RDDs can even be persisted in the memory for reusing the computations.
 
Immutability : The RDDs are immutable which means that once created, they cannot be modified. While applying any transformation operations on the RDDs, a new RDD would be created.

Fault Tolerance : The RDDs are fault-tolerant. This means that whenever a partition is lost, it is automatically recomputed from the lineage of transformations that produced it. This results in seamless execution of the PySpark applications.

Lazy Evaluation : The PySpark transformation operations are not performed as soon as they are encountered. The operations are recorded in the DAG and are evaluated only when the first RDD action is found.

Partitioning : Whenever an RDD is created from any data, its elements are by default partitioned across the available cores.
UDF stands for User Defined Functions. In PySpark, UDF can be created by creating a python function and wrapping it with PySpark SQL’s udf() method and using it on the DataFrame or SQL. These are generally created when we do not have the functionalities supported in PySpark’s library and we have to use our own logic on the data. UDFs can be reused on any number of SQL expressions or DataFrames.
Is PySpark faster than pandas?
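PySpark supports parallel execution of statements in a distributed environment, i.e., on different cores and different machines, which pandas does not. This is why PySpark is generally faster than pandas on large datasets. For small datasets that fit comfortably in the memory of a single machine, pandas is often faster because it avoids the overhead of distributed execution.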
In PySpark, filters are used to select a subset of data from a DataFrame or RDD based on specified criteria. Filters allow you to conditionally include or exclude rows of data based on values in one or more columns. Filters are commonly used in data processing pipelines to perform data cleansing, data wrangling, and data analysis tasks.

Here's how filters work in PySpark :

Filtering DataFrames :
* When working with DataFrames in PySpark, you can use the filter() method to apply a filter condition to the DataFrame.
* The filter() method takes a condition, given either as a Column expression or as a SQL expression string, that evaluates to true or false for each row in the DataFrame. Rows for which the condition is true are retained, while rows for which it is false are filtered out.
* The condition typically involves comparisons or logical operations on column values. For example, you can filter rows where a specific column meets a certain condition, such as df.filter(df['age'] > 30) to retain rows where the 'age' column is greater than 30.

Filtering RDDs :
* When working with Resilient Distributed Datasets (RDDs) in PySpark, you can use the filter() transformation to create a new RDD containing only the elements that satisfy a given predicate function.
* Similar to DataFrames, the predicate function used with filter() evaluates to true or false for each element in the RDD. Elements for which the predicate function returns true are retained, while elements for which it returns false are filtered out.
* For example, you can filter an RDD of integers to retain only even numbers using rdd.filter(lambda x: x % 2 == 0).

Here's a simple example demonstrating how to use filters in PySpark with DataFrames:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("FilterExample") \
    .getOrCreate()

# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# Apply a filter to retain rows where age is greater than 30
filtered_df = df.filter(df['age'] > 30)

# Show the filtered DataFrame
filtered_df.show()

# Stop SparkSession
spark.stop()

In this example, the DataFrame df is filtered to retain rows where the 'age' column is greater than 30 using the filter() method. The resulting DataFrame filtered_df contains only the rows that satisfy the filter condition.
DAG stands for Directed Acyclic Graph. DAGScheduler constitutes the scheduling layer of Spark, which implements stage-oriented scheduling of tasks using jobs and stages. The logical execution plan (the lineage of dependencies between the transformations applied to RDDs) is transformed into a physical execution plan consisting of stages. It computes a DAG of stages needed for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule for running the jobs. These stages are then submitted to the TaskScheduler for execution.

DAGScheduler performs the following three things in Spark :

* Compute the DAG of execution for the job.
* Determine the preferred locations for running each task.
* Handle failures caused by shuffle output files being lost.

PySpark’s DAGScheduler follows event-queue architecture. Here a thread posts events of type DAGSchedulerEvent such as new stage or job. The DAGScheduler then reads the stages and sequentially executes them in topological order.
The most common workflow followed by the spark program is:

* The first step is to create input RDDs depending on the external data. Data can be obtained from different data sources.
* Post RDD creation, the RDD transformation operations like filter() or map() are run for creating new RDDs depending on the business logic.
* If any intermediate RDDs are required to be reused for later purposes, we can persist those RDDs.
* Lastly, if any action operations like first(), count() etc are present then spark launches it to initiate parallel computation.
PySpark SparkConf is used for setting the configurations and parameters required to run applications on a cluster or local system. The SparkConf class has the following signature:
class pyspark.SparkConf(
loadDefaults = True,
_jvm = None,
_jconf = None
)

where :

* loadDefaults : is of type boolean and indicates whether we require loading values from Java System Properties. It is True by default.
* _jvm : This belongs to the class py4j.java_gateway.JVMView and is an internal parameter that is used for passing the handle to JVM. This need not be set by the users.
* _jconf : This belongs to the class py4j.java_gateway.JavaObject. This parameter is optional and can be used for passing an existing SparkConf handle whose parameters should be used.
PySpark Partition is a method of splitting a large dataset into smaller datasets based on one or more partition keys. It improves execution speed because transformations on the individual partitions are executed in parallel. PySpark supports both partitioning in memory (DataFrame) and partitioning on disk (file system). When we make a DataFrame from a file or table, PySpark creates the DataFrame in memory with a specific number of partitions based on specified criteria.

It also allows us to partition on multiple columns using partitionBy() by passing the columns to partition on as arguments to this method.

Syntax :
partitionBy(self, *cols)

As a rule of thumb, it is often recommended to have about 4x as many partitions as the number of cores available to the application in the cluster.
In PySpark, joins are used to combine two DataFrames. By chaining joins, we can link multiple DataFrames together.

INNER Join, LEFT OUTER Join, RIGHT OUTER Join, LEFT ANTI Join, LEFT SEMI Join, CROSS Join, and SELF Join are among the SQL join types PySpark supports. Following is the syntax of PySpark Join.

Syntax :
join(self, other, on=None, how=None)

Parameter Explanation :

The join() method accepts the following parameters and returns a DataFrame:

* "other" : It specifies the join's right side.
* "on" : It specifies the join column's name. It can also be a list of column names or a join expression.
* "how" : It specifies the join type. Options are inner, cross, outer, full, full outer, left, left outer, right, right outer, left semi, and left anti. The default is inner.

Types of Join in PySpark DataFrame
Join String | Equivalent SQL Join
inner | INNER JOIN
outer, full, fullouter, full_outer | FULL OUTER JOIN
left, leftouter, left_outer | LEFT JOIN
right, rightouter, right_outer | RIGHT JOIN
cross | CROSS JOIN
anti, leftanti, left_anti | LEFT ANTI JOIN
semi, leftsemi, left_semi | LEFT SEMI JOIN
In PySpark, Parquet is a columnar file format supported by several data processing systems. Spark SQL can both read and write Parquet files.

Parquet's columnar storage provides the following advantages:

* It is compact and consumes less space.
* It allows us to fetch only the specific columns we need.
* It uses type-specific encoding.
* It stores summary statistics for the data, which helps to skip irrelevant data when reading.
* It requires fewer I/O operations.
We can create a PySpark DataFrame by making use of the createDataFrame() method of the SparkSession.
data = [('Harry', 20),
        ('Ron', 20),
        ('Hermoine', 20)]
columns = ["Name","Age"]
df = spark.createDataFrame(data=data, schema = columns)

This creates the dataframe as shown below:
+-----------+----------+
| Name      | Age      |
+-----------+----------+
| Harry     | 20       |
| Ron       | 20       |
| Hermoine  | 20       |
+-----------+----------+

We can get the schema of the dataframe by using df.printSchema()
>> df.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: long (nullable = true)
Yes, it is! Real-world applications read data from external storage systems such as the local file system, HDFS, HBase, MySQL tables, Amazon S3, Azure storage, etc. The following example shows how we can create a DataFrame by reading data from a CSV file present in the local system:
df = spark.read.csv("/path/to/file.csv")

PySpark supports csv, text, avro, parquet, tsv and many other file extensions.
In PySpark, a cluster manager is a platform that enables Spark to run in cluster mode by providing worker nodes with all the resources they require.

A Spark cluster manager ecosystem contains a master node and multiple worker nodes. The master nodes provide the worker nodes with the resources like memory, processor allocation, etc., according to the nodes' requirements with the help of the cluster manager.

PySpark supports the following cluster manager types :

* Standalone : This is a simple cluster manager that comes with Spark.
* Apache Mesos : This cluster manager can also run Hadoop MapReduce and PySpark apps. Mesos support has been deprecated since Spark 3.2.
* Hadoop YARN : This is the resource manager introduced in Hadoop 2.
* Kubernetes : This is an open-source system that automates the deployment, scaling, and management of containerized apps.
* local : This is not a cluster manager as such, but a mode for running Spark applications on a single machine such as a laptop or desktop.
To create SparkSession, we use the builder pattern. The SparkSession class from the pyspark.sql library has the getOrCreate() method which creates a new SparkSession if there is none or else it returns the existing SparkSession object. The following code is an example for creating SparkSession:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('InterviewBitSparkSession') \
                    .getOrCreate()

Here,

* master() : This is used for setting up the mode in which the application has to run: on a cluster (pass the master URL of the cluster) or locally. For local mode, we pass the local[x] value to the function, where x is the number of worker threads to use, which also determines the default number of partitions created for RDDs, DataFrames and Datasets. The value of x is ideally the number of CPU cores available.

* appName() : Used for setting the application name

* getOrCreate() : For returning SparkSession object. This creates a new object if it does not exist. If an object is there, it simply returns that.


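If we want to create a new SparkSession object every time, we can call the newSession() method on an existing SparkSession as shown below. The new session shares the underlying SparkContext but has its own SQL configuration and temporary views:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_session = spark.newSession()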
The main difference between SparkFiles.get(filename) and SparkFiles.getRootDirectory() is that get(filename) is used to obtain the absolute path of a file that was added through SparkContext.addFile(). On the other hand, getRootDirectory() is used to get the root directory that contains the files added through SparkContext.addFile().
In PySpark, the startswith() and endswith() methods belong to the Column class (note the all-lowercase names in the Python API). These methods are used to search DataFrame rows by checking if the column value starts or ends with a specific value. Both are used to filter data in our application.

* startswith() method : This method returns a boolean column. It is TRUE if the value in the column starts with the specified string, and FALSE otherwise.

* endswith() method : This method returns a boolean column. It is TRUE if the column value ends with the specified string, and FALSE otherwise. Both methods are case sensitive.
PySpark StorageLevel is used to control RDD storage. It can control how and where the RDD is stored. The PySpark StorageLevel decides whether the RDD is stored in memory, on disk, or both. It also determines if we need to replicate the RDD partitions or serialize the RDD. The signature of the PySpark StorageLevel class looks like:

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
PySpark supports custom profilers. Profilers collect statistics about how the Python code of a job runs, which helps in finding performance bottlenecks. When we require a custom profiler, it has to define some of the following methods:

* stats : This is used to return the collected stats of profiling.
* profile : This is used to produce a system profile of some sort.
* dump(id, path) : This is used to dump the profile of a specific RDD id to the path given.
* add : This is used for adding a profile to the existing accumulated profile.

The profiler class has to be selected at the time of SparkContext creation.
PySpark SQL is the most popular PySpark module that is used to process structured columnar data. Once a DataFrame is created, we can interact with data using the SQL syntax. Spark SQL is used for bringing native raw SQL queries on Spark by using select, where, group by, join, union etc. For using PySpark SQL, the first step is to create a temporary table on DataFrame by using createOrReplaceTempView() function. Post creation, the table is accessible throughout SparkSession by using sql() method. When the SparkSession gets terminated, the temporary table will be dropped.

For example, consider we have the following DataFrame assigned to a variable df :
+-----------+----------+----------+
| Name      | Age      | Gender   |
+-----------+----------+----------+
| Harry     | 20       |    M     |
| Ron       | 20       |    M     |
| Hermione  | 20       |    F     |
+-----------+----------+----------+

In the code below, we register the DataFrame as a temporary table, which becomes accessible within the SparkSession through the sql() method. SQL queries can then be run through that method.
df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * from STUDENTS")
df_new.printSchema()
The schema will be displayed as shown below:
>>> df_new.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)
|-- Gender: string (nullable = true)

For the above example, let’s try running group by on the Gender column:
groupByGender = spark.sql("SELECT Gender, count(*) as Gender_Count from STUDENTS group by Gender")
groupByGender.show()

The above statements result in:
+------+------------+
|Gender|Gender_Count|
+------+------------+
|     F|           1|
|     M|           2|
+------+------------+
PySpark Streaming is a scalable, fault-tolerant, high-throughput stream processing system that supports both streaming and batch workloads. It ingests real-time data from sources such as TCP sockets, S3, Kafka, Twitter, and file system folders. The processed data can be sent to live dashboards, Kafka, databases, HDFS, etc.

To stream from a TCP socket, we can use the readStream.format("socket") method of the Spark session object and provide the streaming source host and port as options, as shown in the code below:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SocketStream").getOrCreate()

# Read lines from the TCP socket as an unbounded streaming DataFrame
df = (spark.readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 5555)
      .load())
df.printSchema()

Spark loads the data from the socket and represents it in the value column of the DataFrame object. df.printSchema() prints:
root
|-- value: string (nullable = true)

After processing, the DataFrame can be streamed to the console or to other destinations such as Kafka, dashboards, or databases, depending on the requirements.
PySpark SparkJobInfo is used to get information about the Spark jobs that are in execution.

Following is the class definition of SparkJobInfo:

class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):
Hive uses HQL (Hive Query Language), while Spark SQL uses Structured Query Language to process and query data. We can easily connect both SQL tables and Hive tables to Spark SQL. Spark SQL is a dedicated component on top of the Spark Core engine that supports SQL and Hive Query Language without changing their syntax.
43 .
How does PySpark's Catalyst Optimizer improve query performance?
PySpark's Catalyst Optimizer improves query performance by providing an advanced query optimization framework. It transforms SQL and DataFrame queries into an optimized logical plan and then into a physical execution plan, so that large-scale data is processed efficiently.
44 .
Are PySpark and Spark the same?
No. PySpark was released to let Python work with Apache Spark. Essentially, it is the Python API for Spark rather than a separate engine. PySpark lets you work with Resilient Distributed Datasets (RDDs) in Apache Spark from the Python programming language.
PySpark SparkStageInfo is used to get information about the Spark stages that are present at that time. The class definition of SparkStageInfo is as follows:

class SparkStageInfo(namedtuple("SparkStageInfo", "stageId currentAttemptId name numTasks numActiveTasks " "numCompletedTasks numFailedTasks")):
46 .
What do you understand by RDD Lineage?
RDD lineage is the mechanism used to reconstruct lost data partitions. Spark does not replicate data in memory. If any data is lost, it is rebuilt using the RDD lineage, because an RDD always remembers how it was constructed from other datasets.
The lineage graph is a collection of RDD dependencies. Every Spark application has its own lineage graphs. The lineage graph recomputes RDDs on demand and restores lost data from persisted RDDs. An RDD lineage graph lets us build a new RDD or restore data from a lost persisted RDD. It is created by recording the transformations applied to the RDD, which together form a logical execution plan.
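The recovery idea can be sketched in plain Python, without Spark. The ToyRDD class below is hypothetical and only models the concept: each dataset remembers its parent and the transformation that produced it, so a lost partition can be rebuilt by replaying that chain.

```python
# Plain-Python model of lineage-based recovery.
# ToyRDD is a hypothetical class for illustration, not part of Spark.

class ToyRDD:
    def __init__(self, partitions=None, parent=None, func=None):
        self.parent = parent        # dependency in the lineage graph
        self.func = func            # transformation applied to a parent partition
        self._source = partitions   # only set for the root dataset
        self._cache = {}            # partition index -> computed data

    def map(self, f):
        # Record the transformation instead of running it (lazy, like an RDD).
        return ToyRDD(parent=self, func=lambda part: [f(x) for x in part])

    def filter(self, pred):
        return ToyRDD(parent=self, func=lambda part: [x for x in part if pred(x)])

    def compute(self, index):
        # Return the partition, rebuilding it from the parent if it is not cached.
        if index not in self._cache:
            if self.parent is None:
                self._cache[index] = list(self._source[index])
            else:
                self._cache[index] = self.func(self.parent.compute(index))
        return self._cache[index]

    def lose_partition(self, index):
        # Simulate losing a computed partition, e.g. after an executor failure.
        self._cache.pop(index, None)

    def lineage(self):
        # Walk the dependency chain back to the root dataset.
        chain, node = [], self
        while node is not None:
            chain.append("source" if node.parent is None else "transformation")
            node = node.parent
        return chain


base = ToyRDD(partitions=[[1, 2, 3], [4, 5, 6]])
evens_squared = base.map(lambda x: x * x).filter(lambda x: x % 2 == 0)

before = evens_squared.compute(1)   # computed for the first time
evens_squared.lose_partition(1)     # the computed result is gone
after = evens_squared.compute(1)    # rebuilt by replaying the lineage
```

In real PySpark, the recorded lineage of an RDD can be inspected with rdd.toDebugString().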
Following is the list of the main methods of SparkConf :

* set(key, value) : This method is used for setting a configuration property.
* setSparkHome(value) : This method sets the Spark installation path on worker nodes.
* setAppName(value) : This method is used for setting the application name.
* setMaster(value) : This method is used to set the master URL.
* get(key, defaultValue=None) : This method returns the configuration value of a key.
In PySpark, shared variables are special types of variables that can be shared across tasks in a distributed computation. These shared variables are designed to efficiently distribute data or state information to all the tasks running on different nodes in a Spark cluster. Shared variables are primarily used in scenarios where each task needs access to a common data structure or a shared state.

There are two main types of shared variables in PySpark :

Broadcast Variables :
* Broadcast variables are read-only variables that are distributed to all the nodes in a Spark cluster and cached in memory to be reused across multiple tasks.
* Broadcast variables are commonly used to efficiently distribute large read-only datasets or lookup tables to all the tasks in a Spark job, avoiding the need to transfer the data over the network multiple times.
* Broadcast variables are created using the SparkContext.broadcast() method and can be accessed using the value attribute.

Example :
# Broadcast a read-only lookup table to all nodes in the cluster
lookup_table = {'key1': 'value1', 'key2': 'value2'}
broadcast_var = sc.broadcast(lookup_table)

# Access the broadcast variable value in tasks
def map_function(element):
    value = broadcast_var.value.get(element, None)
    return value
Accumulators :
* Accumulators are variables that are only "added" to through an associative and commutative operation and are used primarily for aggregating values across tasks in a Spark job.
* Accumulators are typically used for collecting statistics, counting occurrences, or performing other types of aggregations across the data processed by different tasks.
* Accumulators are created using the SparkContext.accumulator() method and are updated using the add() method within tasks.
* Tasks can only add to an accumulator and cannot read its value. The values accumulated by different tasks are aggregated and are accessible only on the driver node.

Example :
# Create an accumulator to count occurrences of a certain event
accumulator_var = sc.accumulator(0)

# Increment the accumulator value within tasks
def map_function(element):
    if element is None:  # example condition: count missing elements
        accumulator_var.add(1)
    return element
The Catalyst Optimizer plays a very important role in Apache Spark. It improves structured queries, whether written in SQL or expressed through the DataFrame or Dataset APIs, by reducing the program execution time and cost. The Spark Catalyst Optimizer supports both rule-based and cost-based optimization. Rule-based optimization applies a set of rules that determine how a query is executed. Cost-based optimization uses rules to create multiple plans, calculates their costs, and selects the cheapest one. The Catalyst Optimizer also handles various big data challenges such as semi-structured data and advanced analytics.
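The rule-based part can be illustrated with a toy sketch in plain Python. This is not Catalyst's implementation: the tuple-based expression tree and the two rules below are assumptions made for illustration. It only shows the general idea of rewriting a plan with rules until it stops changing.

```python
# Toy rule-based rewrite on a tiny expression tree (not Catalyst's real classes).
# Expressions are tuples: ("lit", value), ("col", name), ("add", left, right).

def fold_constants(expr):
    """Rule: replace add(lit, lit) with a single literal, bottom-up."""
    if expr[0] != "add":
        return expr
    left, right = fold_constants(expr[1]), fold_constants(expr[2])
    if left[0] == "lit" and right[0] == "lit":
        return ("lit", left[1] + right[1])
    return ("add", left, right)


def drop_add_zero(expr):
    """Rule: simplify x + 0 and 0 + x to x."""
    if expr[0] != "add":
        return expr
    left, right = drop_add_zero(expr[1]), drop_add_zero(expr[2])
    if right == ("lit", 0):
        return left
    if left == ("lit", 0):
        return right
    return ("add", left, right)


def optimize(expr, rules):
    """Apply the rules repeatedly until the plan stops changing."""
    while True:
        new_expr = expr
        for rule in rules:
            new_expr = rule(new_expr)
        if new_expr == expr:
            return expr
        expr = new_expr


# Represents: (age + (1 + 2)) + 0
plan = ("add",
        ("add", ("col", "age"), ("add", ("lit", 1), ("lit", 2))),
        ("lit", 0))
optimized = optimize(plan, [fold_constants, drop_add_zero])
```

Here the plan is reduced to age + 3. In PySpark, the plans that Catalyst actually produces for a DataFrame can be printed with df.explain(True).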
We can use the join() method that is present in PySpark SQL. The syntax of the method looks like:
join(self, other, on = None, how = None)

Where,

* other : It is the right side of the join.
* on : It is the join condition. It can be a column name string, a list of column names, or a join expression on columns.
* how : It is the type of join. The default type is inner. The values of the type can be inner, left, right, cross, full, outer, left_outer, right_outer, left_anti and left_semi.

The where() and filter() methods can be attached to the join expression to filter rows. We can also perform multiple joins by chaining join() calls.

For example, consider two DataFrames named Employee and Department. Employee has the columns emp_id, emp_name and empdept_id, and Department has the columns dept_id and dept_name. We can inner join the Employee DataFrame with the Department DataFrame to get the department information along with the employee information. Since show() returns None, the join result is assigned first and displayed afterwards. The code will look like:
emp_dept_df = empDF.join(deptDF, empDF.empdept_id == deptDF.dept_id, "inner")
emp_dept_df.show(truncate = False)
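To make the difference between some join types concrete, here is a plain-Python model of inner, left_semi and left_anti on the Employee and Department example. The sample rows are made up for illustration, the code does not use PySpark, and it assumes dept_id is unique.

```python
# Plain-Python model of three join types; the rows are made-up sample data.
employees = [
    {"emp_id": 1, "emp_name": "Asha", "empdept_id": 10},
    {"emp_id": 2, "emp_name": "Ben", "empdept_id": 20},
    {"emp_id": 3, "emp_name": "Chen", "empdept_id": 99},  # no matching department
]
departments = [
    {"dept_id": 10, "dept_name": "Finance"},
    {"dept_id": 20, "dept_name": "Marketing"},
]

# Index the right side by its join key (assumes dept_id is unique).
dept_by_id = {d["dept_id"]: d for d in departments}

# inner: keep matching pairs, with columns from both sides
inner = [{**e, **dept_by_id[e["empdept_id"]]}
         for e in employees if e["empdept_id"] in dept_by_id]

# left_semi: keep left rows that have a match, left columns only
left_semi = [e for e in employees if e["empdept_id"] in dept_by_id]

# left_anti: keep left rows that have no match, left columns only
left_anti = [e for e in employees if e["empdept_id"] not in dept_by_id]
```

The inner join drops the employee without a department, left_semi returns the same employees but without the department columns, and left_anti returns only the unmatched employee.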
A streaming application must be available 24/7 and tolerant of failures outside the application code (e.g., system crashes, JVM crashes, etc.). Checkpointing makes streaming applications more fault tolerant. We can store data and metadata in the checkpoint directory.

Checkpointing can be of two types: metadata checkpointing and data checkpointing.

Metadata checkpointing stores the information that defines the streaming computation in a fault-tolerant storage system such as HDFS. This helps to recover from a failure of the node running the driver of the streaming application.

Data checkpointing means saving the generated RDDs to reliable storage. This type of checkpointing is required for stateful transformations that combine data from different batches.
Property Operators : These operators create a new graph in which a user-defined map function modifies the properties of the vertices or edges. For these operators, the graph structure is unchanged. This is an important property of these operators because it allows the resulting graph to reuse the structural indices of the original graph.

Structural Operators : GraphX currently supports only a few widely used structural operators. The reverse operator creates a new graph with the directions of all edges reversed. The subgraph operator returns a graph containing only the vertices and edges that satisfy the given vertex and edge predicates. The mask operator creates a subgraph by returning a graph that contains only the vertices and edges that are also found in the input graph. The groupEdges operator merges parallel edges.

Join operators : Join operators allow you to join data from external collections (RDDs) with existing graphs. For example, we may want to merge extra user properties into an existing graph or pull vertex properties from one graph into another.
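GraphX has no Python API, so the following is only a plain-Python model of one property operator and two structural operators. The function names map_vertices, reverse and subgraph, and the sample graph, are assumptions chosen to mirror the operator names above.

```python
# Plain-Python model of GraphX-style operators; not the GraphX API itself.
vertices = {1: "alice", 2: "bob", 3: "carol"}   # vertex id -> attribute
edges = [(1, 2, "follows"), (2, 3, "follows"), (3, 1, "blocks")]  # (src, dst, attr)


def map_vertices(vertices, edges, f):
    # Property operator: new vertex attributes, structure unchanged.
    return {vid: f(vid, attr) for vid, attr in vertices.items()}, list(edges)


def reverse(vertices, edges):
    # Structural operator: flip the direction of every edge.
    return dict(vertices), [(dst, src, attr) for src, dst, attr in edges]


def subgraph(vertices, edges,
             vpred=lambda vid, attr: True,
             epred=lambda src, dst, attr: True):
    # Structural operator: keep vertices passing vpred, and edges passing epred
    # whose two endpoints both survive.
    kept = {vid: attr for vid, attr in vertices.items() if vpred(vid, attr)}
    kept_edges = [(s, d, a) for s, d, a in edges
                  if s in kept and d in kept and epred(s, d, a)]
    return kept, kept_edges


upper_v, upper_e = map_vertices(vertices, edges, lambda vid, attr: attr.upper())
rev_v, rev_e = reverse(vertices, edges)
sub_v, sub_e = subgraph(vertices, edges, vpred=lambda vid, attr: attr != "carol")
```

Note that map_vertices returns the edge list untouched, which is the point of property operators: only attributes change, never the structure.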
Following the UNIX standard streams model, Apache Spark supports the pipe() function on RDDs, which allows us to assemble different parts of a job that can be written in any language.

The pipe() function creates an RDD transformation that passes each RDD element to an external process as a string. The process can transform the elements as needed, and its output is returned as an RDD of strings.
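What happens to the elements of one partition can be sketched in plain Python with the subprocess module. The helper pipe_partition and the stand-in command below are hypothetical and do not use Spark. They only model the contract: every element is written as one line to the standard input of the external process, and every line of its standard output becomes an output element.

```python
import subprocess
import sys


def pipe_partition(elements, command):
    """Send each element to `command` as one line on stdin; return its stdout lines."""
    payload = "".join(f"{element}\n" for element in elements)
    completed = subprocess.run(
        command,
        input=payload,
        capture_output=True,
        text=True,
        check=True,   # raise if the external program exits with an error
    )
    return completed.stdout.splitlines()


# A stand-in external program that upper-cases every line it reads.
# Using the current Python interpreter keeps the sketch portable.
upper_command = [
    sys.executable,
    "-c",
    "import sys\nfor line in sys.stdin:\n    print(line.strip().upper())",
]

partition = ["spark", "pipe", 42]
piped = pipe_partition(partition, upper_command)
```

The integer 42 comes back as the string "42", which reflects that elements cross the process boundary as text. In PySpark, the equivalent call on an RDD takes the shell command as a string, for example rdd.pipe("cat").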