Spark is Hadoop MapReduce done in memory
Software Category: devel
For detailed information, visit the Spark website.
To find the available versions and learn how to load them, run:
module spider spark
The output of the command shows the available Spark module versions.
For detailed information about a particular Spark module, including how to load it, run the module spider command with the module's full version label. For example:
module spider spark/3.1.2
| Module | Version | Module Load Command |
|--------|---------|---------------------|
| spark  | 3.1.2   | module load spark/3.1.2 |
Using Spark interactively
Please request an ijob or a Desktop session.
Web UI [Desktop]
Spark provides a user interface (UI) for you to monitor your Spark job. If you intend to use the Web UI, you must request a Desktop session through Open OnDemand.
The URL is displayed upon launching Spark and is of the form http://udc-xxxx-xx:4040, where udc-xxxx-xx is the hostname of the compute node. You can either right-click on the link and select “Open Link,” or enter localhost:4040 in the browser.
Shell prompt [ijob/Desktop]
To start up a Scala or PySpark shell prompt, run spark-shell or pyspark, respectively. For example:
```
$ spark-shell
...
Spark context Web UI available at http://udc-xxxx-xx:4040
Spark context available as 'sc' (master = local[*], app id = local-1633023285536).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.2)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
```
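Similarly, a quick sanity check in a pyspark session might look like the following minimal sketch; the SparkContext sc is predefined by the shell:

```python
>>> rdd = sc.parallelize(range(1000))   # distribute 0..999 across the cores
>>> rdd.sum()                           # sum of 0..999
499500
```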
To start an R prompt, you must load R first, then run sparkR. If the R version is different from 4.1.0, you will see a warning message:
package ‘SparkR’ was built under R version 4.1.0
We recommend loading the closest available version.
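For example, a possible sequence is shown below; the exact module name and version are assumptions, so check module spider R for what is available on the system:

```
module load R
sparkR
```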
Jupyter notebook/lab [Desktop]
Instead of the default Python shell, you can redirect pyspark to open a Jupyter notebook/lab as follows. First, you need access to Jupyter, which is provided by the Anaconda module:
module load anaconda
Next, set two environment variables:
```bash
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS=lab
```
If you’d prefer a notebook session, replace lab with notebook.
Navigate to your working directory and run:
pyspark
This will start up Jupyter inside a browser automatically. Use the “Python 3” kernel.
The example below estimates the value of pi in a PySpark session running on 16 cores, with the JupyterLab window on the left and the Spark Web UI event timeline on the right. Note that the SparkContext sc is initialized automatically.
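For reference, a minimal sketch of such a pi estimation is given here; the exact code used in the screenshot is not reproduced, and this version relies only on the predefined sc:

```python
import random

NUM_SAMPLES = 10_000_000

def inside(_):
    # Draw a random point in the unit square and test whether it
    # falls inside the quarter circle of radius 1.
    x, y = random.random(), random.random()
    return x * x + y * y < 1.0

# Distribute the samples across the cores available to the session.
count = sc.parallelize(range(NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly", 4.0 * count / NUM_SAMPLES)
```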
SLURM Script Templates for Batch Jobs
Local mode on a single node
```bash
#!/bin/bash
#SBATCH -p standard        # partition
#SBATCH -A myaccount       # your allocation
#SBATCH -N 1               # number of nodes
#SBATCH -c 10              # number of cores per node
#SBATCH -t 10:00:00        # time

module purge
module load spark

spark-submit script.py
```
You must initialize SparkContext explicitly in your script, e.g.:

```python
from pyspark import SparkContext

sc = SparkContext("local[*]")
```
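Putting the two pieces together, a minimal script.py for this template might look like the following sketch; the workload itself is only a placeholder:

```python
from pyspark import SparkContext

# Local mode: use all cores allocated to the job on this node.
sc = SparkContext("local[*]")

# Placeholder workload: sum of squares of the first million integers.
rdd = sc.parallelize(range(1_000_000))
total = rdd.map(lambda x: x * x).reduce(lambda a, b: a + b)
print("Sum of squares:", total)

sc.stop()
```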
The Spark log is written to slurm-<JOB_ID>.out. After the job has finished, use the seff <JOB_ID> command to verify that the cores were used effectively:
```
$ seff 1232109
...
Cores per node: 10
CPU Utilized: 01:17:16
CPU Efficiency: 82.20% of 01:34:00 core-walltime
...
```
If the CPU efficiency is much lower than this, please consider using fewer cores in future jobs.
Standalone cluster mode using multiple nodes
We gratefully acknowledge the Pittsburgh Supercomputing Center for giving us permission to use their Spark configuration and launch scripts.
Before using multiple nodes, please make sure that your job can use a full standard node effectively. When you request N nodes in the standalone cluster mode, one node is set aside as the master node and the remaining N-1 nodes are worker nodes. Thus, running on 2 nodes will have the same effect as running on 1 node.
```bash
#!/bin/bash
#SBATCH -p parallel        # do not modify
#SBATCH --exclusive        # do not modify
#SBATCH -A myaccount       # your allocation
#SBATCH -N 3               # number of nodes
#SBATCH -c 40              # number of cores per node
#SBATCH -t 3:00:00         # time

#---------------------------
# do not modify this section
export PARTITIONS=$(( (SLURM_NNODES-1) * SLURM_CPUS_PER_TASK ))
export MASTERSTRING="spark://$(hostname):7077"
$SPARK_HOME/scripts/spark-cluster-init.sh &
sleep 10
#---------------------------

module purge
module load spark

spark-submit --master $MASTERSTRING script.py
```
In the above SLURM script template, note that:
- You must use parallel nodes with exclusive access (the -p parallel and --exclusive directives above).
- You may reduce the number of cores if the job needs more memory per core.
- Your code should begin with:

  ```python
  from pyspark import SparkConf
  from pyspark import SparkContext

  conf = SparkConf()
  sc = SparkContext(conf=conf)
  ```

- The number of partitions should be equal to the total number of cores on the worker nodes. This has to be set explicitly in the second argument of parallelize; the PARTITIONS environment variable is defined in the SLURM script for your convenience. Without doing so, only one partition will be created on each node (see the sketch after this list).
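As an illustration, the sketch below reads PARTITIONS and passes it as the second argument of parallelize; the workload itself is hypothetical:

```python
import os
from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)

# Total number of cores on the worker nodes, exported by the SLURM script.
partitions = int(os.environ["PARTITIONS"])

# Spread the data over all worker cores instead of one partition per node.
rdd = sc.parallelize(range(1_000_000), partitions)
print("Number of partitions:", rdd.getNumPartitions())

sc.stop()
```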
We used a code that estimates the value of pi as a benchmark. The following table illustrates good scaling performance across multiple nodes (40 cores per node) on Rivanna.
| Nodes | Worker nodes | Time (s) |
|-------|--------------|----------|
Temporary files are created inside your scratch directory during a multinode Spark job. They have the form:
You may safely remove these files when your job is done by running:
Make sure that you do not use this pattern to name other files!