
Using Python on Savio

Loading Python and accessing Python packages

To access Python, you need to load the Python module:

module load python

This will give you access to Python 3.7. Other versions of Python are available. Run module avail to see them.

Warning

Python 2 has reached end of life status; we recommend you transition to Python 3 if you are still using Python 2.

Warning

If you don't load any of the Python modules and you try to use Python, you'll get a very old version (2.7.5) without any of the standard packages, such as numpy.
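To confirm from within Python which interpreter you are actually running, and whether a package such as numpy is visible to it, something like the following may help. This is a minimal sketch using only the standard library; the helper name describe_python is our own and purely illustrative.

```python
import importlib.util
import sys


def describe_python(packages=("numpy",)):
    """Report the interpreter path, its version, and which packages are importable."""
    return {
        "executable": sys.executable,
        "version": "{}.{}.{}".format(*sys.version_info[:3]),
        # find_spec returns None when a top-level package cannot be found
        "available": {name: importlib.util.find_spec(name) is not None
                      for name in packages},
    }


if __name__ == "__main__":
    report = describe_python(("numpy", "scipy"))
    print("Interpreter:", report["executable"])
    print("Version:", report["version"])
    for name, found in report["available"].items():
        print(f"{name}: {'available' if found else 'NOT available'}")
```

If the reported version is 2.7.5 or numpy is reported as not available, the Python module has most likely not been loaded in the current shell.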

Once you have loaded Python, you can determine what Python packages are provided by the system by entering the following:

conda list

Note, however, that some machine learning (and other) packages with Python interfaces must still be loaded via a module. To see the available machine learning packages, run:

module avail ml

When writing a job script file for use with sbatch, we recommend that you load the Python module (and any other needed modules) just after the comments that contain the Slurm flags, e.g.,

#SBATCH --partition=savio2
#SBATCH --ntasks=2
#SBATCH --time=00:00:30

module load python

Installing additional Python packages

You can also install additional Python packages, such as those available via the Python Package Index (PyPI), that are not already provided on the system. You’ll need to install them into your home directory, your scratch directory, or your group directory (if any), rather than into system directories.

After loading Python, check whether some or all of the packages you need might already be available using conda list.

If not, you have two main options for installing packages.

First, you can use pip with the --user option to ensure they are installed in your home directory (by default in ~/.local):

pip install --user your_package_name
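If you are unsure where pip install --user places packages for the Python version you have loaded, the standard-library site module can report it. The sketch below only prints locations and changes nothing; the variable names are our own.

```python
import site
import sys

# Base directory for --user installs (normally ~/.local on Linux)
user_base = site.getuserbase()
# Directory where --user packages for this Python version are placed
user_site = site.getusersitepackages()

print("User base:", user_base)
print("User site-packages:", user_site)
# Python only searches the user site directory when it is enabled
# (ENABLE_USER_SITE can be False or None, e.g. inside some virtual environments)
print("User site enabled:", site.ENABLE_USER_SITE)
print("On sys.path:", user_site in sys.path)
```

Because the user site directory is specific to the Python version, packages installed with one Python module are generally not visible after loading a module for a different Python version.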

You can also use Conda to install packages, but to do so you need to install packages within a Conda environment. Here's how you can create an environment and install packages in it, in this case choosing to have the environment use Python 3.6 and to install numpy.

module load python
conda create --name=your_env_name python=3.6 numpy

To access an existing environment:

source activate your_env_name
conda install scipy # install another package into the environment

Use caution with conda init

We recommend caution when activating Conda environments with conda activate, even though running source activate gives a deprecation warning. conda activate prompts you to run conda init, which changes your shell configuration (by modifying your .bashrc) and can therefore affect the behavior of your shell more generally.

If you do run conda init (thereby modifying your .bashrc), we strongly recommend preventing Conda from activating the base environment whenever a new shell starts (e.g., when logging in). This activation can slow down the login process and can fail when the filesystem is responding slowly, potentially preventing you from logging in at all. To ensure this does not happen, after you run conda init, run the following:

conda config --set auto_activate_base False

To leave an environment:

source deactivate
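To confirm from within Python which Conda environment, if any, is currently active, you can inspect the environment variables that Conda sets on activation. This is a sketch under the assumption that your Conda version sets CONDA_DEFAULT_ENV and CONDA_PREFIX (recent versions do); the function name active_conda_env is hypothetical.

```python
import os
import sys


def active_conda_env(environ=None):
    """Return (name, prefix) of the active Conda environment, or (None, None)."""
    env = os.environ if environ is None else environ
    # Conda sets these two variables when an environment is activated
    return env.get("CONDA_DEFAULT_ENV"), env.get("CONDA_PREFIX")


name, prefix = active_conda_env()
if prefix is None:
    print("No Conda environment appears to be active.")
else:
    print(f"Active environment: {name} ({prefix})")
    # The running interpreter should live inside the environment's prefix
    inside = os.path.realpath(sys.executable).startswith(os.path.realpath(prefix))
    print("Interpreter inside environment:", inside)
```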

Installing packages outside your home directory

You may want to install Python packages or Conda environments outside your home directory, particularly if you are bumping up against the quota in your home directory.

Warning

Conda caches package source files and files shared across environments in ~/.conda/pkgs for reuse if you later install the same package again. (The ~/.conda/envs directory is the default location where Conda environments that you create are stored.) This caching can contribute to running out of space in your home directory. In addition to the strategy of putting your .conda directory in scratch (discussed below), you may also want to remove unneeded files from the pkgs directory by running conda clean --all.
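To see how much space these directories actually occupy, a short script along the following lines can be used. It is a rough sketch: the helper dir_size_bytes is our own, and because Conda may hard-link files between pkgs and envs, adding up the reported sizes can overstate the true disk usage.

```python
import os


def dir_size_bytes(path):
    """Total size in bytes of regular files under path (symlinks are skipped)."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for fname in files:
            fpath = os.path.join(root, fname)
            if not os.path.islink(fpath):
                try:
                    total += os.path.getsize(fpath)
                except OSError:
                    pass  # file vanished or is unreadable; skip it
    return total


if __name__ == "__main__":
    for candidate in ("~/.conda/pkgs", "~/.conda/envs", "~/.local"):
        full = os.path.expanduser(candidate)
        if os.path.isdir(full):
            print(f"{candidate}: {dir_size_bytes(full) / 1024**2:.1f} MiB")
        else:
            print(f"{candidate}: not present")
```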

Pip

For packages installed via pip, which are installed by default in ~/.local, one strategy is to move your .local directory to your scratch directory and symlink to it from your home directory:

cp -pr ~/.local /global/scratch/users/$USER/.local
rm -rf ~/.local
ln -s /global/scratch/users/$USER/.local ~/.local

Alternatively you can install packages individually in some other directory:

pip install --prefix=/path/to/directory package_name

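and then modify PYTHONPATH to include the site-packages directory that pip creates under that prefix (for Python 3.7 this is typically /path/to/directory/lib/python3.7/site-packages).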
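With --prefix, pip on Linux typically places packages in a lib/pythonX.Y/site-packages subdirectory of the prefix rather than in the prefix itself. The sketch below computes that location for the running Python version; the helper name prefix_site_packages is hypothetical and the layout is an assumption that holds for standard Linux installations.

```python
import os
import sys


def prefix_site_packages(prefix, version_info=sys.version_info):
    """Typical Linux location of packages installed with `pip install --prefix`."""
    pyver = f"python{version_info[0]}.{version_info[1]}"
    return os.path.join(prefix, "lib", pyver, "site-packages")


# Placeholder path from the text above; substitute your own directory
site_dir = prefix_site_packages("/path/to/directory")
print("Directory to add to PYTHONPATH:", site_dir)

# Within a running session, the directory can also be appended to sys.path
if os.path.isdir(site_dir) and site_dir not in sys.path:
    sys.path.append(site_dir)
```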

Conda

You can also use the symlink approach discussed above with ~/.conda, so that Conda environments are stored in scratch. Alternatively, you could do this just for ~/.conda/pkgs, so that the cached package files do not take up space in your home directory.

You can also create Conda environments that are stored in some other directory:

SOME_DIR=/global/scratch/users/$USER/your_directory
module load python
conda create -p ${SOME_DIR}/test 
source activate ${SOME_DIR}/test
conda install biopython
source deactivate 

Running Python interactively

Using srun to run Python on the command line

To use Python interactively on Savio's compute nodes, you can use srun to submit an interactive job.

Once you're working on a compute node, you can then load the Python module and start Python.

We recommend use of IPython for interactive work with Python. To start IPython for command line use, simply enter ipython.

IPython provides tab completion, easy access to help information, useful aliases, and lots more. Type ? at the IPython command line for more details.

Using Open OnDemand to run Python in a Jupyter notebook

We now provide access to Jupyter notebooks via Open OnDemand. Notebooks can be run on compute nodes in any of the Savio partitions or on a standalone server for testing and exploration purposes.

Running Python code on a GPU

To run Python jobs on one or more GPUs, you'll need to request access to the GPU(s) by including the --gres=gpu:x flag to sbatch or srun, where x is the number of GPUs you need, following our example GPU job script.
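Once the job is running, you can check from Python which GPUs were made visible to it. The sketch below assumes that Slurm sets the CUDA_VISIBLE_DEVICES environment variable for jobs that request GPUs via --gres, which is common but depends on the cluster configuration; the function name visible_gpus is our own.

```python
import os


def visible_gpus(environ=None):
    """Return the list of GPU identifiers exposed via CUDA_VISIBLE_DEVICES."""
    env = os.environ if environ is None else environ
    raw = env.get("CUDA_VISIBLE_DEVICES", "")
    return [item.strip() for item in raw.split(",") if item.strip()]


gpus = visible_gpus()
if gpus:
    print(f"{len(gpus)} GPU(s) visible to this job: {gpus}")
else:
    print("CUDA_VISIBLE_DEVICES is unset or empty; no GPUs appear to be allocated.")
```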

Perhaps the most common use of Python with GPUs is via machine learning and other software that moves computation onto the GPU but hides the details of the GPU use from the user. Packages such as TensorFlow and PyTorch can operate in this way. Note that when you load the TensorFlow and PyTorch modules, they will load the version of Python they were built for, as well as CUDA-related modules. (In other cases you may need to load the cuda module yourself.)

To check whether you are able to run TensorFlow or PyTorch calculations on the GPU, here is some basic testing code for TensorFlow:

# Check GPU availability:
import tensorflow as tf
tf.test.is_gpu_available()  # deprecated in recent TensorFlow 2 releases in favor of tf.config.list_physical_devices('GPU')

# Find out which device your operations and tensors are assigned to:
tf.debugging.set_log_device_placement(True)
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
c = tf.matmul(a, b)

# Manually place operations on a GPU:
with tf.device('/GPU:0'):
    a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
    b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
    c = tf.matmul(a, b)

and for PyTorch:

# Check GPU availability:
import torch
torch.cuda.is_available()

# Manually place operations on GPU:
print(torch.rand(2,3).cuda())

Monitoring GPU usage

To check on the current usage (and hence availability) of each of the GPUs on your GPU node, you can run nvidia-smi from the Linux shell within an interactive session on that GPU node. Near the end of that command's output, the "Processes: GPU Memory" table will list the GPUs currently in use, if any. For example, in a scenario where GPUs 0 and 1 are in use on your GPU node, you'll see something like the following. (By implication from the output below, GPUs 2 and 3 are currently not in use, and thus fully available, on this node.)

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     32699      C   python                                      2462MiB |
|    1     32710      C   python                                      2108MiB |
+-----------------------------------------------------------------------------+

To determine which line reflects your usage, you'd need to determine which of the process IDs (here 32699 and 32710) is the process associated with your job, e.g., using top or ps.
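As an alternative to matching process IDs by hand, the lookup can be scripted. The sketch below asks nvidia-smi for its list of compute processes in CSV form and uses the Linux /proc filesystem to flag those owned by you. The helper names are our own, and the parsing assumes process names do not contain commas.

```python
import os
import shutil
import subprocess

QUERY = ["nvidia-smi",
         "--query-compute-apps=pid,process_name,used_memory",
         "--format=csv,noheader"]


def parse_compute_apps(text):
    """Parse 'pid, name, memory' CSV lines into a list of dictionaries."""
    apps = []
    for line in text.strip().splitlines():
        parts = [p.strip() for p in line.split(",")]
        if len(parts) == 3 and parts[0].isdigit():
            apps.append({"pid": int(parts[0]), "name": parts[1], "memory": parts[2]})
    return apps


def owned_by_me(pid):
    """True if the process belongs to the current user (Linux /proc lookup)."""
    try:
        return os.stat(f"/proc/{pid}").st_uid == os.getuid()
    except OSError:
        return False


if shutil.which("nvidia-smi"):
    result = subprocess.run(QUERY, stdout=subprocess.PIPE, universal_newlines=True)
    for app in parse_compute_apps(result.stdout):
        owner = "yours" if owned_by_me(app["pid"]) else "another user's"
        print(f"PID {app['pid']} ({app['name']}, {app['memory']}): {owner}")
else:
    print("nvidia-smi not found; run this on a GPU node.")
```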

Parallel processing in Python on Savio

While there are many, many packages for parallelization in Python, we’ll focus on a few widely-used ones: ipyparallel, Dask, and Ray. For more complicated kinds of parallelism, you may want to use MPI within Python via the mpi4py package, discussed below.

Threaded linear algebra

The numpy and scipy packages included with Savio's Python installations are linked to Intel's MKL library, which provides fast, threaded versions of core linear algebra functions. You don't have to do anything -- any linear algebra that you run (either directly or that gets called by code that you run) will run in parallel on multiple threads, limited only by the number of cores available to your Slurm job on the node you are running on.

Note

Threading only works on individual nodes. Any linear algebra will only run in parallel on the cores available on one node.
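To see how many cores your process can actually use, and optionally to cap the number of linear algebra threads, a sketch like the following can be run at the very top of a script. It relies on the MKL_NUM_THREADS and OMP_NUM_THREADS environment variables, which generally need to be set before numpy is first imported; the cap of 4 is an arbitrary illustrative value.

```python
import os


def cores_available():
    """Number of cores this process may use, based on its CPU affinity mask."""
    if hasattr(os, "sched_getaffinity"):
        # Reflects CPU affinity restrictions, such as those Slurm may apply
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


n_cores = cores_available()
print("Cores available to this process:", n_cores)

# Optional: cap the threads used by MKL/OpenMP. Set these before the first
# `import numpy` in the process so that the libraries pick them up.
n_threads = min(4, n_cores)   # 4 is an arbitrary illustrative cap
os.environ["MKL_NUM_THREADS"] = str(n_threads)
os.environ["OMP_NUM_THREADS"] = str(n_threads)
print("Thread cap requested:", n_threads)
```

Capping threads can be useful when you run several Python processes on one node and want to avoid having each of them start as many threads as there are cores.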

Using ipyparallel

The ipyparallel package allows you to parallelize independent computations across multiple cores on one or more nodes. To use it on multiple nodes, you'll need to start up the worker processes yourself, as discussed below.

Single node parallelization

To use Python in parallel on a single node, we need to request resources on a node, start up ipyparallel worker processes, and then run our Python code.

Here is an example job script, including the commands to start up the ipyparallel worker processes, one for each core requested in the Slurm submission:

#!/bin/bash
# Job name:
#SBATCH --job-name=test
#
# Account:
#SBATCH --account=account_name
#
# Partition:
#SBATCH --partition=partition_name
#
# Request one node:
#SBATCH --nodes=1
#
# Request cores (24, for example)
#SBATCH --ntasks-per-node=24
#
# Wall clock limit:
#SBATCH --time=00:30:00
#
## Command(s) to run (example):
module load python
ipcluster start -n $SLURM_NTASKS &    # Start worker processes
sleep 120                             # Wait until all worker processes have successfully started
ipython job.py > job.pyout
ipcluster stop

The Python code example below demonstrates how to make use of ipyparallel functionality. This illustrates the use of a direct view (dispatching computational tasks in a simple way to the workers) and a load-balanced view (sequentially dispatching computational tasks as earlier computational tasks finish) to run code in parallel. It also shows how data structures can be broadcast to all the workers with push.

First, get an object 'handle' to the cluster of workers.

from ipyparallel import Client
c = Client()
c.ids
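The job script relies on a fixed sleep to give the workers time to start. If you would rather have your Python code check that all workers have registered before dispatching work, a generic polling helper such as the one below is one option. This is a sketch, not part of ipyparallel: wait_until is a hypothetical helper, and the commented usage assumes the client object c created above and the SLURM_NTASKS environment variable.

```python
import time


def wait_until(predicate, timeout=120.0, interval=1.0):
    """Poll predicate() until it returns True or timeout seconds elapse.

    Returns True if the predicate succeeded, False if the timeout expired.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Hypothetical usage in job.py, with `c` the Client created above and
# `expected` the number of workers requested from Slurm:
#   import os
#   expected = int(os.environ["SLURM_NTASKS"])
#   if not wait_until(lambda: len(c.ids) >= expected, timeout=300):
#       raise RuntimeError("not all ipyparallel workers started")
```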

Here's a simple example of using a direct view to run a function on all the workers.

dview = c[:]
dview.block = True   # cause execution on main process to wait while tasks sent to workers finish
dview.apply(lambda : "Hello, World")

Let's set up an example calculation. Suppose we want to do leave-one-out cross-validation of a random forest statistical model. We define a function that fits to all but one observation and predicts for that observation.

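def looFit(index, Ylocal, Xlocal):
    # rfr (RandomForestRegressor) and np (numpy) must be imported on the workers before this runs
    rf = rfr(n_estimators=100)
    # fit to all observations except the one at position index
    rf.fit(np.delete(Xlocal, index, axis = 0), np.delete(Ylocal, index))
    # predict for the held-out observation
    pred = rf.predict(np.array([Xlocal[index, :]]))
    return(pred[0])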

Now use a direct view to load packages on the workers and push the data to the workers.

dview.execute('from sklearn.ensemble import RandomForestRegressor as rfr')
dview.execute('import numpy as np')
# assume predictors are in 2-d array X and outcomes in 1-d array Y
# here we generate random arrays for X and Y
# we need to broadcast those data objects to the workers
import numpy as np
X = np.random.random((200,5))
Y = np.random.random(200)
mydict = dict(X = X, Y = Y, looFit = looFit)
dview.push(mydict)

Next we'll use a load-balanced view to run the code across all the observations in parallel using the map operation.

# Need a wrapper function because map() only operates on one argument
def wrapper(i):
    return(looFit(i, Y, X))

n = len(Y)
import time
start = time.time()
# Run a parallel map, executing the wrapper function on indices 0,...,n-1
lview = c.load_balanced_view()
lview.block = True # Cause execution on main process to wait while tasks sent to workers finish
pred = lview.map(wrapper, range(n))   # Run calculation in parallel
print("Elapsed time (seconds):", time.time() - start)
print(pred[0:10])

Multi-node parallelization

To run ipyparallel workers across multiple nodes, you need to modify the #SBATCH options in your submission script and the commands used to start up the worker processes at the end of that script, but your Python code can stay the same.

Here’s an example submission script, with the syntax for starting the workers:

#!/bin/bash
# Job name:
#SBATCH --job-name=test
#
# Account:
#SBATCH --account=account_name
#
# Partition:
#SBATCH --partition=partition_name
#
# Total number of tasks
#SBATCH --ntasks=48
#
# Wall clock limit:
#SBATCH --time=00:30:00
#
## Command(s) to run (example):
module load python
# Start the controller process:
ipcontroller --ip='*' &
sleep 50                     # Wait for controller to start
srun ipengine &              # Start as many worker processes as we have Slurm tasks
sleep 150                    # Wait until all worker processes have successfully started
ipython job.py > job.pyout
ipcluster stop

Parallelization in a Jupyter notebook

You can use ipyparallel functionality within a Jupyter notebook by setting up IPython Clusters.

Using Dask

The Dask package provides a variety of tools for managing parallel computations. Some of the key ideas/features are that users can:

  • separate what to parallelize from how and where the parallelization is actually carried out,
  • run the same code on different computational resources (without touching the actual code that does the computation), and
  • use Dask's distributed data structures that can be treated as a single data structure when running operations on them (like Spark).

First you'll need to request one or more nodes via Slurm using sbatch or srun.

Then, to run Dask on multiple cores on a single node, you can use any of the 'threads', 'processes', or 'distributed' schedulers, simply setting the number of workers (four in the examples below). Generally you'd want to use only as many workers as you have cores available.

import dask
# Threads scheduler
dask.config.set(scheduler='threads', num_workers = 4)

# Processes scheduler
import dask.multiprocessing
dask.config.set(scheduler='processes', num_workers = 4)  

# Distributed scheduler 
# Fine to use this on a single node and it provides some nice functionality
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers = 4)
c = Client(cluster)

To run Dask across cores on multiple nodes, you can use dask-ssh or a combination of dask-scheduler and dask-worker to start up the necessary Python processes. Here's an example that uses the Slurm srun command with dask-worker to make the connections to the nodes in the Slurm allocation:

export SCHED=$(hostname):8786
# Start scheduler process
dask-scheduler &
sleep 50    # might need even more time to ensure scheduler starts up fully
# Start worker processes
srun dask-worker tcp://${SCHED} --local-directory /tmp &
sleep 100   # might need even more time to make sure workers start up fully

The srun command here (which is of course being run within another sbatch or srun invocation that you ran to get access to the compute nodes) will run dask-worker on as many workers as the number of Slurm 'tasks', across the allocated nodes.

We set --local-directory because we've seen problems when the local directory for Dask is in a Savio user home directory.

Then in Python, connect to the cluster via the scheduler.

import os, time, dask
from dask.distributed import Client
c = Client(address = os.getenv('SCHED'))
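If the SCHED variable was not exported in the shell that starts Python, os.getenv returns None and the connection attempt will not reach the scheduler you started. A small check before connecting can make that failure easier to diagnose. The helper below is a hypothetical sketch (scheduler_address is not part of Dask); it only validates the host:port value and builds the address.

```python
import os


def scheduler_address(environ=None, var="SCHED"):
    """Build a tcp:// address from a host:port value stored in an environment variable."""
    env = os.environ if environ is None else environ
    value = env.get(var)
    if not value:
        raise RuntimeError(f"{var} is not set; export it before starting Python")
    host, sep, port = value.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"{var}={value!r} is not of the form host:port")
    return f"tcp://{host}:{port}"


# Hypothetical usage (requires the dask.distributed package):
#   from dask.distributed import Client
#   c = Client(address=scheduler_address())
```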

Using Ray

Ray provides a variety of tools for managing parallel computations. At its simplest, it allows you to parallelize independent computations across multiple cores on one or more machines. One nice feature relative to Dask is that Ray allows one to share data across all worker processes on a node, without multiple copies of the data, using the object store. In more complex uses, Ray provides tools to build distributed applications (across multiple cores or nodes) in which different processes interact with each other (using the notion of 'actors').

First you'll need to request one or more nodes via Slurm using sbatch or srun.

Then, to run Ray on multiple cores on a single node, you can initialize Ray from within Python.

import ray
ray.init()
## alternatively, to specify a specific number of cores:
ray.init(num_cpus = 4)

To run Ray across cores on multiple nodes, see these Ray instructions for Slurm.

Using MPI with Python

You can use the mpi4py package to provide MPI functionality within your Python code. This allows individual Python processes to communicate amongst each other to carry out a computation. We won’t go into the details of writing mpi4py code, but will simply show how to set things up so an mpi4py-based job will run.

Here’s an example job script, illustrating how to load the needed modules and start the Python job:

#!/bin/bash
# Job name:
#SBATCH --job-name=test
#
# Account:
#SBATCH --account=account_name
#
# Partition:
#SBATCH --partition=partition_name
#
# Total number of tasks
#SBATCH --ntasks=48
#
# Wall clock limit:
#SBATCH --time=00:30:00
#
## Command(s) to run (example):
module load gcc openmpi python
mpirun python mpiCode.py > mpiCode.pyout

Here’s a brief ‘hello, world’ example of using mpi4py. Note that this particular example uses numpy, which is available once the python module is loaded in your job script, as shown above. Also note that this example does not demonstrate the key motivation for using mpi4py, which is to enable more complicated inter-process communication:

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD

# simple print out Rank & Size
id = comm.Get_rank()
print("Of ", comm.Get_size() , " workers, I am number " , id, ".")

# function to be run many times
def f(id, n):
    np.random.seed(id)
    return np.mean(np.random.normal(0, 1, n))

n = 1000000
# execute on each worker process
result = f(id, n)

# gather results to single ‘main’ process and print output
output = comm.gather(result, root = 0)
if id == 0:
    print(output)