Apache Spark Reference
2023-05-03
[[TOC]]
Inspired by:
- Act as a Tech Writer
- https://api-docs.databricks.com/scala/spark/latest/org/apache/spark/sql/Dataset.html
Prompt:
Act as a tech writer and Apache Spark expert. You will act as a creative and engaging technical writer and create guides on how to use a specific function in Scala Spark. I will provide you with the function name, and you will do your best to provide a short description, a usage example, and the output. The usage example should clearly illustrate what the function does and be easy for the reader to understand. The description should be clear, simple, and concise. Scala coding requirements:
- Prefer writing explicit variable data types;
- Use simpler Scala syntax that a Scala beginner can follow. Keep it clear for someone who knows Java, C#, or JavaScript, but maintain good Scala programming style;
- Prefer language constructs over expressions embedded in string literals;
- Avoid tuple accessors such as "._1" and "._2". Don't write "employee => employee._2 >= 30"; introduce a case class instead;
- Format code nicely with indentation. Format array initialization, object initialization, and the output to illustrate the hierarchical structure of the data.
- Start each reply with a level-4 markdown header. Inside the header, include the function name, parameters, and return type in markdown back-ticks:
except(other: Dataset[T]): Dataset[T]
- Maintain a line length of 120 characters. Longer lines are acceptable, but prefer splitting a statement across multiple lines with nice indentation. Please explain: Spark 3 Dataset:
- except(other: Dataset[T]): Dataset[T]
Actions
collect()
Description:
The collect() function in Spark 3 Dataset retrieves all the elements of a dataset and returns them to the driver program as an array. The returned Array[T] can then be processed or iterated over with ordinary Scala code.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to retrieve the information of all employees whose age is greater than or equal to 30. We can use the collect() function as shown below:
import spark.implicits._ // assumes an active SparkSession named spark; needed for toDS()/toDF() in these examples
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val filteredEmployees = employeeDataset.filter(_.age >= 30).collect()
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. We apply the filter() function to select only the employees whose age is greater than or equal to 30. Finally, we use the collect() function to retrieve all the filtered employees.
Output:
The output of the collect() function is an array of the dataset's elements. In our example, only Peter satisfies the age filter, so the filteredEmployees variable will contain an array with a single Employee instance:
Array(
  Employee("Peter", 32, 3500.0)
)
Note that using the collect() function to retrieve all the elements of a large dataset can be computationally expensive and memory-intensive, and may lead to performance issues or even out-of-memory errors. Therefore, it is recommended to use this function with caution and only for small datasets or for testing purposes.
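If only a small sample is needed, take(n) or toLocalIterator() avoid pulling the whole dataset into driver memory at once. A minimal sketch, reusing the employeeDataset from the example above:
// Retrieve at most two rows instead of collecting the whole Dataset
val sampleEmployees: Array[Employee] = employeeDataset.take(2)
sampleEmployees.foreach(employee => println(employee.name))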
collectAsList()
Description:
The collectAsList() function in Spark 3 Dataset is similar to the collect() function but it returns the result as a Java List instead of an array. It collects all the elements of a dataset to the driver program and returns them as a Java List that can be used for further processing.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to retrieve the information of all employees whose age is greater than or equal to 30. We can use the collectAsList() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val filteredEmployees = employeeDataset.filter(_.age >= 30).collectAsList()
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. We apply the filter() function to select only the employees whose age is greater than or equal to 30. Finally, we use the collectAsList() function to retrieve all the filtered employees as a Java List.
Output:
The output of the collectAsList() function is a java.util.List of the dataset's elements. In our example, only Peter satisfies the age filter, so the filteredEmployees variable will contain a Java List with a single Employee instance:
List(
  Employee("Peter", 32, 3500.0)
)
Note that the collectAsList() function has the same performance and memory implications as the collect() function, and should be used with caution for large datasets.
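Because collectAsList() returns a java.util.List, it is often convenient to convert the result to a Scala collection on the driver. A minimal sketch, reusing the filteredEmployees list from the example above (on Scala 2.13, scala.jdk.CollectionConverters can be used instead):
import scala.collection.JavaConverters._
// Convert the java.util.List to a Scala Seq for idiomatic processing
val filteredSeq: Seq[Employee] = filteredEmployees.asScala.toSeq
val totalSalary: Double = filteredSeq.map(employee => employee.salary).sum
println(s"Total salary of the filtered employees: $totalSalary")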
count()
Description:
The count() function in Spark 3 Dataset is used to count the number of elements in a dataset. It returns the total number of rows or records present in the dataset.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to count the number of employees in the dataset. We can use the count() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val employeeCount = employeeDataset.count()
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the count() function to count the total number of employees in the dataset.
Output:
The output of the count() function is a Long value representing the total number of elements in the dataset. In our example, the employeeCount variable will contain the total number of employees in the dataset, which is 4:
4
Note that the count() function is an action operation that triggers the computation of the dataset, and should be used with caution for large datasets.
describe()
Description:
The describe() function in Spark 3 Dataset computes basic statistics for the specified columns: count, mean, standard deviation, minimum, and maximum. It works on numeric and string columns (for string columns only count, min, and max are meaningful); if no columns are given, all numeric and string columns are summarized.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to compute the summary statistics for the age and salary columns in the dataset. We can use the describe() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val summaryStats = employeeDataset.describe("age", "salary")
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the describe() function to compute the summary statistics for the age and salary columns in the dataset.
Output:
The output of the describe() function is a new DataFrame containing the summary statistics for each specified column. In our example, calling summaryStats.show() displays output similar to the following (statistics rounded for readability):
+-------+------+-------+
|summary|   age| salary|
+-------+------+-------+
|  count|     4|      4|
|   mean|  28.0| 3125.0|
| stddev|2.9439|298.608|
|    min|    25| 2800.0|
|    max|    32| 3500.0|
+-------+------+-------+
Note that the describe() function is a transformation that returns a new DataFrame; the statistics are only computed when an action such as show() is called. Computing them still requires a full pass over the data, so it is not free on very large datasets.
first()
Description:
The first() function in Spark 3 Dataset returns the first element of the dataset. The result has the element type T of the Dataset (an instance of our case class in these examples). It is an alias for head(), and it throws an exception if the dataset is empty.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to retrieve the details of the first employee in the dataset. We can use the first() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val firstEmployee = employeeDataset.first()
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the first() function to retrieve the details of the first employee in the dataset.
Output:
The output of the first() function is the first element of the dataset. In our example, the firstEmployee variable will contain the Employee instance representing the first employee in the dataset:
Employee("John", 28, 3000.0)
Note that the first() function is an action that triggers computation, but it only needs to fetch a single row, so it is inexpensive even on large datasets. It throws an exception when the dataset is empty.
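Because first() fails on an empty dataset, a defensive pattern is to use take(1) and inspect the result. A minimal sketch, reusing employeeDataset from the example above:
// take(1) returns an empty Array instead of throwing when the Dataset is empty
val maybeFirst: Option[Employee] = employeeDataset.take(1).headOption
maybeFirst match {
  case Some(employee) => println(s"First employee: ${employee.name}")
  case None           => println("The dataset is empty")
}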
foreach()
Description:
The foreach() function in Spark 3 Dataset is used to apply a given function to each element of the dataset. It iterates over the elements of the dataset and applies a user-defined function to each element.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to print the details of each employee in the dataset. We can use the foreach() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
employeeDataset.foreach(employee => println(s"Name: ${employee.name}, Age: ${employee.age}, Salary: ${employee.salary}"))
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the foreach() function to print the details of each employee in the dataset.
Output:
The foreach() function itself returns Unit; its effect is whatever the supplied function does. In our example, the details of each employee are printed to the console:
Name: John, Age: 28, Salary: 3000.0
Name: Mary, Age: 25, Salary: 2800.0
Name: Peter, Age: 32, Salary: 3500.0
Name: Jane, Age: 27, Salary: 3200.0
Note that the foreach() function is an action operation that triggers the computation of the dataset. The supplied function runs on the executors, so in cluster mode the println output appears in the executor logs rather than on the driver console, and the order of the printed lines is not guaranteed because Spark processes elements in parallel.
foreachPartition()
Description:
The foreachPartition() function in Spark 3 Dataset is used to apply a given function to each partition of the dataset. It is similar to the foreach() function, but instead of applying the function to each element, it applies the function to each partition of the dataset. It can be useful when we need to perform some expensive initialization or setup once per partition, rather than once per element.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to compute the total salary of each partition of the dataset. We can use the foreachPartition() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
// the explicit Iterator type avoids ambiguity between the Scala and Java overloads of foreachPartition
employeeDataset.foreachPartition((employees: Iterator[Employee]) => {
  var partitionTotalSalary: Double = 0.0
  employees.foreach(employee => partitionTotalSalary += employee.salary)
  println(s"Partition Total Salary: $partitionTotalSalary")
})
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the foreachPartition() function to compute the total salary of each partition of the dataset.
Output:
The foreachPartition() function itself returns Unit; the printed totals are a side effect. Assuming the dataset happens to be split into two partitions, the console output might look like this (partitions holding no rows would print 0.0):
Partition Total Salary: 5800.0
Partition Total Salary: 6700.0
Note that the foreachPartition() function is an action operation that triggers the computation of the dataset, and should be used with caution for large datasets. Also, the order of the partitions in the output is not guaranteed, as Spark processes partitions in parallel.
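A common use of foreachPartition() is to create an expensive resource, such as a database connection, once per partition instead of once per row. The sketch below is illustrative only: openConnection and saveEmployee are hypothetical helpers standing in for your own data-sink API.
employeeDataset.foreachPartition((employees: Iterator[Employee]) => {
  val connection = openConnection() // hypothetical: open one connection per partition
  try {
    employees.foreach(employee => saveEmployee(connection, employee)) // hypothetical per-row write
  } finally {
    connection.close() // always release the resource when the partition is done
  }
})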
head()
Description:
The head(n: Int) function in Spark 3 Dataset returns the first n rows of the dataset as an Array, and behaves like take(n). The no-argument head() returns only the first row and is equivalent to first().
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to get the first 2 rows of the dataset. We can use the head() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val firstTwoEmployees = employeeDataset.head(2)
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the head() function to get the first 2 rows of the dataset as an Array.
Output:
The output of the head() function is an Array containing the first n rows of the original dataset. In our example, the firstTwoEmployees variable will contain the details of the first 2 employees:
Array(
  Employee("John", 28, 3000.0),
  Employee("Mary", 25, 2800.0)
)
Note that the head() function is an action, but it only fetches the requested number of rows, so it is relatively cheap even on large datasets. The rows returned follow the dataset's current ordering, which is only deterministic if the data has been explicitly sorted.
reduce()
Description:
The reduce() function in Spark 3 Dataset is used to aggregate the elements of a dataset using a specified binary operator. The binary operator must be associative and commutative, and it is applied to pairs of elements until a single result is obtained.
Usage Example:
Suppose we have a dataset containing a sequence of integers, and we want to compute the sum of all these integers using the reduce() function. We can use the reduce() function as shown below:
val numbers = Seq(1, 2, 3, 4, 5)
val numbersDataset = numbers.toDS()
val sum = numbersDataset.reduce((x, y) => x + y)
In this example, we first create a sequence of integers. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the reduce() function to compute the sum of all the integers in the dataset.
Output:
The output of the reduce() function is a single value obtained by applying the binary operator to pairs of elements until a single result is obtained. In our example, the output will be the sum of all the integers in the dataset:
15
Note that the reduce() function is an action operation that triggers the computation of the dataset, and should be used with caution for large datasets. Also, the order of the elements in the dataset is not guaranteed, as Spark processes data in parallel.
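reduce() also works on Datasets of case classes; the supplied function just has to combine two elements into one of the same type. A minimal sketch, reusing the employeeDataset from the earlier examples, that finds the highest-paid employee:
val highestPaid: Employee = employeeDataset.reduce((left, right) =>
  if (left.salary >= right.salary) left else right
)
println(s"Highest paid: ${highestPaid.name} (${highestPaid.salary})")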
show()
Description:
The show() function in Spark 3 Dataset is used to display the contents of a dataset in a tabular format. It is useful for debugging and exploring the contents of a dataset.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to display the contents of the dataset in a tabular format. We can use the show() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
employeeDataset.show()
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the show() function to display the contents of the dataset in a tabular format.
Output:
The output of the show() function is a tabular representation of the dataset, with each row corresponding to a record in the dataset. In our example, the output will be:
+-----+---+------+
| name|age|salary|
+-----+---+------+
| John| 28|3000.0|
| Mary| 25|2800.0|
|Peter| 32|3500.0|
| Jane| 27|3200.0|
+-----+---+------+
Note that the show() function is an action operation that triggers the computation of the dataset, and should be used with caution for large datasets. It is intended for interactive use, such as exploring data in a notebook or shell, rather than for production use.
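show() has several overloads that are useful in practice: show(numRows: Int) limits how many rows are printed (the default is 20), and show(numRows: Int, truncate: Boolean) controls whether long string values are truncated. A minimal sketch, reusing the employeeDataset from the example above:
// Print only the first two rows
employeeDataset.show(2)
// Print up to 10 rows without truncating long values
employeeDataset.show(10, truncate = false)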
summary()
Description:
The summary() function in Spark 3 Dataset is used to compute summary statistics for the columns of a dataset. It provides a quick way to obtain basic statistics for a large number of columns in a dataset.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to compute summary statistics for the age and salary columns. We can use the summary() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
employeeDataset.select("age", "salary").summary().show()
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the select() function to select the age and salary columns, and then apply the summary() function to compute summary statistics for these columns.
Output:
The output of the summary() function is a tabular representation of the statistics for the selected columns: count, mean, standard deviation, min, approximate percentiles (25%, 50%, 75%), and max. Because the percentiles are computed with an approximate algorithm, the exact output may vary slightly; in our example it will be similar to the following (statistics rounded for readability):
+-------+------+-------+
|summary|   age| salary|
+-------+------+-------+
|  count|     4|      4|
|   mean|  28.0| 3125.0|
| stddev|2.9439|298.608|
|    min|    25| 2800.0|
|    25%|    25| 2800.0|
|    50%|    27| 3000.0|
|    75%|    28| 3200.0|
|    max|    32| 3500.0|
+-------+------+-------+
Note that summary(), like describe(), is a transformation that returns a new DataFrame; the statistics are only computed when an action such as show() is called, and doing so requires a full pass over the data. It is intended for quick exploratory analysis rather than for production pipelines.
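summary() also accepts the names of the statistics to compute, which keeps the output small when only a few figures are needed. Available statistics include count, mean, stddev, min, max, and approximate percentiles such as "25%". A minimal sketch, reusing the employeeDataset from the example above:
employeeDataset
  .select("salary")
  .summary("count", "min", "max")
  .show()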
tail()
Description:
The tail() function in Spark 3 Dataset returns the last n rows of the dataset. It is useful when we want to inspect the end of a dataset or when we need to extract only the last few rows of a dataset for further processing.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to extract the last 2 rows of this dataset. We can use the tail() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
employeeDataset.tail(2).foreach(println)
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the tail() function to extract the last 2 rows of the dataset, and then apply the foreach() function to print these rows.
Output:
The output of the tail() function is an Array containing the last n rows of the original dataset. In our example, the printed output will be:
Employee(Peter,32,3500.0)
Employee(Jane,27,3200.0)
Note that the tail() function is an action operation that triggers the computation of the dataset, and should be used with caution for large datasets. It is intended for quick inspection of data, rather than for production use.
take()
Description:
The take() function in Spark 3 Dataset returns an array of the first n rows of the dataset. It is useful when we need to extract only a few rows of a dataset for further processing, and we don't want to load the entire dataset into memory.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to extract the first 3 rows of this dataset. We can use the take() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val firstThreeRows = employeeDataset.take(3)
firstThreeRows.foreach(println)
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the take() function to extract the first 3 rows of the dataset, and then apply the foreach() function to print these rows.
Output:
The output of the take() function is an array containing the first n rows of the original dataset. In our example, the output will be:
Employee(John,28,3000.0)
Employee(Mary,25,2800.0)
Employee(Peter,32,3500.0)
Note that the take() function is an action, but it only scans as many partitions as are needed to gather n rows, so it avoids computing and collecting the entire dataset. This makes it well suited for extracting a small sample for further processing.
takeAsList()
Description:
The takeAsList() function in Spark 3 Dataset is similar to the take() function, but it returns a Java list instead of a Scala array. It is useful when we want to use the extracted rows of a dataset in Java-based APIs that accept Java collections.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to extract the first 3 rows of this dataset as a Java list. We can use the takeAsList() function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val firstThreeRows = employeeDataset.takeAsList(3)
firstThreeRows.forEach(employee => println(employee))
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the takeAsList() function to extract the first 3 rows of the dataset, and then apply the forEach() function to print these rows.
Output:
The output of the takeAsList() function is a Java list containing the first n rows of the original dataset. In our example, the output will be:
Employee(John,28,3000.0)
Employee(Mary,25,2800.0)
Employee(Peter,32,3500.0)
Note that the takeAsList() function, like take(), is an action that only scans as many partitions as needed to gather n rows, so it avoids collecting the entire dataset. It is useful for passing a small number of rows to Java-based APIs.
toLocalIterator()
Description:
The toLocalIterator() function in Spark 3 Dataset allows iterating over the elements of a dataset on the driver node, one partition at a time. This function can be useful when we want to perform some custom processing on the dataset elements using code that is only available on the driver node.
Usage Example:
Suppose we have a dataset containing the details of employees, including their names, ages, and salaries. We want to print the name and age of each employee by iterating over the dataset on the driver node using the toLocalIterator() function. We can do this as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDetails: Seq[Employee] = Seq(
Employee("John", 28, 3000.0),
Employee("Mary", 25, 2800.0),
Employee("Peter", 32, 3500.0),
Employee("Jane", 27, 3200.0)
)
val employeeDataset = employeeDetails.toDS()
val iterator = employeeDataset.toLocalIterator()
while (iterator.hasNext) {
val employee = iterator.next()
println(s"Name: ${employee.name}, Age: ${employee.age}")
}
In this example, we first create a sequence of case class instances containing the details of employees. Then, we convert this sequence to a Dataset using the toDS() function. Finally, we use the toLocalIterator() function to obtain an iterator over the elements of the dataset on the driver node, and then iterate over this iterator using a while loop to print the name and age of each employee.
Output:
The toLocalIterator() function returns an iterator over the elements of the dataset; iterating over it and printing each element produces:
Name: John, Age: 28
Name: Mary, Age: 25
Name: Peter, Age: 32
Name: Jane, Age: 27
Note that toLocalIterator() only needs to hold one partition on the driver at a time, which makes it more memory-friendly than collect() for large datasets, although it may trigger a separate Spark job per partition.
Basic Dataset functions
as(schema: StructType): DataFrame
Description:
The as(schema: StructType) function in the Spark 3 Dataset API returns a new DataFrame whose rows are reconciled to match the specified schema: columns are matched by name, reordered, and cast where it is safe to do so. Note that the result is still an untyped DataFrame; to obtain a strongly typed Dataset, use as[T] with an encoder instead. The StructType argument can be built with the StructType class by adding StructField objects that describe the columns.
Usage Example:
Suppose we have a CSV file containing the details of employees, including their names, ages, and salaries. We want to read this CSV file into a DataFrame and then convert it into a strongly typed Dataset by specifying a schema. We can do this as shown below:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
val employeeSchema = new StructType()
.add(StructField("name", StringType, true))
.add(StructField("age", IntegerType, true))
.add(StructField("salary", DoubleType, true))
val employeeDF = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("employee.csv")
val employeeDS = employeeDF.as(employeeSchema)
employeeDS.show()
In this example, we first create a StructType
object called employeeSchema
that represents the schema of the resulting strongly typed Dataset. This schema consists of three columns: name (StringType), age (IntegerType), and salary (DoubleType).
Then, we read the CSV file into a DataFrame using the read
method of the SparkSession
object. We set the header
option to true
to indicate that the first row of the CSV file contains the column names, and set the inferSchema
option to true
to infer the data types of the columns from the contents of the CSV file.
Next, we use the as
function to convert the DataFrame into a strongly typed Dataset by specifying the employeeSchema
. Finally, we call the show
method on the resulting Dataset to display its contents.
Output:
The output of the show
method will be a table that displays the contents of the strongly typed Dataset, which will have the specified schema. For example:
+-----+---+------+
| name|age|salary|
+-----+---+------+
|John | 28|3000.0|
|Mary | 25|2800.0|
|Peter| 32|3500.0|
|Jane | 27|3200.0|
+-----+---+------+
Note that if the data in the DataFrame does not match the specified schema, the as
function will throw an exception. It is important to ensure that the schema is correctly specified to avoid errors at runtime.
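For comparison, converting a DataFrame into a strongly typed Dataset is done with as[T] and an encoder rather than with a StructType. A minimal sketch, assuming the employeeDF DataFrame from the example above and an active SparkSession named spark:
import org.apache.spark.sql.Dataset
case class Employee(name: String, age: Int, salary: Double)
import spark.implicits._ // provides the encoder for Employee
// Column names and types of employeeDF must match the case class fields
val typedEmployees: Dataset[Employee] = employeeDF.as[Employee]
typedEmployees.filter(employee => employee.age >= 30).show()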
cache()
Description:
The cache function in the Spark 3 Dataset API marks a Dataset for persistence so that subsequent operations on it can be performed more quickly. Caching is lazy: the data is materialized in the executors' memory (spilling to disk if needed, since the default storage level is MEMORY_AND_DISK) the first time an action runs, and later actions can reuse it instead of recomputing the Dataset.
Usage Example:
Suppose we have a Dataset of employee data, and we want to perform multiple operations on this Dataset. In order to avoid recomputing the Dataset every time an operation is performed, we can cache the Dataset in memory using the cache
function as shown below:
case class Employee(name: String, age: Int, salary: Double)
val employeeDS = Seq(
    ("John", 28, 3000.0),
    ("Mary", 25, 2800.0),
    ("Peter", 32, 3500.0),
    ("Jane", 27, 3200.0)
  ).toDF("name", "age", "salary").as[Employee]
employeeDS.cache()
val filteredDS = employeeDS.filter(employee => employee.age >= 30)
val count = filteredDS.count()
println(s"Number of employees above 30: $count")
In this example, we first create a Dataset of employee data using the toDF
method and then converting it into a strongly typed Dataset using the as
method. Then we call the cache
method on this Dataset to cache it in memory.
Next, we perform an operation on the cached Dataset by filtering out the employees whose age is less than 30 using the filter
function. Finally, we call the count
function to count the number of employees whose age is greater than or equal to 30.
Output:
The output of this program will be "Number of employees above 30: 1". Because the Dataset is cached, it is materialized once on the first action, and any further actions on employeeDS or on datasets derived from it can reuse the cached data instead of recomputing it.
checkpoint(eager: Boolean)
Description:
The checkpoint function in the Spark 3 Dataset API saves the contents of a Dataset to the checkpoint directory configured with SparkContext.setCheckpointDir (typically a reliable file system such as HDFS) and truncates the logical plan. This can be used to avoid recomputing a long lineage from scratch if a failure occurs. By default checkpoint() is eager; checkpoint(eager = false) only checkpoints the data when the Dataset is first computed.
Usage Example:
Suppose we have a Dataset of employee data, and we want to checkpoint it so that later computations start from the saved data rather than from the full lineage. We can use the checkpoint function as shown below:
case class Employee(name: String, age: Int, salary: Double)
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
val employeeDS = Seq(
    ("John", 28, 3000.0),
    ("Mary", 25, 2800.0),
    ("Peter", 32, 3500.0),
    ("Jane", 27, 3200.0)
  ).toDF("name", "age", "salary").as[Employee]
val checkpointedDS = employeeDS.checkpoint()
val filteredDS = checkpointedDS.filter(employee => employee.age >= 30)
val count = filteredDS.count()
println(s"Number of employees above 30: $count")
In this example, we first set the checkpoint directory, create a Dataset of employee data using the toDF method, and convert it into a strongly typed Dataset using the as method. We then call the checkpoint method, which writes the data to the checkpoint directory and returns a new checkpointed Dataset; the returned Dataset must be used for subsequent operations, because checkpoint does not modify the original one.
Next, we perform an operation on the Dataset by filtering out the employees whose age is less than 30 using the filter
function. Finally, we call the count
function to count the number of employees whose age is greater than or equal to 30.
Output:
The output of this program will be the number of employees whose age is greater than or equal to 30. If a failure occurs during the computation of this Dataset, it can be recovered from the checkpoint file rather than being recomputed from scratch.
columns: Array[String]
Description:
The columns
function in Spark 3 DataFrame API returns an array of column names present in the DataFrame. It is a simple and quick way to get the list of column names present in the DataFrame.
Usage Example:
Suppose we have a DataFrame containing employee data, and we want to get the list of column names present in this DataFrame. We can use the columns
function as shown below:
val employeeDF = Seq(
("John", 28, 3000.0),
("Mary", 25, 2800.0),
("Peter", 32, 3500.0),
("Jane", 27, 3200.0)
).toDF("name", "age", "salary")
val columnNames = employeeDF.columns
println("Columns in the DataFrame: ")
columnNames.foreach(println)
In this example, we first create a DataFrame of employee data using the toDF
method. Then we call the columns
method on this DataFrame to get the list of column names present in it.
Finally, we print the list of column names using a foreach
loop.
Output:
The output of this program will be a list of column names present in the DataFrame:
Columns in the DataFrame:
name
age
salary
createGlobalTempView(viewName: String): Unit
Description:
The createGlobalTempView function in the Spark 3 DataFrame API creates a global temporary view with the specified name. A global temporary view is shared across all SparkSessions within the same Spark application and is dropped when the application terminates. It is tied to the system-preserved database global_temp, so it must be referenced with the qualified name global_temp.<viewName>. If a view with the same name already exists, an AnalysisException is thrown.
Usage Example:
Suppose we have a DataFrame containing employee data, and we want to create a global temporary view named "employee_view" using this DataFrame. We can use the createGlobalTempView
function as shown below:
val employeeDF = Seq(
("John", 28, 3000.0),
("Mary", 25, 2800.0),
("Peter", 32, 3500.0),
("Jane", 27, 3200.0)
).toDF("name", "age", "salary")
employeeDF.createGlobalTempView("employee_view")
In this example, we first create a DataFrame of employee data using the toDF
method. Then we call the createGlobalTempView
method on this DataFrame with the view name "employee_view" to create a global temporary view.
Output:
This program produces no console output. The global temporary view is now created and can be queried from any SparkSession in the same Spark application using the qualified name global_temp.employee_view.
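Once created, the view has to be referenced through the global_temp database. A minimal sketch, assuming an active SparkSession named spark:
val adults = spark.sql("SELECT name, age FROM global_temp.employee_view WHERE age >= 30")
adults.show()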
createOrReplaceGlobalTempView(viewName: String): Unit
Description:
The createOrReplaceGlobalTempView function in the Spark 3 DataFrame API creates a global temporary view with the specified name, or replaces an existing global temporary view with the same name. A global temporary view is shared across all SparkSessions within the same Spark application and is dropped when the application terminates. It is tied to the system-preserved database global_temp, so it must be referenced with the qualified name global_temp.<viewName>.
Usage Example:
Suppose we have a DataFrame containing employee data, and we want to create a global temporary view named "employee_view" using this DataFrame. We can use the createOrReplaceGlobalTempView
function as shown below:
val employeeDF = Seq(
("John", 28, 3000.0),
("Mary", 25, 2800.0),
("Peter", 32, 3500.0),
("Jane", 27, 3200.0)
).toDF("name", "age", "salary")
employeeDF.createOrReplaceGlobalTempView("employee_view")
In this example, we first create a DataFrame of employee data using the toDF
method. Then we call the createOrReplaceGlobalTempView
method on this DataFrame with the view name "employee_view" to create or replace a global temporary view.
Output:
This program produces no console output. The global temporary view is now created (or replaced) and can be queried from any SparkSession in the same Spark application using the qualified name global_temp.employee_view.
createTempView(viewName: String): Unit
Description:
The createTempView function in the Spark 3 DataFrame API creates a local temporary view with the specified name. A local temporary view is only accessible within the SparkSession that created it and disappears when that session ends. If a view with the same name already exists, an AnalysisException is thrown; use createOrReplaceTempView to overwrite it.
Usage Example:
Suppose we have a DataFrame containing employee data, and we want to create a local temporary view named "employee_view" using this DataFrame. We can use the createTempView
function as shown below:
val employeeDF = Seq(
("John", 28, 3000.0),
("Mary", 25, 2800.0),
("Peter", 32, 3500.0),
("Jane", 27, 3200.0)
).toDF("name", "age", "salary")
employeeDF.createTempView("employee_view")
In this example, we first create a DataFrame of employee data using the toDF
method. Then we call the createTempView
method on this DataFrame with the view name "employee_view" to create a local temporary view.
Output:
This program produces no console output. The local temporary view is now created and can be queried within the same SparkSession using the specified view name.
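Unlike a global temporary view, a local temporary view is referenced by its bare name and only from the SparkSession that created it. A minimal sketch, assuming an active SparkSession named spark:
val names = spark.sql("SELECT name FROM employee_view WHERE salary > 3000")
names.show()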
dtypes: Array[(String, String)]
Description:
The dtypes
function in Spark 3 DataFrame API returns an array of tuples containing the name of each column in the DataFrame and its corresponding data type. The data type is represented as a string.
Usage Example:
Suppose we have a DataFrame containing employee data, and we want to get the data types of each column in this DataFrame. We can use the dtypes
function as shown below:
val employeeDF = Seq(
("John", 28, 3000.0),
("Mary", 25, 2800.0),
("Peter", 32, 3500.0),
("Jane", 27, 3200.0)
).toDF("name", "age", "salary")
val dataTypes = employeeDF.dtypes
dataTypes.foreach(println)
In this example, we first create a DataFrame of employee data using the toDF
method. Then we call the dtypes
function on this DataFrame to get the data types of each column. Finally, we use a foreach loop to print out the name and data type of each column.
Output:
The output of this program will be:
(name,StringType)
(age,IntegerType)
(salary,DoubleType)
This shows that the "name" column has a String data type, the "age" column has an Integer data type, and the "salary" column has a Double data type.
explain(): Unit
Description:
The explain function in the Spark 3 DataFrame API displays the physical plan of a DataFrame or Dataset. It provides an easy way to understand how Spark will execute a given query. Note that explain() prints the plan to the console and returns Unit; if you need the plan as a string, it can be obtained from df.queryExecution instead.
Usage Example:
Suppose we have a DataFrame containing employee data, and we want to see how Spark will execute a query on this DataFrame. We can use the explain
function as shown below:
val employeeDF = Seq(
("John", 28, 3000.0),
("Mary", 25, 2800.0),
("Peter", 32, 3500.0),
("Jane", 27, 3200.0)
).toDF("name", "age", "salary")
employeeDF.filter($"age" > 25).explain()
In this example, we first create a DataFrame of employee data using the toDF
method. Then we apply a filter to this DataFrame to select only those employees whose age is greater than 25. Finally, we call the explain
function on this filtered DataFrame to display the physical plan of this query.
Output:
The output of this program will be:
== Physical Plan ==
*(1) Filter (isnotnull(age#1) && (age#1 > 25))
+- *(1) Scan ExistingRDD[name#0,age#1,salary#2]
This output shows the physical plan that Spark will use to execute this query. It indicates that Spark will first scan an existing RDD containing the data, then apply the filter to select only those rows where the age is greater than 25.
explain(extended: Boolean): Unit
Description:
The explain(extended: Boolean) function in the Spark 3 DataFrame API displays the query plans of a DataFrame or Dataset. When extended is true, the parsed, analyzed, and optimized logical plans are printed in addition to the physical plan. As with explain(), the output is printed to the console and the method returns Unit.
Usage Example:
Suppose we have a DataFrame containing employee data, and we want to see how Spark will execute a query on this DataFrame. We can use the explain
function as shown below:
val employeeDF = Seq(
("John", 28, 3000.0),
("Mary", 25, 2800.0),
("Peter", 32, 3500.0),
("Jane", 27, 3200.0)
).toDF("name", "age", "salary")
employeeDF.filter($"age" > 25).explain(true)
In this example, we first create a DataFrame of employee data using the toDF
method. Then we apply a filter to this DataFrame to select only those employees whose age is greater than 25. Finally, we call the explain
function on this filtered DataFrame with the extended
parameter set to true
to display the physical plan of this query with extended information.
Output:
The output of this program will be:
== Parsed Logical Plan ==
'Filter ('age > 25)
+- AnalysisBarrier[name#0, age#1, salary#2]
== Analyzed Logical Plan ==
name: string, age: int, salary: double
Filter (age#1 > 25)
+- Project [name#0, age#1, salary#2]
+- AnalysisBarrier[name#0, age#1, salary#2]
== Optimized Logical Plan ==
Filter (isnotnull(age#1) && (age#1 > 25))
+- Project [name#0, age#1, salary#2]
+- Scan ExistingRDD[name#0,age#1,salary#2]
== Physical Plan ==
*(1) Filter (isnotnull(age#1) && (age#1 > 25))
+- *(1) Scan ExistingRDD[name#0,age#1,salary#2]
This output shows the physical plan that Spark will use to execute this query, with additional information about the logical plan and the optimized plan. The extended
parameter set to true
adds additional details to the plan, making it easier to understand how Spark will execute the query.
explain(mode: String): Unit
The explain(mode: String) method in the Spark 3 Dataset API prints the query plans in a format selected by the mode parameter. The supported modes are simple, extended, codegen, cost, and formatted:
- simple prints only the physical plan.
- extended prints both the logical plans and the physical plan.
- codegen prints the physical plan together with the generated Java code, if it is available.
- cost prints the optimized logical plan with statistics, if they are available.
- formatted splits the output into a concise physical-plan outline followed by a section of node details.
Here's an example usage of the explain()
method:
import org.apache.spark.sql.functions._
val df = Seq((1,"a"), (2, "b"), (3, "c")).toDF("id", "name")
df.filter(col("id") > 1).explain("extended")
Output:
== Parsed Logical Plan ==
'Filter ('id > 1)
+- LogicalRDD [id#0, name#1], false
== Analyzed Logical Plan ==
id: int, name: string
Filter (id#0 > 1)
+- LogicalRDD [id#0, name#1], false
== Optimized Logical Plan ==
Filter (id#0 > 1)
+- LogicalRDD [id#0, name#1], false
== Physical Plan ==
*(1) Filter (isnotnull(id#0) AND (id#0 > 1))
+- *(1) Scan ExistingRDD[id#0,name#1]
As you can see in the output, the extended mode displays the parsed, analyzed, and optimized logical plans in addition to the physical plan.
hint()
The hint method in Apache Spark attaches a hint to the query optimizer. Hints give the optimizer extra information about how you would like part of a query to be executed, which can help it choose a better plan. Common hints include join-strategy hints such as "broadcast", "merge", "shuffle_hash", and "shuffle_replicate_nl", and partitioning hints such as "coalesce" and "repartition".
The hint method takes the name of the hint as a string, followed by optional parameters for hints that need them (for example, a target number of partitions for "repartition").
Here's an example of how to use the hint method (the file paths and join column are placeholders):
val orders = spark.read.parquet("path/to/orders")
val countries = spark.read.parquet("path/to/countries")
// Hint that the countries DataFrame is small enough to be broadcast to every executor
val joined = orders.join(countries.hint("broadcast"), Seq("country_code"))
joined.show()
In the example above, we read two Parquet files into DataFrames and apply the hint method to the smaller one with the hint name "broadcast". This tells the optimizer to prefer a broadcast hash join, in which the hinted DataFrame is copied to every executor instead of being shuffled. Finally, we call the show method to display the result.
Note that hints should be used with caution, and you should only use them if you have a good understanding of the query optimizer and how it works. In most cases, the optimizer will make the right decisions about how to execute a query without hints.
inputFiles: Array[String]
The inputFiles
method is available on Spark 2.0+ and returns an array of strings that represent the input files that were read to create the current DataFrame
or Dataset
.
Here's an example:
val df = spark.read.json("path/to/json/files")
val inputFiles = df.inputFiles
In the example above, we read in a JSON file and create a DataFrame
df
. Then, we use the inputFiles
method to retrieve the list of input files that were used to create the DataFrame
.
Note that if the DataFrame
was created from multiple input files, the inputFiles
method returns an array with the path of all files. Also, if the input file is a directory, then all the files within that directory will be returned as part of the array.
isEmpty: Boolean
The isEmpty
method in Spark 3 Dataset API returns a Boolean that indicates if the Dataset is empty or not. If the Dataset is empty, it returns true; otherwise, it returns false.
Usage
The isEmpty
method is called on a Dataset object and returns a Boolean value indicating if the Dataset is empty or not. Here is the basic syntax of calling the isEmpty
method:
val ds: Dataset[Person] = Seq.empty[Person].toDS()
val isEmpty: Boolean = ds.isEmpty
Example
Here's an example of using the isEmpty
method to check if a Dataset is empty or not:
case class Person(name: String, age: Int)
val spark = SparkSession.builder()
.appName("isEmptyExample")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val emptyDs = Seq.empty[Person].toDS()
val nonEmptyDs = Seq(Person("Alice", 25), Person("Bob", 30)).toDS()
println(emptyDs.isEmpty) // true
println(nonEmptyDs.isEmpty) // false
Output
The output of the example code will be:
true
false
Note that the isEmpty
method returns true
for an empty Dataset and false
for a non-empty Dataset.
isLocal()
Description: isLocal() is a boolean function in Spark Dataset API that checks if the current Dataset is local or distributed across a cluster.
Usage: Assuming a Dataset "ds" has been created, we can use the isLocal() function as follows:
val isDsLocal: Boolean = ds.isLocal
Output: The isLocal function returns true when the Dataset is backed by local data, meaning that collect() and take() can run on the driver without launching any executors, and false when the data is distributed (for example, when it is read from files or produced by a shuffle).
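A minimal sketch contrasting a Dataset built from a local collection with one read from files, assuming an active SparkSession named spark (the Parquet path is hypothetical); the exact result for the file-based DataFrame depends on how it was created, but it is typically false:
import spark.implicits._
val localDs = Seq(1, 2, 3).toDS()
println(localDs.isLocal)  // true: backed by a local collection
val fileDf = spark.read.parquet("path/to/parquet/file") // hypothetical path
println(fileDf.isLocal)   // typically false: backed by distributed files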
javaRDD()
The javaRDD
function is a method of the Spark 3 Dataset API in Scala. It returns a JavaRDD object that represents the current Dataset as an RDD of Java objects. The elements in the RDD are of type T, which is the same type as the elements in the Dataset.
Usage Example:
Suppose you have a Dataset of employee data and you want to convert it to a JavaRDD. Here's how you can use the javaRDD
function to do that:
import org.apache.spark.api.java.JavaRDD
case class Employee(name: String, age: Int)
val employeeData = Seq(Employee("John Doe", 25), Employee("Jane Smith", 30), Employee("Bob Johnson", 40))
val employeeDS = employeeData.toDS()
val employeeRDD: JavaRDD[Employee] = employeeDS.javaRDD
Output:
In this example, the javaRDD
function is called on the employeeDS
Dataset, which converts it to a JavaRDD of Employee objects. The employeeRDD
JavaRDD can now be used with Java APIs that require RDDs, instead of using the Dataset APIs.
Note: The JavaRDD object returned by javaRDD
is immutable, meaning it cannot be modified.
localCheckpoint()
Description: The localCheckpoint() function truncates the lineage (logical plan) of a Dataset by checkpointing its data to the executors' local storage, using Spark's caching subsystem, instead of writing it to a reliable, fault-tolerant file system as checkpoint() does. This makes it faster than checkpoint(), but the saved data does not survive executor failures. It is mainly used to speed up iterative algorithms whose lineage would otherwise keep growing.
Usage: The localCheckpoint() function is called on a Dataset and returns a new, checkpointed Dataset; the returned Dataset should be used for further processing.
Example:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("localCheckpoint Example")
.master("local[*]")
.getOrCreate()
val data = Seq(1, 2, 3, 4, 5)
val ds = spark.createDataset(data)
val checkpointedDs = ds.localCheckpoint()
val result = checkpointedDs.map(x => x * 2)
result.show()
Output:
+-----+
|value|
+-----+
|    2|
|    4|
|    6|
|    8|
|   10|
+-----+
In the above example, we create a Dataset 'ds' from a sequence of integers and call localCheckpoint() on it, which returns a checkpointed Dataset 'checkpointedDs'. We then apply the map() function to the checkpointed Dataset to multiply each element by 2 and store the result in the 'result' variable. Finally, we print the contents of 'result' using the show() function.
persist(newLevel: StorageLevel)
The persist(newLevel: StorageLevel)
function in Spark 3 Dataset API is used to persist the dataset with the specified storage level in memory or disk or both. The storage level determines where the data is stored and in what format, and it can be one of the following:
- MEMORY_ONLY
- MEMORY_AND_DISK
- MEMORY_ONLY_SER
- MEMORY_AND_DISK_SER
- DISK_ONLY
- MEMORY_ONLY_2
- MEMORY_AND_DISK_2
- MEMORY_ONLY_SER_2
- MEMORY_AND_DISK_SER_2
- OFF_HEAP
By default, the storage level is set to MEMORY_AND_DISK, which means that the data is stored both in memory and on disk.
Usage
Here's an example of how to use persist(newLevel: StorageLevel)
in Scala:
import org.apache.spark.storage.StorageLevel
val dataset = Seq((1, "John"), (2, "Sarah"), (3, "Alex")).toDF("id", "name").as[(Int, String)]
// Persisting the dataset with MEMORY_ONLY storage level
dataset.persist(StorageLevel.MEMORY_ONLY)
// Doing some transformations on the persisted dataset
val filteredDataset = dataset.filter(record => record._2.startsWith("A"))
println(s"Records starting with 'A': ${filteredDataset.count()}")
// Unpersisting the dataset from memory once it is no longer needed
dataset.unpersist()
In the above example, we created a simple dataset of (id, name) pairs, and persisted it with the MEMORY_ONLY storage level. We then performed some transformations on the persisted dataset, and finally unpersisted it from memory.
Output
There is no output generated by persist(newLevel: StorageLevel)
function. However, it can improve the performance of subsequent operations on the dataset by avoiding the need to recompute the dataset from scratch each time it is accessed.
printSchema(level: Int): Unit
The printSchema
method is used to print the schema of a Spark Dataset
. The schema is a representation of the data structure of the Dataset
, which includes the names and data types of its columns.
The method has two variants: printSchema() prints the full schema, while printSchema(level: Int) prints the schema only down to the given nesting depth, which is useful for wide, deeply nested schemas.
Usage Example:
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val spark = SparkSession.builder()
.appName("printSchemaExample")
.master("local[*]")
.getOrCreate()
// define a schema for a DataFrame
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = true)
))
// create a sample DataFrame
val data = Seq(
Row("Alice", 25),
Row("Bob", 30),
Row("Charlie", null)
)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
// print the schema
df.printSchema()
Output:
root
|-- name: string (nullable = false)
|-- age: integer (nullable = true)
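The level parameter is mainly useful for nested schemas. A minimal sketch reusing the df DataFrame from the example above (assuming Spark 3.0 or later, where the level variant is available):
import org.apache.spark.sql.functions.{col, struct}
val nestedDf = df.withColumn("profile", struct(col("name"), col("age")))
// Full schema, including the fields inside the "profile" struct
nestedDf.printSchema()
// Only the top level: the fields nested inside "profile" are not expanded
nestedDf.printSchema(1)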
schema: StructType
schema
is a method available in Spark 3's Dataset API. It returns the schema of the Dataset as a StructType object. The schema describes the structure of the data in the Dataset, including column names, data types, and nullable constraints.
Usage Example:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.functions.col
val data = Seq(("John", 25), ("Bob", 30), ("Jane", 35))
val df = spark.createDataFrame(data).toDF("name", "age")
val mySchema: StructType = df.schema
println("Schema of my DataFrame:")
mySchema.printTreeString()
Output:
Schema of my DataFrame:
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
In this example, we create a DataFrame called df
from a sequence of tuples. We then use the schema
method to obtain the schema of the DataFrame as a StructType
object called mySchema
. Finally, we use the printTreeString
method to print the schema in a tree-like format. The output shows that the DataFrame has two columns, "name" and "age", and the data types of these columns are String and Integer, respectively.
storageLevel: StorageLevel
storageLevel
is a method in Spark 3 Dataset API that returns the storage level of the Dataset.
The storage level is the level of caching used to persist the Dataset in memory or disk, and it is specified using the persist()
method. It determines the trade-off between memory usage and performance, as a higher storage level will use more memory but result in faster execution times.
The storageLevel
method returns an object of the StorageLevel
class, which represents the storage level of the Dataset. The StorageLevel
class has several static methods that can be used to create a storage level object with different caching options, such as MEMORY_ONLY
, MEMORY_AND_DISK
, DISK_ONLY
, etc.
Example usage:
import org.apache.spark.storage.StorageLevel
// create a DataFrame
val df = Seq((1, "John"), (2, "Mary"), (3, "Bob")).toDF("id", "name")
// cache the DataFrame with MEMORY_ONLY storage level
df.persist(StorageLevel.MEMORY_ONLY)
// get the storage level of the DataFrame
val level = df.storageLevel
// print a human-readable description of the storage level
println(level.description)
Output:
Memory Deserialized 1x Replicated
In this example, we create a DataFrame df
and cache it in memory with MEMORY_ONLY
storage level using the persist()
method. We then call the storageLevel
method on the DataFrame to get its storage level, and print it to the console. The output shows that the DataFrame is stored in memory, deserialized, and replicated once.
toDF(colNames: String*): DataFrame
The toDF
function is used to convert a Dataset
into a DataFrame
, where the column names are specified by the input argument colNames
. This function returns a DataFrame
.
Usage Example:
Suppose we have a case class Person
and a Dataset
of Person
objects. We can convert this Dataset
into a DataFrame
as follows:
case class Person(name: String, age: Int)
val people = Seq(Person("Alice", 25), Person("Bob", 30)).toDS()
val df = people.toDF("person_name", "person_age")
df.show()
Output:
+-----------+----------+
|person_name|person_age|
+-----------+----------+
|      Alice|        25|
|        Bob|        30|
+-----------+----------+
toDF(): DataFrame
The toDF
function is used to convert a Dataset
into a DataFrame
, where the column names are automatically generated based on the field names of the case class used to create the Dataset
. This function returns a DataFrame
.
Usage Example:
Suppose we have a case class Person
and a Dataset
of Person
objects. We can convert this Dataset
into a DataFrame
as follows:
case class Person(name: String, age: Int)
val people = Seq(Person("Alice", 25), Person("Bob", 30)).toDS()
val df = people.toDF()
df.show()
Output:
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Bob| 30|
+-----+---+
Note: The toDF() function derives the column names from the field names of the case class used to create the Dataset, so for the Person case class the columns are "name" and "age". For a Dataset without meaningful field names, such as a Dataset of tuples, the generated column names are "_1", "_2", and so on, which is why the toDF(colNames: String*) variant is often used to supply friendlier names.
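The same function is handy for Datasets of tuples, where the default column names are _1, _2, and so on. A minimal sketch, assuming spark.implicits._ is in scope:
val pairs = Seq(("Alice", 25), ("Bob", 30)).toDS()
pairs.toDF().printSchema()              // columns are named _1 and _2
pairs.toDF("name", "age").printSchema() // columns are named name and age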
toJavaRDD: JavaRDD[T]
toJavaRDD
is a function in Apache Spark's Dataset API that converts a Dataset[T]
object into a JavaRDD[T] object. The JavaRDD[T] object can then be used in Java code to perform operations on the data.
Here's an example usage of toJavaRDD
:
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.{Dataset, SparkSession}
case class Person(name: String, age: Int)
val spark = SparkSession.builder()
.appName("toJavaRDDExample")
.master("local[*]")
.getOrCreate()
// create a Dataset of Person objects
val peopleData: Seq[Person] = Seq(Person("Alice", 25), Person("Bob", 30), Person("Charlie", 35))
val peopleDS: Dataset[Person] = spark.createDataset(peopleData)
// convert the Dataset to a JavaRDD
val peopleRDD: JavaRDD[Person] = peopleDS.toJavaRDD
// print out the elements in the JavaRDD
peopleRDD.foreach(person => println(person))
In this example, we create a Dataset
of Person
objects using the createDataset
function of a SparkSession
. We then call toJavaRDD
on this Dataset
to convert it into a JavaRDD
. Finally, we print out the elements in the JavaRDD
using the foreach
function.
Function name: unpersist()β
Description: The unpersist()
function in Spark 3 Dataset API is used to remove the persisted memory of the RDD or Dataset from the Spark's cache. This method is useful when the data in the cache is no longer needed and we want to release the memory occupied by the cache for other use.
Usage example:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("unpersistExample")
.master("local[2]")
.getOrCreate()
val df = spark.read.csv("example.csv")
// Cache the DataFrame
df.cache()
// Unpersist the DataFrame
df.unpersist()
// Check whether the DataFrame is still cached
println(df.storageLevel.useMemory)
Output:
false
In this example, we first read a CSV file and then cache the resulting DataFrame. We then unpersist the DataFrame and check whether it is still cached by inspecting its storageLevel: after unpersist(), the storage level reports no memory usage. The output shows that the DataFrame is no longer cached.
Function name: unpersist(blocking: Boolean)
Description: The unpersist(blocking: Boolean)
function in Spark 3 Dataset API is used to remove the persisted memory of the RDD or Dataset from the Spark's cache. The blocking parameter indicates whether the method should block until the cache is removed.
Usage example:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("unpersistExample")
.master("local[2]")
.getOrCreate()
val df = spark.read.csv("example.csv")
// Cache the DataFrame
df.cache()
// Unpersist the DataFrame blocking until it is removed
df.unpersist(blocking = true)
// Check whether the DataFrame is still cached
println(df.storageLevel.useMemory)
Output:
false
In this example, we first read a CSV file and then cache the resulting DataFrame. We then unpersist the DataFrame with blocking set to true, which waits until all cached blocks have been removed, and check whether it is still cached by inspecting its storageLevel. The output shows that the DataFrame is no longer cached.
write: DataFrameWriter[T]β
The write
method is used to write the contents of a Spark Dataset
to an external storage system in various formats such as CSV, JSON, Parquet, ORC, and more. It returns a DataFrameWriter
object, which provides methods for configuring the write operation, such as setting the output path, format, partitioning, compression, and more.
Here's an example of using the write
method to write a Dataset
of Person
objects to a Parquet file:
import org.apache.spark.sql.{Dataset, SparkSession}
case class Person(name: String, age: Int)
val spark = SparkSession.builder()
.appName("Write Dataset to Parquet")
.master("local[*]")
.getOrCreate()
val people = Seq(Person("Alice", 25), Person("Bob", 30), Person("Charlie", 35))
val peopleDS: Dataset[Person] = spark.createDataset(people)
peopleDS.write
.format("parquet")
.mode("overwrite")
.save("/path/to/output/parquet")
In this example, we first define a case class Person
and create a Dataset
of Person
objects. Then we call the write
method on the peopleDS
Dataset
, which returns a DataFrameWriter
. We configure the writer to use the Parquet format, overwrite any existing output, and save the output to the specified path. Finally, we call the save
method to execute the write operation.
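As a further sketch (the output path is hypothetical), the same DataFrameWriter can produce CSV files with a header and partition the output by a column:
peopleDS.write
  .format("csv")
  .option("header", "true")
  .partitionBy("age")
  .mode("overwrite")
  .save("/path/to/output/csv")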
writeStream: DataStreamWriter[T]β
The writeStream
function is used in Spark to write the contents of a streaming Dataset
to an output sink. It returns a DataStreamWriter
object, which can be used to configure the output sink and write the data.
Here's a brief description of the most commonly used configuration methods on the returned DataStreamWriter:
- format(source: String): Specifies the format of the output sink. The source parameter should be a valid identifier for the sink format, such as "parquet", "csv", "kafka", or "memory".
- outputMode(outputMode: String): Specifies the output mode of the sink. The outputMode parameter should be a valid identifier for the output mode, such as "append", "complete", or "update".
- option(key: String, value: String): Allows additional configuration options to be passed to the sink.
- partitionBy(colNames: String*): Specifies the partitioning scheme for the output data.
- queryName(queryName: String): Specifies a name for the streaming query, which can be useful for monitoring and debugging.
- trigger(trigger: Trigger): Specifies the trigger for the streaming query, which determines how often the sink is updated.
Usage example:
import org.apache.spark.sql.streaming.Trigger
// create a streaming DataFrame from a Kafka topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my_topic")
.load()
// write the streaming DataFrame to a console sink, triggering every 10 seconds
val query = df
  .writeStream
  .format("console")
  .outputMode("append")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
// wait for the query to finish
query.awaitTermination()
This code reads a streaming DataFrame from a Kafka topic and writes it to the console sink in append mode. The trigger
method is configured on the DataStreamWriter before start() and makes the sink update every 10 seconds. The start method launches the streaming query, and awaitTermination
blocks until the query finishes or fails.
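For a durable sink, the sketch below (paths are hypothetical) writes the same stream to Parquet files instead of the console; file sinks support only append mode and require a checkpoint location:
val fileQuery = df
  .writeStream
  .format("parquet")
  .outputMode("append")
  .option("path", "/path/to/output/parquet")
  .option("checkpointLocation", "/path/to/checkpoints")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
fileQuery.awaitTermination()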
writeTo(table: String): DataFrameWriterV2[T]β
The writeTo
function in Spark 3 Dataset API allows you to write the content of a Dataset to a specific table in a catalog.
Here is a brief description of the function:
writeTo(table: String): DataFrameWriterV2[T]
: Writes the content of the dataset to a specific table in a catalog.
The writeTo
function returns a DataFrameWriterV2[T]
object that can be used to configure various output options for the data being written.
Usage example:
val df = Seq(("Alice", 25), ("Bob", 30), ("Charlie", 35)).toDF("name", "age")
// Append the content of the DataFrame to a table in the catalog
df.writeTo("my_database.my_table")
  .append()
In this example, the content of the DataFrame df
is appended to the table my_database.my_table
using the writeTo
function. Unlike the classic write API, the returned DataFrameWriterV2 has no mode or save methods; the write is executed by calling a terminal method such as append(), create(), createOrReplace(), overwrite(condition), or overwritePartitions().
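As a further sketch (the catalog and table names are hypothetical, and the target catalog must support the V2 writer API), writeTo can also create or replace a partitioned table:
import org.apache.spark.sql.functions.col
df.writeTo("my_catalog.my_database.my_table")
  .partitionedBy(col("age"))
  .createOrReplace()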
Streamingβ
def isStreaming: Booleanβ
Description:
The def isStreaming
function is used to determine whether a Dataset contains one or more sources that continuously return data as it arrives, indicating that the Dataset is associated with a streaming source. When a Dataset reads data from a streaming source, it needs to be executed as a StreamingQuery using the start()
method in DataStreamWriter. This function is particularly useful when working with streaming data in Apache Spark, as it allows you to distinguish between streaming and non-streaming Datasets.
Usage Example:
Suppose we have a Dataset streamingData
that is reading data from a Kafka streaming source. We can use the def isStreaming
function to check if the Dataset is associated with a streaming source.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Streaming Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Reading data from a Kafka streaming source
val streamingData = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load()
// Checking if the Dataset is associated with a streaming source
val isStreaming = streamingData.isStreaming
println(s"Is the Dataset streaming? $isStreaming")
Output:
Is the Dataset streaming? true
Explanation:
In the example above, we first create a SparkSession and import its implicits to enable Spark SQL functionalities. We then read data from a Kafka streaming source using spark.readStream
and other options specific to Kafka. After reading the data, we call the isStreaming
function on the streamingData
Dataset to check if it is associated with a streaming source. Since we are reading data from a streaming source (Kafka), the isStreaming
function returns true
, indicating that the Dataset contains a streaming source. If the Dataset were reading data from a batch source, isStreaming
would return false
.
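For contrast, a minimal sketch (using the same SparkSession and its implicits) shows that a Dataset built from an in-memory collection, i.e. a batch source, reports false:
val batchData = Seq(("a", 1), ("b", 2)).toDS()
println(batchData.isStreaming)
// prints: false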
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]β
Description:
The withWatermark
function is used to define an event time watermark for a Dataset that contains event time data. A watermark is a point in time before which it is assumed that no more late data will arrive. Spark uses watermarks for various purposes, including determining when a time window aggregation can be finalized and to optimize state management for ongoing aggregations, mapGroupsWithState, and dropDuplicates operators.
The current watermark is computed by finding the maximum event time (MAX(eventTime)) across all partitions in the query and then subtracting a user-specified delayThreshold
from it. The delayThreshold
is the minimum delay to wait for late data to arrive, relative to the latest record that has been processed. For example, if the delayThreshold
is set to "1 minute," it means that Spark will wait for data to arrive up to 1 minute after the latest processed record before finalizing time-based aggregations.
Please note that due to the cost of coordinating the watermark value across partitions, the actual watermark used is only guaranteed to be at least delayThreshold
behind the actual event time. As a result, in some cases, Spark may still process records that arrive more than delayThreshold
late.
Parameters:
- eventTime: The name of the column that contains the event time of each row in the Dataset.
- delayThreshold: The minimum delay to wait for data to arrive late, specified as an interval (e.g., "1 minute" or "5 hours"). The
delayThreshold
should not be negative.
Usage Example:
Suppose we have a streaming Dataset streamingData
that contains event time data. We want to define an event time watermark for the event time column "timestamp" with a delay threshold of "5 minutes."
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("Watermark Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Reading streaming data with event time "timestamp" from a Kafka source
val streamingData = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
// Defining an event time watermark with a delay threshold of "5 minutes"
val watermarkedData = streamingData.withWatermark("timestamp", "5 minutes")
// Performing windowed aggregation based on event time with watermark applied
val windowedAggregation = watermarkedData
.groupBy(window($"timestamp", "10 minutes", "5 minutes"))
.count()
// Starting the streaming query
val query = windowedAggregation.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
Explanation:
In the example above, we create a SparkSession and import its implicits to enable Spark SQL functionalities. We then read streaming data from a Kafka source with an event time column "timestamp." We apply a watermark with a delay threshold of "5 minutes" to the Dataset using the withWatermark
function. After that, we perform windowed aggregation based on event time with a window size of "10 minutes" and a slide duration of "5 minutes."
By applying the watermark, Spark will know when to finalize the windowed aggregations, and it will also minimize the state needed for ongoing aggregations. The streaming query is set to output the results in "update" mode and display them on the console.
Please note that in a real-world scenario, the output mode and the sink would depend on the use case and requirements.
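Watermarks also bound the state kept by dropDuplicates on a stream. The short sketch below (reusing streamingData from above; the chosen columns are illustrative) drops duplicate events while keeping state only for events newer than the watermark:
val dedupedData = streamingData
  .withWatermark("timestamp", "5 minutes")
  .dropDuplicates("value", "timestamp")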
Typed transformationsβ
alias(alias: Symbol): Dataset[T]β
alias(alias: String): Dataset[T]β
Description:
The alias function in Apache Spark's Dataset API assigns an alias (a new name) to the Dataset as a whole. It returns a new Dataset that is identical in content but can be referred to by the given alias, which is mainly useful for disambiguating column references when the same Dataset appears more than once in a query (for example, in a self-join). The function does not rename any columns. There are two overloaded versions: one that takes a Scala Symbol as the alias parameter, and another that takes a String; both achieve the same result, and alias behaves the same as the as(alias) function described below.
Parameters:
alias (Symbol or String): The alias is specified either as a Scala Symbol or a String. A Symbol is written with a leading single quote (for example, 'emp) and represents a simple name in a concise way; a String is an ordinary sequence of characters representing the desired name.
Usage Example:
Suppose we have a Dataset employees
that contains information about employees, and we want to give the whole Dataset the alias "emp" so that its columns can be referenced unambiguously later.
import org.apache.spark.sql.{SparkSession, Dataset}
// Assuming we have a case class to represent the Employee data
case class Employee(name: String, age: Int, department: String)
val spark = SparkSession.builder()
.appName("Alias Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val data = Seq(
Employee("Alice", 30, "HR"),
Employee("Bob", 25, "Engineering"),
Employee("Eve", 28, "Marketing")
)
// Creating a Dataset from the sample data
val employees: Dataset[Employee] = data.toDS()
// Renaming the "age" column to "employee_age" using the alias function with a Symbol
val employeesWithAliasSymbol = employees.alias('employee_age)
// Renaming the "age" column to "employee_age" using the alias function with a String
val employeesWithAliasString = employees.alias("employee_age")
// Displaying the result of both Datasets
employeesWithAliasSymbol.show()
employeesWithAliasString.show()
Output:
+-----+---+-----------+
| name|age| department|
+-----+---+-----------+
|Alice| 30|         HR|
|  Bob| 25|Engineering|
|  Eve| 28|  Marketing|
+-----+---+-----------+
Explanation:
In the example above, we create a SparkSession and import its implicits to enable Spark SQL functionalities. We define a case class Employee
to represent the data and create a sample Dataset employees
with the sample data.
We use the alias
function with both the Symbol 'emp
and the String "emp"
to assign the alias "emp" to the Dataset in two ways: employeesWithAliasSymbol
and employeesWithAliasString
. The output is identical to the original Dataset because alias does not change any data or column names; it only attaches a name that can later be used to qualify columns (for example "emp.age") when the Dataset is joined with itself or with other Datasets. The two overloaded versions of the alias
function achieve the same result, providing flexibility for developers to choose the alias representation they prefer.
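A minimal sketch of where the alias pays off (continuing the employees Dataset above; the join condition is purely illustrative): a self-join that pairs employees working in the same department, using the aliases to qualify otherwise ambiguous column names.
import org.apache.spark.sql.functions.col
val e1 = employees.alias("e1")
val e2 = employees.alias("e2")
val sameDepartmentPairs = e1.join(
    e2,
    col("e1.department") === col("e2.department") && col("e1.name") =!= col("e2.name")
  )
  .select(
    col("e1.name").as("employee"),
    col("e2.name").as("colleague"),
    col("e1.department").as("department")
  )
sameDepartmentPairs.show()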
as(alias: Symbol): Dataset[T]β
as(alias: String): Dataset[T]β
Description:
The as
function in Apache Spark's Dataset API is used to assign an alias (a new name) to the entire Dataset. It creates a new Dataset with the specified alias. There are two overloaded versions of this function: one that takes a Scala Symbol as the alias parameter, and another that takes a String as the alias parameter. Both versions achieve the same result of renaming the Dataset itself.
Parameters:
alias (Symbol or String): The alias is specified either as a Scala Symbol or a String. A Symbol is written with a leading single quote (for example, 'empData) and represents a simple name in a concise way; a String is an ordinary sequence of characters representing the desired name.
Usage Example:
Suppose we have a Dataset employees
that contains information about employees, and we want to assign an alias "empData" to the entire Dataset.
import org.apache.spark.sql.{SparkSession, Dataset}
// Assuming we have a case class to represent the Employee data
case class Employee(name: String, age: Int, department: String)
val spark = SparkSession.builder()
.appName("Alias Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val data = Seq(
Employee("Alice", 30, "HR"),
Employee("Bob", 25, "Engineering"),
Employee("Eve", 28, "Marketing")
)
// Creating a Dataset from the sample data
val employees: Dataset[Employee] = data.toDS()
// Assigning an alias "empData" to the entire Dataset using the as function with a Symbol
val employeesWithAliasSymbol = employees.as('empData)
// Assigning an alias "empData" to the entire Dataset using the as function with a String
val employeesWithAliasString = employees.as("empData")
// Displaying the result of both Datasets
employeesWithAliasSymbol.show()
employeesWithAliasString.show()
Output:
+-----+---+-----------+
| name|age| department|
+-----+---+-----------+
|Alice| 30|         HR|
|  Bob| 25|Engineering|
|  Eve| 28|  Marketing|
+-----+---+-----------+
Explanation:
In the example above, we create a SparkSession and import its implicits to enable Spark SQL functionalities. We define a case class Employee
to represent the data and create a sample Dataset employees
with the sample data.
We use the as
function with both the Symbol 'empData
and the String "empData"
to assign an alias "empData" to the entire Dataset in two different Datasets: employeesWithAliasSymbol
and employeesWithAliasString
. The result is displayed, showing that both Datasets have successfully been assigned the alias "empData" in the output. The two overloaded versions of the as
function achieve the same result, providing flexibility for developers to choose the alias representation they prefer.
coalesce(numPartitions: Int): Dataset[T]β
Description:
The coalesce
function in Apache Spark's Dataset API is used to reduce the number of partitions in a Dataset by combining existing partitions into a smaller number of partitions. It returns a new Dataset with the specified number of partitions. Unlike repartition, coalesce avoids a full shuffle: existing partitions are merged, which is cheap but can leave the resulting partitions unevenly sized.
Parameters:
- numPartitions (Int): The number of partitions that the new Dataset should have. It determines the level of parallelism during processing. The value of numPartitions should be greater than 0.
Usage Example:
Suppose we have a Dataset data
that contains some elements and is partitioned into four partitions, and we want to coalesce it into two partitions.
import org.apache.spark.sql.{SparkSession, Dataset}
val spark = SparkSession.builder()
.appName("Coalesce Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val data = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Creating a Dataset from the sample data and repartitioning it into four partitions
val dataset: Dataset[Int] = spark.createDataset(data).repartition(4)
// Getting the number of partitions before coalescing
val partitionsBeforeCoalesce = dataset.rdd.getNumPartitions
// Coalescing the Dataset into two partitions
val coalescedDataset = dataset.coalesce(2)
// Getting the number of partitions after coalescing
val partitionsAfterCoalesce = coalescedDataset.rdd.getNumPartitions
// Displaying the result
println(s"Number of partitions before coalesce: $partitionsBeforeCoalesce")
println(s"Number of partitions after coalesce: $partitionsAfterCoalesce")
Output:
Number of partitions before coalesce: 4
Number of partitions after coalesce: 2
Explanation:
In the example above, we create a SparkSession and import its implicits to enable Spark SQL functionalities. We define a sample Dataset dataset
with ten elements and explicitly repartition it into four partitions so the effect of coalesce is visible.
We use the coalesce
function with the parameter 2
to coalesce the Dataset into two partitions. The result is stored in the new Dataset coalescedDataset
. The output shows that the number of partitions has been reduced from four to two after the coalesce operation. The coalesce
function is useful for optimizing the parallelism in the processing of Datasets, especially when you want to reduce the number of partitions for better resource utilization or to avoid unnecessary data shuffling.
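A minimal sketch (continuing the dataset value above) that contrasts coalesce with repartition; the key difference is that repartition always performs a full shuffle and can increase the number of partitions, while coalesce only merges existing ones:
// coalesce can only reduce the number of partitions and avoids a shuffle
println(dataset.coalesce(2).rdd.getNumPartitions)    // 2
// asking coalesce for more partitions than exist leaves the count unchanged
println(dataset.coalesce(100).rdd.getNumPartitions)  // still 4
// repartition shuffles the data and can increase the number of partitions
println(dataset.repartition(8).rdd.getNumPartitions) // 8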
distinct(): Dataset[T]β
Description:
The distinct
function in Apache Spark's Dataset API is used to remove duplicate rows from the Dataset. It returns a new Dataset with only the distinct rows. The deduplication process is performed based on the values of all columns in the Dataset.
Usage Example:
Suppose we have a Dataset data
that contains some duplicate rows, and we want to remove these duplicates.
import org.apache.spark.sql.{SparkSession, Dataset}
val spark = SparkSession.builder()
.appName("Distinct Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset with duplicate rows
val data = Seq(
  (1, "Alice"),
  (2, "Bob"),
  (1, "Alice"),
  (3, "John"),
  (2, "Bob"),
  (1, "Alice")
)
// Creating a Dataset from the sample data
val dataset: Dataset[(Int, String)] = spark.createDataset(data)
// Removing duplicates and creating a new Dataset
val distinctDataset = dataset.distinct()
// Displaying the result
distinctDataset.show()
Output:
+---+-----+
| _1|   _2|
+---+-----+
|  1|Alice|
|  2|  Bob|
|  3| John|
+---+-----+
Explanation:
In the example above, we create a SparkSession and import its implicits to enable Spark SQL functionalities. We define a sample Dataset dataset
containing tuples, several of which are exact duplicates. When we call the distinct
function on the Dataset, it removes the duplicate rows and returns a new Dataset distinctDataset
with only the distinct rows based on all columns.
The output displays the unique rows in the Dataset. In this case, the rows (1, "Alice") and (2, "Bob") appear only once in the resulting Dataset, and their duplicate occurrences have been removed.
The distinct
function is useful when you need to eliminate duplicate records from a Dataset to ensure data consistency or to prepare the data for further analysis or processing.
dropDuplicates(col1: String, cols: String*): Dataset[T]β
dropDuplicates(colNames: Array[String]): Dataset[T]β
dropDuplicates(colNames: Seq[String]): Dataset[T]β
dropDuplicates(): Dataset[T]β
Description:
The dropDuplicates
function in Apache Spark's Dataset API is used to remove duplicate rows from the Dataset based on specific columns. It returns a new Dataset with only the distinct rows after considering the specified columns. There are multiple variants of this function that allow you to provide the column names either as separate arguments, an Array, or a Seq.
Usage Example:
Suppose we have a Dataset data
that contains some duplicate rows, and we want to remove these duplicates based on specific columns.
import org.apache.spark.sql.{SparkSession, Dataset}
val spark = SparkSession.builder()
.appName("DropDuplicates Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset with duplicates
val data = Seq(
(1, "Alice", 30),
(2, "Bob", 25),
(3, "Alice", 30),
(4, "John", 40),
(5, "Bob", 25),
(6, "Alice", 28)
)
// Creating a Dataset from the sample data (tuple columns are named _1, _2, _3)
val dataset: Dataset[(Int, String, Int)] = spark.createDataset(data)
// Removing duplicates based on a single column (_2, the name)
val distinctByCols = dataset.dropDuplicates("_2")
// Removing duplicates based on an Array of column names
val distinctByArray = dataset.dropDuplicates(Array("_2", "_3"))
// Removing duplicates based on a Seq of column names
val distinctBySeq = dataset.dropDuplicates(Seq("_2", "_3"))
// Removing duplicates without specifying any columns (considers all columns)
val distinctAll = dataset.dropDuplicates()
// Displaying the results
distinctByCols.show()
distinctByArray.show()
distinctBySeq.show()
distinctAll.show()
Output of distinctByCols.show() (the other variants produce analogous tables for their column sets):
+---+-----+---+
|_1 |_2 |_3 |
+---+-----+---+
|2 |Bob |25 |
|1 |Alice|30 |
|4 |John |40 |
+---+-----+---+
Explanation:
In the example above, we create a SparkSession and import its implicits to enable Spark SQL functionalities. We define a sample Dataset dataset
containing tuples with duplicate values in some columns. We then demonstrate different ways to use the dropDuplicates
function to remove duplicates based on specific columns or without specifying any columns.
- distinctByCols: duplicates are removed based on the column _2 (the name), leaving one row per distinct name.
- distinctByArray: duplicates are removed based on the columns _2 and _3, leaving unique combinations of name and age.
- distinctBySeq: the same columns are passed as a Seq instead of an Array, producing the same result as distinctByArray.
- distinctAll: no columns are specified, so all columns are considered; in this sample every row has a unique _1 value, so no rows are removed.
The dropDuplicates
function is useful when you need to eliminate duplicate records based on specific columns or across all columns to ensure data consistency and remove redundancy in the data for further analysis or processing.
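Because this guide prefers case classes over tuple accessors, here is the same idea as a sketch with a case class (the Contact class and its values are made up for illustration, and spark.implicits._ is assumed to be in scope):
case class Contact(id: Int, name: String, email: String)
val contacts: Dataset[Contact] = Seq(
    Contact(1, "Alice", "alice@example.com"),
    Contact(2, "Alice", "alice@example.com"),
    Contact(3, "Bob", "bob@example.com")
  ).toDS()
// One row per distinct (name, email) pair; which id survives is not guaranteed
val uniqueContacts: Dataset[Contact] = contacts.dropDuplicates("name", "email")
uniqueContacts.show()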
except(other: Dataset[T]): Dataset[T]β
Description:
The except
function in Apache Spark's Dataset API is used to return a new Dataset containing the rows from the current Dataset that are not present in the other
Dataset. It computes the set difference between the two Datasets based on their rows and, like EXCEPT DISTINCT in SQL, returns each remaining row at most once.
Usage Example:
Suppose we have two Datasets, datasetA
and datasetB
, and we want to find the rows that are present in datasetA
but not in datasetB
.
import org.apache.spark.sql.{SparkSession, Dataset}
val spark = SparkSession.builder()
.appName("Except Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Case class representing the structure of the Dataset elements
case class Employee(id: Int, name: String, age: Int)
// Sample data for the Datasets
val dataA = Seq(
Employee(1, "Alice", 30),
Employee(2, "Bob", 25),
Employee(3, "John", 40)
)
val dataB = Seq(
Employee(2, "Bob", 25),
Employee(4, "Kate", 35)
)
// Creating Datasets from the sample data
val datasetA: Dataset[Employee] = spark.createDataset(dataA)
val datasetB: Dataset[Employee] = spark.createDataset(dataB)
// Finding the rows present in datasetA but not in datasetB
val result: Dataset[Employee] = datasetA.except(datasetB)
// Displaying the result
result.show()
Output:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 30|
| 3| John| 40|
+---+-----+---+
Explanation:
In the example above, we create a SparkSession and import its implicits to enable Spark SQL functionalities. We define two sample Datasets, datasetA
and datasetB
, containing Employee
case class objects. We use the except
function to find the rows that are present in datasetA
but not in datasetB
. The result Dataset contains the rows (1, "Alice", 30)
and (3, "John", 40)
because they are unique to datasetA
and not present in datasetB
.
The except
function is useful for finding the differences between two Datasets based on their rows. It can help identify unique records, identify missing data, and perform various types of set-based operations on Datasets.
exceptAll(other: Dataset[T]): Dataset[T]β
Description:
The exceptAll
function in Apache Spark's Dataset API returns a new Dataset containing the rows of the current Dataset that are not matched by rows of the other
Dataset, taking the number of occurrences into account (a multiset, or "bag", difference): if a row appears m times in the current Dataset and n times in the other Dataset, it appears max(m - n, 0) times in the result.
Usage Example:
Suppose we have two Datasets, datasetA
and datasetB
, and we want to find all the rows that are present in datasetA
but not in datasetB
, including duplicates.
import org.apache.spark.sql.{SparkSession, Dataset}
val spark = SparkSession.builder()
.appName("ExceptAll Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Case class representing the structure of the Dataset elements
case class Employee(id: Int, name: String, age: Int)
// Sample data for the Datasets
val dataA = Seq(
Employee(1, "Alice", 30),
Employee(2, "Bob", 25),
Employee(2, "Bob", 25),
Employee(3, "John", 40)
)
val dataB = Seq(
Employee(2, "Bob", 25),
Employee(4, "Kate", 35)
)
// Creating Datasets from the sample data
val datasetA: Dataset[Employee] = spark.createDataset(dataA)
val datasetB: Dataset[Employee] = spark.createDataset(dataB)
// Finding all rows present in datasetA but not in datasetB, including duplicates
val result: Dataset[Employee] = datasetA.exceptAll(datasetB)
// Displaying the result
result.show()
Output:
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 30|
|  2|  Bob| 25|
|  3| John| 40|
+---+-----+---+
Explanation:
In the example above, we create a SparkSession and import its implicits to enable Spark SQL functionalities. We define two sample Datasets, datasetA and datasetB, containing Employee case class objects. We use the exceptAll function to compute the occurrence-aware difference between the two Datasets: for each row, the number of occurrences kept equals its count in datasetA minus its count in datasetB. The rows (1, "Alice", 30) and (3, "John", 40) are kept because they do not appear in datasetB at all, while (2, "Bob", 25) appears once in the result because it occurs twice in datasetA but only once in datasetB.
The exceptAll
function is useful for finding which rows of the current Dataset remain after subtracting the other Dataset occurrence by occurrence. It retains duplicate rows when they occur more often in the current Dataset than in the other, which is handy when you need to know how many times each missing record appears.
filter(func: FilterFunction[T]): Dataset[T]β
Description:
The filter
function in Apache Spark's Dataset API is used to create a new Dataset by selecting only the rows that satisfy a given condition specified by the FilterFunction[T]
. The FilterFunction[T]
is a user-defined function that takes a single argument of type T
(the type of the Dataset's elements) and returns a Boolean value, indicating whether the row should be included in the new Dataset or not.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.api.java.function.FilterFunction
// Case class representing the structure of the Dataset elements
case class Employee(id: Int, name: String, age: Int)
val spark = SparkSession.builder()
.appName("Filter Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val data = Seq(
Employee(1, "Alice", 30),
Employee(2, "Bob", 25),
Employee(3, "John", 40),
Employee(4, "Kate", 35)
)
// Creating the Dataset from the sample data
val dataset: Dataset[Employee] = spark.createDataset(data)
// Defining the filter condition function
val filterCondition: FilterFunction[Employee] = new FilterFunction[Employee] {
override def call(employee: Employee): Boolean = employee.age >= 30
}
// Applying the filter to the Dataset
val filteredDataset: Dataset[Employee] = dataset.filter(filterCondition)
// Displaying the filtered Dataset
filteredDataset.show()
Output:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 30|
| 3| John| 40|
| 4| Kate| 35|
+---+-----+---+
filter(func: (T) => Boolean): Dataset[T]β
Description:
The filter
function in Apache Spark's Dataset API is used to create a new Dataset by selecting only the rows that satisfy a given condition specified by the user-defined function func
. The function func
takes a single argument of type T
(the type of the Dataset's elements) and returns a Boolean value, indicating whether the row should be included in the new Dataset or not.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
val spark = SparkSession.builder()
.appName("Filter Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val data = Seq(
Employee(1, "Alice", 30),
Employee(2, "Bob", 25),
Employee(3, "John", 40),
Employee(4, "Kate", 35)
)
// Creating the Dataset from the sample data
val dataset: Dataset[Employee] = spark.createDataset(data)
// Defining the filter condition function
def filterCondition(employee: Employee): Boolean = employee.age >= 30
// Applying the filter to the Dataset
val filteredDataset: Dataset[Employee] = dataset.filter(filterCondition)
// Displaying the filtered Dataset
filteredDataset.show()
Output:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 30|
| 3| John| 40|
| 4| Kate| 35|
+---+-----+---+
filter(conditionExpr: String): Dataset[T]β
Description:
The filter
function in Apache Spark's Dataset API is used to create a new Dataset by applying the specified SQL-like condition expression conditionExpr
. The condition expression should be a valid SQL WHERE clause, and it is used to filter the rows in the Dataset.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
val spark = SparkSession.builder()
.appName("Filter Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val data = Seq(
Employee(1, "Alice", 30),
Employee(2, "Bob", 25),
Employee(3, "John", 40),
Employee(4, "Kate", 35)
)
// Creating the Dataset from the sample data
val dataset: Dataset[Employee] = spark.createDataset(data)
// Applying the filter to the Dataset using condition expression
val filteredDataset: Dataset[Employee] = dataset.filter("age >= 30")
// Displaying the filtered Dataset
filteredDataset.show()
Output:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 30|
| 3| John| 40|
| 4| Kate| 35|
+---+-----+---+
filter(condition: Column): Dataset[T]β
Description:
The filter
function in Apache Spark's
Dataset API is used to create a new Dataset by applying the specified Column-based condition condition
. The condition is created using DataFrame's Column
API, which allows for complex filtering expressions involving multiple columns.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("Filter Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val data = Seq(
Employee(1, "Alice", 30),
Employee(2, "Bob", 25),
Employee(3, "John", 40),
Employee(4, "Kate", 35)
)
// Creating the Dataset from the sample data
val dataset: Dataset[Employee] = spark.createDataset(data)
// Applying the filter to the Dataset using Column-based condition
val filteredDataset: Dataset[Employee] = dataset.filter(col("age") >= 30)
// Displaying the filtered Dataset
filteredDataset.show()
Output:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 30|
| 3| John| 40|
| 4| Kate| 35|
+---+-----+---+
Explanation:
In all the above examples, we have a sample Dataset of employees (dataset
) with columns id
, name
, and age
. We use the filter
function to create a new Dataset (filteredDataset
) by selecting only the rows that satisfy the specified conditions.
- In the first example, we use a FilterFunction[Employee] to filter the rows where the employee's age is greater than or equal to 30.
- In the second example, we define a custom function filterCondition and use it with the filter function.
- In the third example, we use a SQL-like condition expression "age >= 30" to filter the rows.
- In the fourth example, we use the DataFrame Column API to create the condition col("age") >= 30 and apply it to the Dataset (see the sketch below for combining several Column conditions).
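Column-based conditions compose with the usual Boolean operators, so more involved filters stay readable. A short sketch (reusing the dataset of employees above; the condition itself is illustrative):
import org.apache.spark.sql.functions.col
val selectedEmployees: Dataset[Employee] = dataset.filter(
  col("age") >= 30 && col("name").isin("Alice", "Kate")
)
selectedEmployees.show()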
flatMap[U](func: (T) => TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]β
Description:
The flatMap
function in Apache Spark's Dataset API is used to create a new Dataset by applying a transformation function func
to each element of the original Dataset T
, which returns a collection of elements of type U
. The flatMap
function then flattens these collections into a single Dataset of type U
. The function func
takes a single argument of type T
(the type of the original Dataset's elements) and returns a TraversableOnce[U]
, which represents a collection of elements of type U
.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
val spark = SparkSession.builder()
.appName("FlatMap Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Case class representing the structure of the Dataset elements
case class Employee(id: Int, name: String, age: Int)
// Sample data for the Dataset
val data = Seq(
Employee(1, "Alice", 30),
Employee(2, "Bob", 25),
Employee(3, "John", 40),
Employee(4, "Kate", 35)
)
// Creating the Dataset from the sample data
val dataset: Dataset[Employee] = spark.createDataset(data)
// Defining the flatMap transformation function
def splitName(employee: Employee): TraversableOnce[String] = {
employee.name.split(" ")
}
// Applying the flatMap transformation to the Dataset
val nameDataset: Dataset[String] = dataset.flatMap(splitName)
// Displaying the new Dataset with names
nameDataset.show()
Output:
+-----+
|value|
+-----+
|Alice|
| Bob|
| John|
| Kate|
+-----+
Explanation:
In the example above, we have a sample Dataset of employees (dataset
) with columns id
, name
, and age
. We define a custom function splitName
that takes an Employee
and splits the name
into individual words using the space character. The flatMap
function is then used to apply this transformation to each employee's name, resulting in a new Dataset (nameDataset
) containing the split names.
Note:
- The flatMap function is similar to the map function, but it allows returning a collection of elements (TraversableOnce[U]) instead of a single element. The collections are then flattened into a single Dataset, as the sketch below also shows.
- The implicit arg0: Encoder[U] parameter lets Spark serialize the elements of the new Dataset of type U. It is normally provided by importing spark.implicits._, so you don't need to pass it explicitly when calling the flatMap function.
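A sketch (reusing the dataset of employees above; the NameChar case class is made up for illustration) that emits one record per character of each name, showing how flatMap can expand a single input row into many typed output rows:
case class NameChar(name: String, letter: String)
val lettersDataset: Dataset[NameChar] = dataset.flatMap { employee =>
  employee.name.map(character => NameChar(employee.name, character.toString))
}
lettersDataset.show()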
groupByKey[K](func: (T) => K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]β
Description:
The groupByKey
function in Apache Spark's Dataset API is used to group the elements in the Dataset based on the result of a user-defined function func
. It returns a KeyValueGroupedDataset
where the keys are the unique results of applying the function to each element, and the values are the elements themselves.
Parameters:
- func: A user-defined function that takes an element of type T as input and returns a key of type K. This function is used to determine the grouping key for each element.
- arg0: An implicit parameter of type Encoder[K], which provides the necessary serialization and deserialization support for the key type K.
Usage Example:
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}
// Case class representing the structure of the Dataset elements
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Group By Key Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val data = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("Alice", 35),
Person("John", 40),
Person("Bob", 28)
)
// Creating the Dataset from the sample data
val dataset: Dataset[Person] = spark.createDataset(data)
// Grouping the elements by name using groupByKey
val groupedDataset: KeyValueGroupedDataset[String, Person] = dataset.groupByKey(person => person.name)
// Summarizing each group with mapGroups (a KeyValueGroupedDataset has no foreach;
// here we turn each group into a readable string instead)
val groupSummaries: Dataset[String] = groupedDataset.mapGroups { (name, people) =>
  val ages = people.map(_.age).mkString(", ")
  name + ": " + ages
}
groupSummaries.show(truncate = false)
Output:
+-------------+
|value        |
+-------------+
|Bob: 25, 28  |
|Alice: 30, 35|
|John: 40     |
+-------------+
Explanation:
In the example above, we have a Dataset of Person
objects with names and ages. We use the groupByKey
function to group the elements based on their names. The result is a KeyValueGroupedDataset
where the keys are the unique names ("Bob", "Alice", and "John") and the values are the corresponding Person
objects. Because a KeyValueGroupedDataset is not itself a Dataset, we summarize each group with mapGroups, producing one string per key that lists the ages in that group, and display the result with show.
Note:
- The groupByKey function is typically used in scenarios where you want to perform operations on groups of elements based on a common key, such as aggregations, filtering, or transformations.
- After grouping, you can apply operations such as count, reduceGroups, agg, mapGroups, or flatMapGroups on the grouped Dataset to perform computations on each group, as sketched below.
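A minimal sketch (continuing groupedDataset from above) of two built-in group operations: counting group sizes and reducing each group to its oldest member:
// Number of elements per key
val countsByName: Dataset[(String, Long)] = groupedDataset.count()
countsByName.show()
// Oldest person per key, computed by pairwise reduction within each group
val oldestByName: Dataset[(String, Person)] = groupedDataset.reduceGroups {
  (left, right) => if (left.age >= right.age) left else right
}
oldestByName.show()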
intersect(other: Dataset[T]): Dataset[T]β
Description:
The intersect
function in Apache Spark's Dataset API is used to find the common elements between two Datasets. It returns a new Dataset that contains only the elements that are present in both the current Dataset and the other
Dataset.
Parameters:
- other: The other Dataset with which the intersection is to be computed. Both Datasets must have the same schema (column names and data types).
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset elements
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Intersect Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the first Dataset
val data1 = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("Alice", 35),
Person("John", 40),
Person("Bob", 28)
)
// Sample data for the second Dataset
val data2 = Seq(
Person("Bob", 25),
Person("Mary", 22),
Person("Alice", 35),
Person("John", 40),
Person("Tom", 29)
)
// Creating the Datasets from the sample data
val dataset1: Dataset[Person] = spark.createDataset(data1)
val dataset2: Dataset[Person] = spark.createDataset(data2)
// Finding the common elements using intersect
val commonElements: Dataset[Person] = dataset1.intersect(dataset2)
// Displaying the common elements
commonElements.show()
Output:
+-----+---+
| name|age|
+-----+---+
|Alice| 35|
| John| 40|
| Bob| 25|
+-----+---+
Explanation:
In the example above, we have two Datasets (dataset1
and dataset2
) containing Person
objects with names and ages. We use the intersect
function to find the common elements between the two Datasets based on their contents. The result is a new Dataset commonElements
, which contains only the Person
objects that are present in both Datasets. The common elements are displayed using the show()
method.
Note:
- The intersect function assumes that both Datasets have the same schema. If the Datasets have different schemas, you may need to perform data type conversions or transformations before using the intersect function.
- The intersect function is a transformation operation, and it does not trigger any computation until an action is called on the resulting Dataset.
intersectAll(other: Dataset[T]): Dataset[T]β
Description:
The intersectAll
function in Apache Spark's Dataset API is used to find the common elements between two Datasets while preserving duplicates: if a row appears m times in the current Dataset and n times in the other
Dataset, it appears min(m, n) times in the result. Unlike intersect, the output is not deduplicated.
Parameters:
- other: The other Dataset with which the intersection is to be computed. Both Datasets must have the same schema (column names and data types).
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset elements
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("IntersectAll Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the first Dataset
val data1 = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("Alice", 35),
Person("John", 40),
Person("Bob", 28)
)
// Sample data for the second Dataset
val data2 = Seq(
Person("Bob", 25),
Person("Mary", 22),
Person("Alice", 35),
Person("John", 40),
Person("Tom", 29)
)
// Creating the Datasets from the sample data
val dataset1: Dataset[Person] = spark.createDataset(data1)
val dataset2: Dataset[Person] = spark.createDataset(data2)
// Finding the common elements (including duplicates) using intersectAll
val commonElements: Dataset[Person] = dataset1.intersectAll(dataset2)
// Displaying the common elements (including duplicates)
commonElements.show()
Output:
+-----+---+
| name|age|
+-----+---+
|Alice| 35|
| John| 40|
|  Bob| 25|
+-----+---+
Explanation:
In the example above, we have two Datasets (dataset1
and dataset2
) containing Person
objects with names and ages. We use the intersectAll
function to find the common elements between the two Datasets while preserving duplicates. The result is a new Dataset commonElements
containing the Person
objects that are present in both Datasets; each common row is kept min(m, n) times, where m and n are its occurrence counts in the two Datasets. In this sample every common row occurs exactly once in each Dataset, so the result matches what intersect would return. The common elements are displayed using the show()
method.
Note:
- The intersectAll function assumes that both Datasets have the same schema. If the Datasets have different schemas, you may need to perform data type conversions or transformations before using the intersectAll function.
- The intersectAll function is a transformation operation, and it does not trigger any computation until an action is called on the resulting Dataset.
joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]β
Description:
The joinWith
function in Apache Spark's Dataset API is used to perform an inner join between two Datasets based on a given condition (Column). It returns a new Dataset containing tuples of elements from the current Dataset (this
) and the other
Dataset that satisfy the specified join condition.
Parameters:
- other: The other Dataset (Dataset[U]) with which the inner join is performed.
- condition: The join condition expressed as a Column. It specifies the criteria for joining the two Datasets. The join condition must evaluate to a Boolean expression, and the resulting Dataset will contain only the elements that satisfy this condition.
Return Type:
The function returns a new Dataset of tuples (T, U)
, where T
is the element type of the current Dataset (this
), and U
is the element type of the other
Dataset.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Datasets
case class Person(name: String, age: Int)
case class Address(name: String, city: String)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("joinWith Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the first Dataset (Person)
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40)
)
// Sample data for the second Dataset (Address)
val addressData = Seq(
Address("Alice", "New York"),
Address("Bob", "San Francisco"),
Address("Mary", "Los Angeles")
)
// Creating the Datasets from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
val addressDataset: Dataset[Address] = spark.createDataset(addressData)
// Joining the Datasets using joinWith and a condition based on 'name' column
val joinedDataset: Dataset[(Person, Address)] = personDataset.joinWith(addressDataset,
personDataset("name") === addressDataset("name"))
// Displaying the joined results
joinedDataset.show()
Output:
+-----------+--------------------+
|         _1|                  _2|
+-----------+--------------------+
|{Alice, 30}|   {Alice, New York}|
|  {Bob, 25}|{Bob, San Francisco}|
+-----------+--------------------+
Explanation:
In the example above, we have two Datasets (personDataset
and addressDataset
) containing Person
and Address
objects, respectively. We use the joinWith
function to perform an inner join between the Datasets based on the condition that the 'name' column in personDataset
should be equal to the 'name' column in addressDataset
. The resulting Dataset joinedDataset
contains tuples of (Person, Address)
representing the matched elements from both Datasets, where the 'name' columns match.
Note:
- The joinWith function is useful when you want to perform an inner join and obtain a Dataset of tuples containing matched elements from both Datasets.
- The condition specified in the joinWith function can involve multiple columns or more complex expressions using Spark's built-in functions from org.apache.spark.sql.functions.
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]β
Description:
The joinWith
function in Apache Spark's Dataset API is used to perform a join between two Datasets based on a given condition (Column) and a specified join type. It returns a new Dataset containing tuples of elements from the current Dataset (this
) and the other
Dataset that satisfy the specified join condition.
Parameters:
- other: The other Dataset (Dataset[U]) with which the join is performed.
- condition: The join condition expressed as a Column. It specifies the criteria for joining the two Datasets. The join condition must evaluate to a Boolean expression, and the resulting Dataset will contain only the elements that satisfy this condition.
- joinType: A string representing the type of join to be performed. The possible values for joinType are:
  - "inner": Performs an inner join between the two Datasets. Only the matching elements are included in the resulting Dataset.
  - "outer": Performs a full outer join between the two Datasets. All elements from both Datasets are included in the resulting Dataset. If there is no match, null values will be present for the non-matching elements.
  - "left_outer": Performs a left outer join. All elements from the left Dataset are included in the resulting Dataset. If there is no match in the right Dataset, null values will be present for the non-matching elements.
  - "right_outer": Performs a right outer join. All elements from the right Dataset are included in the resulting Dataset. If there is no match in the left Dataset, null values will be present for the non-matching elements.
Return Type:
The function returns a new Dataset of tuples (T, U)
, where T
is the element type of the current Dataset (this
), and U
is the element type of the other
Dataset.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Datasets
case class Person(name: String, age: Int)
case class Address(name: String, city: String)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("joinWith Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the first Dataset (Person)
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40)
)
// Sample data for the second Dataset (Address)
val addressData = Seq(
Address("Alice", "New York"),
Address("Bob", "San Francisco"),
Address("Mary", "Los Angeles")
)
// Creating the Datasets from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
val addressDataset: Dataset[Address] = spark.createDataset(addressData)
// Joining the Datasets using joinWith with a left outer join type and a condition based on 'name' column
val joinedDataset: Dataset[(Person, Address)] = personDataset.joinWith(addressDataset,
personDataset("name") === addressDataset("name"), "left_outer")
// Displaying the joined results
joinedDataset.show()
Output:
+-----------+--------------------+
|         _1|                  _2|
+-----------+--------------------+
|{Alice, 30}|   {Alice, New York}|
|  {Bob, 25}|{Bob, San Francisco}|
| {John, 40}|                null|
+-----------+--------------------+
Explanation:
In the example above, we have two Datasets (personDataset
and addressDataset
) containing Person
and Address
objects, respectively. We use the joinWith
function to perform a left outer join between the Datasets based on the condition that the 'name' column in personDataset
should be equal to the 'name' column in addressDataset
. The resulting Dataset joinedDataset
contains tuples of (Person, Address)
representing the matched elements from both Datasets for the specified join condition. Since there is no matching entry for 'John' in the addressDataset
, the 'city' attribute for 'John' is represented as null in the output.
Note:
- The joinWith function is useful when you want to perform a join and obtain a Dataset of tuples containing matched elements from both Datasets based on a specific join condition and type; the sketch below shows how to map those tuples into a case class.
- The condition specified in the joinWith function can involve multiple columns or more complex expressions using Spark's built-in functions from org.apache.spark.sql.functions.
- The joinType parameter determines the type of join to be performed (inner, outer, left_outer, or right_outer).
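Because joinWith returns tuples, a common follow-up is to map the pairs into a readable case class; in the sketch below the PersonWithCity class is made up for illustration, and Option absorbs the nulls produced by the left outer join:
case class PersonWithCity(name: String, age: Int, city: Option[String])
val enriched: Dataset[PersonWithCity] = joinedDataset.map { case (person, address) =>
  PersonWithCity(person.name, person.age, Option(address).map(_.city))
}
enriched.show()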
limit(n: Int): Dataset[T]β
Description:
The limit
function in Apache Spark's Dataset API is used to select a fixed number of elements from the beginning of the Dataset. It returns a new Dataset containing the first n
elements of the current Dataset (this
).
Parameters:
n
: The number of elements to select from the beginning of the Dataset. It should be a positive integer.
Return Type:
The function returns a new Dataset containing the first n
elements of type T
, where T
is the element type of the current Dataset (this
).
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("limit Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Applying the limit function to select the first two elements
val limitedDataset: Dataset[Person] = personDataset.limit(2)
// Displaying the limited results
limitedDataset.show()
Output:
+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|  Bob| 25|
+-----+---+
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the limit
function to select the first two elements from the Dataset. The resulting Dataset limitedDataset
contains the first two elements of personDataset
.
Note:
- The limit function is commonly used to restrict the number of records to be processed, especially when dealing with large Datasets; the sketch below contrasts it with the take action.
- The number n specified in the limit function should be positive. If n is greater than the Dataset's size, the function simply returns the entire Dataset without any change.
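A short sketch (continuing personDataset from above) that contrasts limit, a lazy transformation returning a Dataset, with take, an action that brings rows back to the driver as an Array:
// limit returns a Dataset; nothing is computed until an action runs on it
val firstTwo: Dataset[Person] = personDataset.limit(2)
// take(2) is an action: it triggers computation and returns the rows to the driver
val firstTwoArray: Array[Person] = personDataset.take(2)
firstTwoArray.foreach(person => println(person))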
mapPartitions[U](func: (Iterator[T]) => Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]β
Description:
The mapPartitions
function in Apache Spark's Dataset API is used to apply a transformation to each partition of the Dataset this
. It takes a mapping function that operates on an iterator of elements of type T
, and the function returns an iterator of elements of type U
. The transformation is performed on each partition, and the resulting elements from all partitions are combined to create a new Dataset of type U
.
Parameters:
- func: The mapping function that takes an iterator of elements of type T from a partition and returns an iterator of elements of type U. This function is applied to each partition of the Dataset.
- arg0: An implicit Encoder for type U, which is required to serialize the transformed elements of type U back to Spark's internal binary format.
Return Type:
The function returns a new Dataset of type U
, where U
is the type of elements returned by the mapping function func
.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("mapPartitions Example")
.master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35),
Person("David", 28),
Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Applying the mapPartitions function to transform each age to its square
val ageSquaredDataset: Dataset[Int] = personDataset.mapPartitions { partitionIterator =>
partitionIterator.map(person => person.age * person.age)
}
// Displaying the transformed Dataset
ageSquaredDataset.show()
Output:
+-----+
|value|
+-----+
|  900|
|  625|
| 1600|
| 1225|
|  784|
| 1089|
+-----+
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the mapPartitions
function to transform each person's age (of type Int
) into its square (of type Int
). The transformation is performed on each partition, and the resulting Dataset ageSquaredDataset
contains the squares of ages.
Note:
- The mapPartitions function is useful when you need to process a whole partition at once. It can be more efficient than map when the transformation has a significant per-partition setup or teardown cost, for example creating a database connection, a parser, or another expensive resource once per partition instead of once per element; see the sketch below.
- The transformation function func takes an iterator for efficiency and should be designed to work on partitions of data, not on single elements.
- The implicit Encoder[U] is required to serialize the transformed elements of type U back to Spark's internal binary format. Importing spark.implicits._ provides suitable encoders for standard Scala types, so explicit encoders are rarely needed for basic transformations; custom types may require explicit encoders.
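A minimal sketch of the per-partition setup pattern, reusing personDataset from the example above. The NumberFormat here is only an illustrative stand-in for any costly resource such as a connection or parser:
import java.text.NumberFormat
import spark.implicits._  // provides the Encoder[String] needed for Dataset[String]
// Format each age with a NumberFormat that is created once per partition,
// not once per element - the typical reason to prefer mapPartitions over map
val formattedAges: Dataset[String] = personDataset.mapPartitions { partitionIterator =>
  val format: NumberFormat = NumberFormat.getIntegerInstance()  // per-partition setup
  partitionIterator.map(person => person.name + ": " + format.format(person.age.toLong))
}
formattedAges.show()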
observe(observation: Observation, expr: Column, exprs: Column*): Dataset[T]
observe(name: String, expr: Column, exprs: Column*): Dataset[T]
Description:
The observe function in Apache Spark's Dataset API attaches named metrics (aggregate expressions) to a Dataset. The metrics are computed as a side effect while an action or a streaming query runs over the Dataset, without changing the data itself. The variant that takes an Observation handle is meant for batch queries: after the first action completes, the collected metric values can be read from the handle. The variant that takes a name registers the metrics under that name; for streaming queries the values are delivered through the query progress events of a StreamingQueryListener.
Parameters:
- observation: An org.apache.spark.sql.Observation handle that identifies the metrics and exposes their values through its get method once the first action on the Dataset has finished. This variant does not support streaming Datasets and is available since Spark 3.3.
- name: A string under which the metrics are registered. Use this variant for streaming queries and read the values from the observedMetrics field of the query progress events.
- expr: A Column containing an aggregate expression (for example count, max, or avg) to be computed over the rows that flow through the Dataset.
- exprs: Additional aggregate Column expressions (varargs).
Return Type:
The function returns a new Dataset of type T with exactly the same data as the original Dataset; only the metric collection is attached. The observation does not change the rows in any way.
Usage Example:
import org.apache.spark.sql.{Dataset, Observation, SparkSession}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
  .appName("observe Example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._
// Sample data for the Dataset
val personData = Seq(
  Person("Alice", 30),
  Person("Bob", 25),
  Person("John", 40),
  Person("Mary", 35),
  Person("David", 28),
  Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Creating an Observation handle for the metrics
val ageObservation = Observation("age_metrics")
// Attaching three aggregate metrics to the Dataset; the rows themselves are not changed
val observedDataset: Dataset[Person] = personDataset.observe(
  ageObservation,
  count(lit(1)).as("rowCount"),
  max($"age").as("maxAge"),
  avg($"age").as("avgAge")
)
// Running an action so that the metrics are computed
observedDataset.show()
// Reading the collected metric values
println(ageObservation.get)
Output:
+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|  Bob| 25|
| John| 40|
| Mary| 35|
|David| 28|
|  Eva| 33|
+-----+---+
Map(rowCount -> 6, maxAge -> 40, avgAge -> 31.833333333333332)
Explanation:
In the example above, we have a Dataset personDataset containing Person objects. We create an Observation handle named "age_metrics" and use the observe function to attach three aggregate metrics (row count, maximum age, and average age) to the Dataset. The observe function does not modify the data: observedDataset contains exactly the same rows as personDataset. When the show() action runs, Spark computes the metrics as a side effect, and ageObservation.get then returns them as a Map.
Note:
- The expressions passed to observe must be aggregate expressions and may only reference the Dataset's columns, literals, and aggregate functions.
- The Observation-based variant works for batch queries only. For streaming queries, use the observe(name, expr, exprs*) variant and read the metric values from the observedMetrics field of the QueryProgressEvent received by a StreamingQueryListener.
- observe is useful for collecting data-quality or monitoring metrics (for example row counts or null counts) without running a separate aggregation job over the same data.
offset(n: Int): Dataset[T]
Description:
The offset
function in Apache Spark's Dataset API is used to skip a specified number of rows from the beginning of the Dataset. It returns a new Dataset that excludes the first n
rows.
Parameters:
n
: An integer representing the number of rows to skip from the beginning of the Dataset.
Return Type:
The function returns a new Dataset of type T
that contains the remaining rows after skipping the first n
rows.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("offset Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35),
Person("David", 28),
Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Applying the offset function to skip the first 2 rows
val offsetDataset: Dataset[Person] = personDataset.offset(2)
// Showing the contents of the offsetDataset
offsetDataset.show()
Output:
+-----+---+
| name|age|
+-----+---+
| John| 40|
| Mary| 35|
|David| 28|
|  Eva| 33|
+-----+---+
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We apply the offset
function with n = 2
to skip the first two rows of the Dataset. The offset
function creates a new Dataset offsetDataset
that contains the remaining rows after skipping the first two rows. The show
method is then used to display the contents of the offsetDataset
.
Note:
- The offset function is particularly useful when you want to skip some initial rows in the Dataset before performing further operations on the data. Because Spark does not guarantee row order unless you sort explicitly, offset is usually combined with orderBy, and often with limit for pagination, as in the sketch below.
- The offset function is available on Datasets since Spark 3.4.
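A minimal sketch of that pagination pattern, reusing personDataset from the example above (page size 2, second page; only the col import is added here):
import org.apache.spark.sql.functions.col
// Deterministic paging: sort first, then skip the first page and take the next one
val secondPage: Dataset[Person] = personDataset
  .orderBy(col("age"))
  .offset(2)
  .limit(2)
secondPage.show()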
orderBy(sortExprs: Column*): Dataset[T]
Description:
The orderBy
function in Apache Spark's Dataset API is used to sort the rows of the Dataset based on one or more columns in ascending or descending order. It returns a new Dataset with the rows sorted according to the specified sorting expressions.
Parameters:
- sortExprs: One or more Column objects representing the sorting expressions. Each Column specifies a column by which the Dataset should be sorted. By default, the sorting is done in ascending order; to sort in descending order, call desc on the Column.
Return Type:
The function returns a new Dataset of type T
with the rows sorted based on the specified sorting expressions.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("orderBy Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35),
Person("David", 28),
Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Sorting the Dataset by age in ascending order
val sortedByAge: Dataset[Person] = personDataset.orderBy(col("age"))
// Sorting the Dataset by name in descending order
val sortedByNameDesc: Dataset[Person] = personDataset.orderBy(col("name").desc)
// Showing the contents of the sortedByAge Dataset
sortedByAge.show()
// Showing the contents of the sortedByNameDesc Dataset
sortedByNameDesc.show()
Output:
+-----+---+
| name|age|
+-----+---+
|  Bob| 25|
|David| 28|
|Alice| 30|
|  Eva| 33|
| Mary| 35|
| John| 40|
+-----+---+
+-----+---+
| name|age|
+-----+---+
| Mary| 35|
| John| 40|
|  Eva| 33|
|David| 28|
|  Bob| 25|
|Alice| 30|
+-----+---+
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the orderBy
function to create two new Datasets sortedByAge
and sortedByNameDesc
. sortedByAge
is sorted based on the age
column in ascending order, while sortedByNameDesc
is sorted based on the name
column in descending order. The show
method is then used to display the contents of both sorted Datasets.
Note:
- The
orderBy
function is commonly used to sort the data in a Dataset based on one or more columns, which is often a required operation in data processing and analysis.
orderBy(sortCol: String, sortCols: String*): Dataset[T]
Description:
The orderBy
function in Apache Spark's Dataset API is used to sort the rows of the Dataset based on one or more columns in ascending or descending order specified by their column names. It returns a new Dataset with the rows sorted according to the specified sorting columns.
Parameters:
- sortCol: The name of the column by which the Dataset should be sorted in ascending order.
- sortCols: Additional column names used to break ties when the primary column (sortCol) has equal values.
With this overload, all columns are sorted in ascending order. To sort in descending order, use the Column-based overload orderBy(sortExprs: Column*) and call desc on the Column, for example orderBy(col("salary").desc).
Return Type:
The function returns a new Dataset of type T
with the rows sorted based on the specified sorting columns.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int, salary: Double)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("orderBy Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30, 50000.0),
Person("Bob", 25, 45000.0),
Person("John", 40, 55000.0),
Person("Mary", 35, 52000.0),
Person("David", 28, 48000.0),
Person("Eva", 33, 53000.0)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Sorting the Dataset by age in ascending order
val sortedByAge: Dataset[Person] = personDataset.orderBy("age")
// Sorting the Dataset by age and then by salary, both in ascending order
val sortedByAgeAndSalary: Dataset[Person] = personDataset.orderBy("age", "salary")
// Showing the contents of the sortedByAge Dataset
sortedByAge.show()
// Showing the contents of the sortedByAgeAndSalary Dataset
sortedByAgeAndSalary.show()
Output:
+-----+---+-------+
| name|age| salary|
+-----+---+-------+
|  Bob| 25|45000.0|
|David| 28|48000.0|
|Alice| 30|50000.0|
|  Eva| 33|53000.0|
| Mary| 35|52000.0|
| John| 40|55000.0|
+-----+---+-------+
+-----+---+-------+
| name|age| salary|
+-----+---+-------+
|  Bob| 25|45000.0|
|David| 28|48000.0|
|Alice| 30|50000.0|
|  Eva| 33|53000.0|
| Mary| 35|52000.0|
| John| 40|55000.0|
+-----+---+-------+
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the orderBy
function to create two new Datasets sortedByAge
and sortedByAgeAndSalary
. sortedByAge
is sorted based on the age
column in ascending order. sortedByAgeAndSalary
is sorted first by the age column and then by the salary column, both in ascending order. Because all ages in the sample data are distinct, the salary column never has to break a tie, which is why both sorted Datasets show the same order. The show method is then used to display the contents of both sorted Datasets.
Note:
- The orderBy function is commonly used to sort the data in a Dataset based on one or more columns, which is a frequent operation in data processing and analysis. Specifying multiple sorting columns is useful when you need to establish a well-defined order for your data.
- The String-based overload always sorts in ascending order; for descending order, switch to the Column-based overload as shown in the sketch below.
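A minimal sketch, reusing personDataset from the example above, of sorting by age ascending and salary descending with the Column-based overload (only the col import is added here):
import org.apache.spark.sql.functions.col
// age ascending (default), salary descending as the tie-breaker
val sortedMixed: Dataset[Person] = personDataset.orderBy(col("age"), col("salary").desc)
sortedMixed.show()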
randomSplit(weights: Array[Double]): Array[Dataset[T]]
Description:
The randomSplit
function in Apache Spark's Dataset API is used to split a Dataset randomly into multiple Datasets based on the provided weights. It takes an array of weights as input, and the length of the array determines the number of splits. The weights represent the relative proportion of each split in the output array; if they do not sum to 1.0, Spark normalizes them.
Parameters:
weights
: An array of doubles representing the probabilities of each split in the output array. The length of the array determines the number of splits.
Return Type:
The function returns an array of Datasets of type T
, which are the randomly split subsets of the original Dataset.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("randomSplit Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35),
Person("David", 28),
Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Splitting the Dataset into two parts: 70% and 30%
val weights = Array(0.7, 0.3)
val randomSplits: Array[Dataset[Person]] = personDataset.randomSplit(weights)
// Showing the contents of the two splits
randomSplits(0).show() // 70% split
randomSplits(1).show() // 30% split
Output:
+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|  Bob| 25|
| John| 40|
| Mary| 35|
|David| 28|
+-----+---+
+----+---+
|name|age|
+----+---+
| Eva| 33|
+----+---+
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the randomSplit
function to split the Dataset into two parts with weights 0.7 and 0.3, respectively. This means that approximately 70% of the data will be in the first split and 30% in the second split. We then show the contents of both splits using the show
method.
Note:
- The randomSplit function is useful when you want to divide your data randomly into subsets, for example for train-test splitting in machine learning or for exploratory sampling. The weights parameter specifies the proportions of data in each subset; the sizes of the resulting splits are approximate, not exact. A common pattern is to destructure the returned array directly, as in the sketch below.
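A minimal sketch, reusing personDataset from the example above, of the common train-test destructuring pattern (the 0.8/0.2 weights are only an illustrative choice):
// Split into training and test sets and bind each split to a name in one step
val Array(trainingSet, testSet) = personDataset.randomSplit(Array(0.8, 0.2))
println(trainingSet.count())
println(testSet.count())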
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
Description:
The randomSplit
function in Apache Spark's Dataset API is used to split a Dataset randomly into multiple datasets based on the provided weights and a seed value. It takes an array of weights and a seed value as input. The length of the weights array determines the number of splits, and the weights represent the probability of each split in the output array. The sum of the weights should be 1.0.
The seed
parameter is used to specify a seed for the random number generator. When a seed is provided, the random splits will be reproducible, i.e., if you run the same randomSplit
operation with the same seed multiple times, you will get the same random splits.
Parameters:
- weights: An array of doubles representing the proportions of each split in the output array. The length of the array determines the number of splits.
- seed: A long integer value that sets the seed for the random number generator. In this overload the seed is required; use randomSplit(weights) if you do not need a reproducible split.
Return Type:
The function returns an array of Datasets of type T
, which are the randomly split subsets of the original Dataset.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("randomSplit Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35),
Person("David", 28),
Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Splitting the Dataset into two parts: 70% and 30% with a seed value of 123
val weights = Array(0.7, 0.3)
val seed = 123L
val randomSplits: Array[Dataset[Person]] = personDataset.randomSplit(weights, seed)
// Showing the contents of the two splits
randomSplits(0).show() // 70% split
randomSplits(1).show() // 30% split
Output:
+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|  Bob| 25|
| John| 40|
| Mary| 35|
+-----+---+
+-----+---+
| name|age|
+-----+---+
|David| 28|
|  Eva| 33|
+-----+---+
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the randomSplit
function to split the Dataset into two parts with weights 0.7 and 0.3, respectively, and a seed value of 123. This means that approximately 70% of the data will be in the first split and 30% in the second split. The seed value ensures that the random splits will be reproducible. We then show the contents of both splits using the show
method.
Note:
- The randomSplit function is useful when you want to divide your data randomly into subsets for tasks such as train-test splitting in machine learning or data sampling for exploratory analysis. The weights parameter specifies the proportions of data in each subset, and the seed parameter makes the random splits reproducible.
randomSplitAsList(weights: Array[Double], seed: Long): List[Dataset[T]]
Description:
The randomSplitAsList
function in Apache Spark's Dataset API is used to split a Dataset randomly into multiple datasets based on the provided weights and a seed value. It is similar to the randomSplit
function, but instead of returning a Scala array of Datasets, it returns a java.util.List of Datasets, which makes it convenient to call from Java code. The length of the weights array determines the number of splits, and the weights represent the relative proportion of each split in the output List; if they do not sum to 1.0, Spark normalizes them.
The seed
parameter is used to specify a seed for the random number generator. When a seed is provided, the random splits will be reproducible, i.e., if you run the same randomSplitAsList
operation with the same seed multiple times, you will get the same random splits.
Parameters:
- weights: An array of doubles representing the proportions of each split in the output List. The length of the array determines the number of splits.
- seed: A long integer value that sets the seed for the random number generator. In this method the seed is required.
Return Type:
The function returns a java.util.List of Datasets of type T, which are the randomly split subsets of the original Dataset.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("randomSplitAsList Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35),
Person("David", 28),
Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Splitting the Dataset into two parts: 70% and 30% with a seed value of 123
val weights = Array(0.7, 0.3)
val seed = 123L
val randomSplits: java.util.List[Dataset[Person]] = personDataset.randomSplitAsList(weights, seed)
// Showing the contents of the two splits
randomSplits.get(0).show() // 70% split
randomSplits.get(1).show() // 30% split
Output:
+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|  Bob| 25|
| John| 40|
| Mary| 35|
+-----+---+
+-----+---+
| name|age|
+-----+---+
|David| 28|
|  Eva| 33|
+-----+---+
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the randomSplitAsList
function to split the Dataset into two parts with weights 0.7 and 0.3, respectively, and a seed value of 123. This means that approximately 70% of the data will be in the first split and 30% in the second split. The seed value ensures that the random splits will be reproducible. We then show the contents of both splits using the show
method.
Note:
- The randomSplitAsList function is similar to randomSplit and serves the same purpose of randomly dividing the data into subsets. Because it returns a java.util.List, it is mainly intended for Java callers; from Scala, randomSplit is usually the more natural choice, or the Java list can be converted to a Scala collection as in the sketch below. Both functions offer the same flexibility in creating random splits based on the provided weights and seed value.
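A minimal sketch of converting the returned Java list into a Scala Seq, reusing personDataset from the example above. This assumes Scala 2.13; on Scala 2.12 the import would be scala.collection.JavaConverters._ instead:
import scala.jdk.CollectionConverters._
// Convert the java.util.List into an immutable Scala Seq for idiomatic use
val scalaSplits: Seq[Dataset[Person]] = personDataset
  .randomSplitAsList(Array(0.7, 0.3), 123L)
  .asScala
  .toSeq
scalaSplits.foreach(split => split.show())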
repartition(partitionExprs: Column*): Dataset[T]
Description:
The repartition
function in Apache Spark's Dataset API is used to redistribute the data across partitions using hash partitioning on the provided partition expressions. The number of partitions is taken from the spark.sql.shuffle.partitions configuration property (200 by default). It allows you to control the physical layout of the data, and it is particularly useful when you want to optimize data distribution for more efficient processing, such as reducing data skew and improving parallelism during transformations.
Parameters:
partitionExprs
: One or more Column expressions representing the partitioning criteria. The data is hash-partitioned on these expressions, so rows with the same values for the specified columns end up in the same partition.
Return Type:
The function returns a new Dataset of type T
that has been repartitioned according to the provided partition expressions.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("repartition Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35),
Person("David", 28),
Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Repartition the Dataset based on the "age" column
val repartitionedDataset: Dataset[Person] = personDataset.repartition(col("age"))
// Get the number of partitions in the new Dataset
val numPartitions: Int = repartitionedDataset.rdd.getNumPartitions
println(s"Number of partitions after repartitioning: $numPartitions")
Output:
Number of partitions after repartitioning: 200
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the repartition
function to redistribute the data based on the "age" column. The Dataset is repartitioned into 200 partitions, and rows are assigned to partitions by hashing the age value. The number of partitions is taken from the spark.sql.shuffle.partitions configuration property, whose default value is 200.
Note:
- The repartition function allows you to control the data distribution across partitions, which can significantly impact the performance of Spark transformations and actions. It is essential to choose appropriate partition expressions based on the characteristics of your data and the operations you plan to perform on the Dataset. To see how the rows ended up distributed, you can group by the partition id, as in the sketch below.
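A minimal sketch, reusing repartitionedDataset from the example above, of inspecting how many rows landed in each partition:
import org.apache.spark.sql.functions.spark_partition_id
// Count the rows per physical partition; empty partitions simply do not appear in the result
repartitionedDataset
  .groupBy(spark_partition_id().as("partitionId"))
  .count()
  .show()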
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
Description:
The repartition
function in Apache Spark's Dataset API is used to redistribute the data across a specified number of partitions based on the provided partition expressions. It allows you to control both the physical layout of the data and the number of partitions in the resulting Dataset. This function is particularly useful when you want to optimize data distribution for more efficient processing, such as reducing data skew and improving parallelism during transformations.
Parameters:
- numPartitions: The desired number of partitions that the Dataset should have after repartitioning.
- partitionExprs: One or more Column expressions representing the partitioning criteria. The data is hash-partitioned on these expressions across the requested number of partitions, so rows with the same values for the specified columns end up in the same partition.
Return Type:
The function returns a new Dataset of type T
that has been repartitioned into the specified number of partitions according to the provided partition expressions.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Dataset
case class Person(name: String, age: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("repartition Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val personData = Seq(
Person("Alice", 30),
Person("Bob", 25),
Person("John", 40),
Person("Mary", 35),
Person("David", 28),
Person("Eva", 33)
)
// Creating the Dataset from the sample data
val personDataset: Dataset[Person] = spark.createDataset(personData)
// Repartition the Dataset into 2 partitions based on the "age" column
val repartitionedDataset: Dataset[Person] = personDataset.repartition(2, col("age"))
// Get the number of partitions in the new Dataset
val numPartitions: Int = repartitionedDataset.rdd.getNumPartitions
println(s"Number of partitions after repartitioning: $numPartitions")
Output:
Number of partitions after repartitioning: 2
Explanation:
In the example above, we have a Dataset personDataset
containing Person
objects. We use the repartition
function to redistribute the data into 2 partitions based on the "age" column. Rows are assigned to the 2 partitions by hashing the age value, so all rows with the same age end up in the same partition. The resulting repartitionedDataset
has only 2 partitions, which we specified as the numPartitions
parameter.
Note:
- The
repartition
function allows you to control both the number of partitions and the data distribution across those partitions, which can significantly impact the performance of Spark transformations and actions. It is essential to choose appropriate partition expressions and an optimal number of partitions based on the characteristics of your data and the operations you plan to perform on the Dataset.
repartition(numPartitions: Int): Dataset[T]
Description:
The repartition
function in Apache Spark's Dataset API is used to increase or decrease the number of partitions of a Dataset to the specified value numPartitions
. Repartitioning is the process of redistributing the data across the specified number of partitions, which can be useful for optimizing data distribution and parallelism during transformations and actions.
Parameters:
numPartitions
: The desired number of partitions that the Dataset should have after repartitioning. The data will be evenly distributed across the specified number of partitions.
Return Type:
The function returns a new Dataset of type T
that has been repartitioned into the specified number of partitions.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Case class representing the structure of the Dataset
case class Employee(name: String, department: String, salary: Double)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("repartition Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val employeeData = Seq(
Employee("Alice", "HR", 50000.0),
Employee("Bob", "Engineering", 60000.0),
Employee("John", "HR", 55000.0),
Employee("Mary", "Sales", 52000.0),
Employee("David", "Engineering", 62000.0),
Employee("Eva", "Sales", 53000.0)
)
// Creating the Dataset from the sample data
val employeeDataset: Dataset[Employee] = spark.createDataset(employeeData)
// Repartition the Dataset into 3 partitions
val repartitionedDataset: Dataset[Employee] = employeeDataset.repartition(3)
// Get the number of partitions in the new Dataset
val numPartitions: Int = repartitionedDataset.rdd.getNumPartitions
println(s"Number of partitions after repartitioning: $numPartitions")
Output:
Number of partitions after repartitioning: 3
Explanation:
In the example above, we have a Dataset employeeDataset
containing Employee
objects. We use the repartition
function to redistribute the data into 3 partitions. The data is evenly distributed across the 3 partitions. The resulting repartitionedDataset
has 3 partitions as specified by the numPartitions
parameter.
Note:
- The repartition function allows you to control the number of partitions for a Dataset. It can be used to increase the level of parallelism during processing or to reduce the number of partitions to avoid excessive overhead. Choosing an appropriate number of partitions can impact the performance of Spark operations, so it is essential to consider the characteristics of your data and the operations you plan to perform when deciding on the value for numPartitions.
- Repartitioning always triggers a full shuffle. When you only want to reduce the number of partitions, coalesce is usually cheaper because it avoids a full shuffle, as in the sketch below.
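A minimal sketch, reusing employeeDataset from the example above, of reducing the partition count with coalesce instead of repartition:
// coalesce merges existing partitions without a full shuffle; it can only decrease the count
val coalescedDataset: Dataset[Employee] = employeeDataset.coalesce(1)
println(coalescedDataset.rdd.getNumPartitions)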
repartitionByRange(partitionExprs: Column*): Dataset[T]
Description:
The repartitionByRange
function in Apache Spark's Dataset API is used to perform range-based repartitioning of the Dataset based on the specified partitionExprs
. Range-based repartitioning is useful when you want to distribute the data across partitions based on the range of values in the specified columns, for example to reduce skew before a sort or before writing data that is frequently filtered on a range.
Parameters:
partitionExprs
: The columns based on which the range-based repartitioning should be performed. The data will be partitioned into ranges based on the values in these columns.
Return Type:
The function returns a new Dataset of type T
that has been repartitioned by range based on the specified partition expressions.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Dataset
case class Employee(name: String, age: Int, salary: Double)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("repartitionByRange Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val employeeData = Seq(
Employee("Alice", 25, 50000.0),
Employee("Bob", 30, 60000.0),
Employee("John", 22, 55000.0),
Employee("Mary", 28, 52000.0),
Employee("David", 35, 62000.0),
Employee("Eva", 27, 53000.0)
)
// Creating the Dataset from the sample data
val employeeDataset: Dataset[Employee] = spark.createDataset(employeeData)
// Repartition the Dataset by range based on the 'age' column
val repartitionedByRangeDataset: Dataset[Employee] = employeeDataset.repartitionByRange(col("age"))
// Get the number of partitions in the new Dataset
val numPartitions: Int = repartitionedByRangeDataset.rdd.getNumPartitions
println(s"Number of partitions after range-based repartitioning: $numPartitions")
Output:
Number of partitions after range-based repartitioning: 200
Explanation:
In the example above, we have a Dataset employeeDataset
containing Employee
objects. We use the repartitionByRange
function to redistribute the data across partitions based on the age
column. The data is partitioned into ranges based on the values in the age
column, and the number of partitions created is determined by the Spark configuration property spark.sql.shuffle.partitions
(default is 200). The resulting repartitionedByRangeDataset
is range-partitioned based on the values in the age
column.
Note:
- The repartitionByRange function is typically used when you want to distribute data across partitions based on specific column ranges, for example before range-based writes or to mitigate skew. When no explicit partition count is given, the number of partitions created by this method is determined by the Spark configuration property spark.sql.shuffle.partitions, which controls the default number of partitions for shuffle operations. You can adjust this property to control the level of parallelism for the range-based repartitioning.
- Range boundaries are determined by sampling the data, so the exact distribution of rows across partitions can vary slightly between runs.
Suppose we have a dataset containing information about various products in an online store. Each product has attributes such as product_id
, category
, price
, and stock_quantity
. We want to repartition the dataset by range based on both the category
and price
columns. This will help us efficiently distribute the data across partitions based on both the category and price ranges, which can be beneficial for certain analytical operations.
Let's create the example and observe the output:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Dataset
case class Product(product_id: Int, category: String, price: Double, stock_quantity: Int)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Advanced Range-Based Repartitioning Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val productData = Seq(
Product(101, "Electronics", 500.0, 100),
Product(102, "Clothing", 35.5, 200),
Product(103, "Electronics", 650.0, 50),
Product(104, "Home & Kitchen", 120.0, 80),
Product(105, "Clothing", 45.0, 300),
Product(106, "Electronics", 780.0, 70),
Product(107, "Home & Kitchen", 98.5, 120),
Product(108, "Electronics", 890.0, 40),
Product(109, "Clothing", 29.9, 250),
Product(110, "Home & Kitchen", 80.0, 150),
Product(111, "Electronics", 550.0, 60)
)
// Creating the Dataset from the sample data
val productDataset: Dataset[Product] = spark.createDataset(productData)
// Repartition the Dataset by range based on the 'category' and 'price' columns
val repartitionedByRangeDataset: Dataset[Product] = productDataset.repartitionByRange(col("category"), col("price"))
// Get the number of partitions in the new Dataset
val numPartitions: Int = repartitionedByRangeDataset.rdd.getNumPartitions
println(s"Number of partitions after range-based repartitioning: $numPartitions")
Output:
Number of partitions after range-based repartitioning: 200
Explanation:
In this example, we have a Dataset productDataset
containing Product
objects. We use the repartitionByRange
function with the col
method from org.apache.spark.sql.functions
to repartition the data by range based on both the category
and price
columns. The resulting repartitionedByRangeDataset
will have the data distributed across partitions based on both the category and price ranges.
The number of partitions created by repartitionByRange
is determined by the Spark configuration property spark.sql.shuffle.partitions
, which controls the default number of partitions for shuffling operations. In this example, the number of partitions is 200.
Note:
- Range-based repartitioning is useful when you have specific columns that can be used to partition the data into ranges for efficient processing of analytical queries.
- In real-world scenarios, the number of partitions and the distribution of data across them can significantly impact the performance of Spark jobs. You can adjust the configuration property spark.sql.shuffle.partitions to optimize the number of partitions based on your specific use case and cluster resources, as in the sketch below.
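A minimal sketch of lowering the shuffle partition count for the current session before range repartitioning, reusing productDataset from the example above (the value 8 is only an illustrative choice):
// Lower the default shuffle parallelism for this session, then range-partition again
spark.conf.set("spark.sql.shuffle.partitions", "8")
val smallRangeDataset: Dataset[Product] = productDataset.repartitionByRange(col("category"), col("price"))
println(smallRangeDataset.rdd.getNumPartitions)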
repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
The repartitionByRange
function is used to redistribute the data in a Spark 3 Dataset by range-based partitioning. It allows you to specify the number of partitions and one or more columns (partition expressions) to determine the range for the repartitioning. This function is useful when you want to control the number of partitions explicitly and distribute the data efficiently based on specific column ranges.
Parameters:
- numPartitions (Int): The number of partitions to create after repartitioning. This parameter determines how many partitions the dataset will be divided into.
- partitionExprs (Column*): One or more column expressions that will be used to determine the partition ranges. The columns must be of an orderable type (for example numeric, string, date, or timestamp) so that meaningful range boundaries can be computed.
Return Type:
Dataset[T]
: The resulting Dataset after repartitioning.
Usage Example:
Suppose we have a Dataset containing information about sales transactions in an online store. Each transaction has attributes like transaction_id
, customer_id
, timestamp
, and total_amount
. We want to repartition the dataset into 5 partitions based on the customer_id
and total_amount
columns.
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Case class representing the structure of the Dataset
case class Transaction(transaction_id: Int, customer_id: String, timestamp: Long, total_amount: Double)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Range-Based Repartitioning Example")
.master("local[*]")
.getOrCreate()
// Sample data for the Dataset
val transactionData = Seq(
Transaction(1, "Cust-101", 1622116800, 100.0),
Transaction(2, "Cust-102", 1622203200, 250.0),
Transaction(3, "Cust-101", 1622289600, 80.0),
Transaction(4, "Cust-103", 1622376000, 150.0),
Transaction(5, "Cust-102", 1622462400, 120.0),
Transaction(6, "Cust-104", 1622548800, 50.0),
Transaction(7, "Cust-101", 1622635200, 200.0),
Transaction(8, "Cust-103", 1622721600, 300.0),
Transaction(9, "Cust-102", 1622808000, 180.0),
Transaction(10, "Cust-105", 1622894400, 70.0)
)
// Creating the Dataset from the sample data
val transactionDataset: Dataset[Transaction] = spark.createDataset(transactionData)
// Repartition the Dataset by range based on the 'customer_id' and 'total_amount' columns with 5 partitions
val repartitionedByRangeDataset: Dataset[Transaction] =
  transactionDataset.repartitionByRange(5, col("customer_id"), col("total_amount"))
// Get the number of partitions in the new Dataset
val numPartitions: Int = repartitionedByRangeDataset.rdd.getNumPartitions
println(s"Number of partitions after range-based repartitioning: $numPartitions")
Output:
Number of partitions after range-based repartitioning: 5
Explanation:
In this example, we have a Dataset transactionDataset
containing Transaction
objects. We use the repartitionByRange
function to repartition the data into 5 partitions based on the customer_id
and total_amount
columns. The resulting repartitionedByRangeDataset
will have the data distributed across 5 partitions according to the specified column ranges.
The repartitionByRange
function allows us to explicitly set the number of partitions and control the data distribution for efficient processing of analytical queries.
Note:
- The number of partitions specified in the
repartitionByRange
function should be chosen carefully, considering the data size and the available resources in the cluster. In practice, you might need to tune this value based on your specific use case. - Range-based repartitioning is particularly useful when the dataset contains skewed data distributions, as it helps balance the data across partitions, enabling better parallelism during processing.
sample(withReplacement: Boolean, fraction: Double): Dataset[T]
sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
sample(fraction: Double): Dataset[T]
sample(fraction: Double, seed: Long): Dataset[T]
Description:
The sample
function in Apache Spark is used to randomly sample elements from a Dataset. Sampling is a technique used to select a subset of data from a larger dataset for analysis or testing purposes. The function allows you to control the sampling behavior, such as with or without replacement and specifying the fraction of data to be sampled.
Parameters:
- withReplacement (Boolean): A flag indicating whether the sampling should be done with replacement (true) or without replacement (false). When sampling with replacement, an element can be selected more than once in the sample, while in sampling without replacement, each element is selected at most once.
- fraction (Double): The expected fraction of data to be sampled. Without replacement it is the probability (between 0 and 1) that each element is included; with replacement it is the expected number of times each element is chosen and may be greater than 1. The size of the result is approximate, not exact.
- seed (Long): (Optional) A seed value for the random number generator. Providing a seed makes the sampling deterministic and repeatable; if it is omitted, a random seed is used.
Return Type:
Dataset[T]
: The resulting Dataset containing the sampled elements.
Usage Examples:
Example 1: Sampling Without Replacement
import org.apache.spark.sql.{SparkSession, Dataset}
// Sample data for the Dataset
val data = 1 to 100
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Sampling Without Replacement Example")
.master("local[*]")
.getOrCreate()
// Create a Dataset from the sample data
val dataset: Dataset[Int] = spark.createDataset(data)
// Sample 20% of the data without replacement
val sampledDataWithoutReplacement: Dataset[Int] = dataset.sample(withReplacement = false, fraction = 0.2)
// Show the sampled data
sampledDataWithoutReplacement.show()
Output:
+-----+
|value|
+-----+
|   16|
|   18|
|   22|
|   23|
|   24|
|   34|
|   40|
|   47|
|   51|
|   53|
+-----+
Example 2: Sampling With Replacement
import org.apache.spark.sql.{SparkSession, Dataset}
// Sample data for the Dataset
val data = 1 to 100
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Sampling With Replacement Example")
.master("local[*]")
.getOrCreate()
// Create a Dataset from the sample data
val dataset: Dataset[Int] = spark.createDataset(data)
// Sample 30% of the data with replacement and a specific seed value
val sampledDataWithReplacement: Dataset[Int] = dataset.sample(withReplacement = true, fraction = 0.3, seed = 123)
// Show the sampled data
sampledDataWithReplacement.show()
Output:
+-----+
|value|
+-----+
|   10|
|   20|
|   22|
|   23|
|   26|
|   31|
|   43|
|   45|
|   47|
|   47|
|   50|
|   53|
|   59|
|   60|
|   63|
|   66|
|   67|
|   76|
|   78|
|   81|
+-----+
Explanation:
In these examples, we have a Dataset dataset
containing integer values from 1 to 100. We use the sample
function to randomly sample elements from the dataset. The first example demonstrates sampling without replacement, where 20% of the data is sampled randomly. The second example demonstrates sampling with replacement, where 30% of the data is sampled with a specific seed value (123) for repeatable results.
Note:
- Sampling is an essential technique in data analysis and machine learning for validating models, creating training datasets, or understanding the characteristics of data. The sample function in Spark provides an easy way to perform random sampling on large datasets.
- The fraction is a probability, not an exact size guarantee: the number of sampled rows varies from run to run unless a seed is fixed, as the sketch below illustrates.
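A minimal sketch, reusing the dataset of values 1 to 100 from the examples above, showing that the sampled size only approximates fraction * count:
// With fraction 0.2 the count is close to 20 on average, but rarely exactly 20
val sampledCount: Long = dataset.sample(withReplacement = false, fraction = 0.2, seed = 42L).count()
println(sampledCount)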
select[U1, U2, U3, U4, U5](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], c3: TypedColumn[T, U3], c4: TypedColumn[T, U4], c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]
Description:
The select
function in Apache Spark is used to project specific columns from a Dataset and create a new Dataset with the selected columns. This typed variant takes TypedColumn arguments (overloads exist for one to five columns) and returns a Dataset of tuples whose element types match the selected columns.
Parameters:
- c1, c2, c3, c4, c5 (TypedColumn[T, U1], TypedColumn[T, U2], TypedColumn[T, U3], TypedColumn[T, U4], TypedColumn[T, U5]): The typed columns to be selected from the Dataset, with result types U1, U2, U3, U4, U5, respectively.
Return Type:
Dataset[(U1, U2, U3, U4, U5)]
: The resulting Dataset containing tuples of the selected columns' types.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Sample data case class
case class Person(name: String, age: Int, city: String, occupation: String, salary: Double)
// Sample data for the Dataset
val data = Seq(
Person("Alice", 28, "New York", "Engineer", 75000.0),
Person("Bob", 22, "San Francisco", "Developer", 60000.0),
Person("Charlie", 32, "Los Angeles", "Data Scientist", 90000.0)
)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Select Function Example")
.master("local[*]")
.getOrCreate()
// Import implicits for DataFrame operations
import spark.implicits._
// Create a Dataset from the sample data
val dataset: Dataset[Person] = data.toDS()
// Select specific columns from the Dataset as typed columns: name, age, and salary
val selectedData: Dataset[(String, Int, Double)] =
  dataset.select($"name".as[String], $"age".as[Int], $"salary".as[Double])
// Show the selected data
selectedData.show()
Output:
+-------+---+-------+
|   name|age| salary|
+-------+---+-------+
|  Alice| 28|75000.0|
|    Bob| 22|60000.0|
|Charlie| 32|90000.0|
+-------+---+-------+
Explanation:
In this example, we have a case class Person
representing individual records with name, age, city, occupation, and salary fields. We create a Dataset dataset
containing sample data of three people. Using the select
function, we project only the name
, age
, and salary
columns from the original Dataset. The selected columns are then stored in a new Dataset selectedData
, which contains tuples of the types (String, Int, Double)
. Finally, we show the resulting data, displaying only the selected columns.
Note:
- The select function is useful for focusing on specific columns of interest when working with large datasets and for creating new datasets with the desired column combinations. It allows for better data manipulation and analysis by reducing the amount of data that needs to be processed.
- The typed variant requires each column to be a TypedColumn (created with .as[Type]); the plain Column variant returns an untyped DataFrame, which can be converted back to a typed Dataset as in the sketch below.
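A minimal sketch, reusing the dataset from the example above, of the equivalent untyped select followed by a conversion back to a typed Dataset:
// select with plain Columns yields a DataFrame; as[...] turns it back into a typed Dataset of tuples
val selectedViaDataFrame: Dataset[(String, Int, Double)] = dataset
  .select($"name", $"age", $"salary")
  .as[(String, Int, Double)]
selectedViaDataFrame.show()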
sort(sortExprs: Column*): Dataset[T]
Description:
The sort
function in Apache Spark is used to sort the elements in a Dataset based on one or more columns. It returns a new Dataset with the elements sorted in ascending order by default.
Parameters:
- sortExprs (Column*): One or more columns used for sorting the elements in the Dataset. You can pass multiple columns to sort the data in a specific order. Each column is represented as a Column object.
Return Type:
Dataset[T]
: The resulting Dataset with elements sorted based on the specified columns.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions._
// Sample data case class
case class Person(name: String, age: Int, city: String, salary: Double)
// Sample data for the Dataset
val data = Seq(
Person("Alice", 28, "New York", 75000.0),
Person("Bob", 22, "San Francisco", 60000.0),
Person("Charlie", 32, "Los Angeles", 90000.0)
)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Sort Function Example")
.master("local[*]")
.getOrCreate()
// Import implicits for DataFrame operations
import spark.implicits._
// Create a Dataset from the sample data
val dataset: Dataset[Person] = data.toDS()
// Sort the Dataset by the "age" column in ascending order
val sortedData: Dataset[Person] = dataset.sort($"age")
// Show the sorted data
sortedData.show()
Output:
+-------+---+-------------+-------+
|   name|age|         city| salary|
+-------+---+-------------+-------+
|    Bob| 22|San Francisco|60000.0|
|  Alice| 28|     New York|75000.0|
|Charlie| 32|  Los Angeles|90000.0|
+-------+---+-------------+-------+
Explanation:
In this example, we have a case class Person
representing individual records with name, age, city, and salary fields. We create a Dataset dataset
containing sample data of three people. Using the sort
function, we sort the elements in the Dataset based on the "age" column in ascending order (from the youngest to the oldest). The resulting sorted data is stored in a new Dataset sortedData
, and we display it using the show()
function.
Note:
- By default, the sort function sorts the elements in ascending order. If you need to sort in descending order, call the desc method on the Column (or use the desc("columnName") function from org.apache.spark.sql.functions).
- The sort function can take multiple columns as arguments to sort the data based on multiple criteria. The sorting is applied in the order of the provided columns, as in the sketch below.
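A minimal sketch, reusing the dataset from the example above, of sorting by city ascending and salary descending:
// Primary sort on city (ascending), secondary sort on salary (descending)
val sortedByCityAndSalary: Dataset[Person] = dataset.sort($"city", $"salary".desc)
sortedByCityAndSalary.show()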
sort(sortCol: String, sortCols: String*): Dataset[T]
Description:
The sort
function in Apache Spark is used to sort the elements in a Dataset based on one or more columns specified by their names. It returns a new Dataset with the elements sorted in ascending order by default.
Parameters:
- sortCol (String): The name of the first column used for sorting the elements in the Dataset.
- sortCols (String*): Zero or more additional column names used for sorting the elements. You can pass multiple column names to sort the data in a specific order.
Return Type:
Dataset[T]
: The resulting Dataset with elements sorted based on the specified columns.
Usage Example:
import org.apache.spark.sql.{SparkSession, Dataset}
// Sample data case class
case class Person(name: String, age: Int, city: String, salary: Double)
// Sample data for the Dataset
val data = Seq(
Person("Alice", 28, "New York", 75000.0),
Person("Bob", 22, "San Francisco", 60000.0),
Person("Charlie", 32, "Los Angeles", 90000.0)
)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Sort Function Example")
.master("local[*]")
.getOrCreate()
// Import implicits for DataFrame operations
import spark.implicits._
// Create a Dataset from the sample data
val dataset: Dataset[Person] = data.toDS()
// Sort the Dataset by the "age" column in ascending order
val sortedData: Dataset[Person] = dataset.sort("age")
// Show the sorted data
sortedData.show()
Output:
+-------+---+-------------+-------+
|   name|age|         city| salary|
+-------+---+-------------+-------+
|    Bob| 22|San Francisco|60000.0|
|  Alice| 28|     New York|75000.0|
|Charlie| 32|  Los Angeles|90000.0|
+-------+---+-------------+-------+
Explanation:
In this example, we have a case class Person
representing individual records with name, age, city, and salary fields. We create a Dataset dataset
containing sample data of three people. Using the sort
function, we sort the elements in the Dataset based on the "age" column in ascending order (from the youngest to the oldest). The resulting sorted data is stored in a new Dataset sortedData
, and we display it using the show()
function.
Note:
- This String-based overload always sorts the elements in ascending order. If you need to sort in descending order, use the Column-based overload and call desc on the Column, for example sort($"age".desc) or sort(col("age").desc).
- The sort function can take multiple column names as arguments to sort the data based on multiple criteria. The sorting is applied in the order of the provided columns.