Notes on reducing I/O bottlenecks in parallel computations with Dask and Zarr.

When I’m analysing data I tend to keep one eye on the system monitor at the top of my screen. The height of the green bar tells me how much RAM I’m using and the height of the blue bar tells me how much CPU…

[screenshot: system monitor status bar showing RAM usage (green) and CPU usage (blue)]

I’ll kick off a computation then look up to see what’s happening. Most of the time the blue bar chugs away at 1/8 height, which means that 1 of the 8 logical cores on my computer is fully utilised and the others are idle. I spend a fair amount of time waiting for computations to finish, and it started to bug me that 7/8 of my CPU capacity was going spare. Many of the computations I run are simple and could easily be parallelised, so I started looking into ways of adapting my analysis code to make better use of multiple CPUs.

Dask + HDF5

One solution I really like is Dask. Using dask.array it’s simple to parallelise a wide range of operations over numerical arrays, using either multiple threads or multiple processes. To evaluate Dask I wrote an alternative Dask-backed implementation of some of the basic genetic data transformations I use every day. I then ran these on some data from the Ag1000G project using Dask’s multi-threaded scheduler, hoping to see my idle CPU cores fire up to maximum capacity…
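To give a flavour of what dask.array does (a toy sketch with random data, nothing to do with the genetic data below): you create or wrap a chunked array, express the computation as usual, and when you call compute() each chunk becomes a task that can run on a separate worker thread.

import dask.array as da
# create a chunked dask array of random numbers; each (1000, 1000) chunk
# is an independent task for the scheduler
x = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
# nothing is computed yet; this just builds a task graph
y = (x ** 2).mean(axis=0)
# compute() runs the chunk-wise tasks on a thread pool by default
result = y.compute()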

import h5py
import multiprocessing
import dask; print('dask', dask.__version__)
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
from cachey import nbytes
import allel; print('scikit-allel', allel.__version__)
import bokeh
from bokeh.io import output_notebook
output_notebook()
from functools import reduce
import operator
dask 0.11.1
scikit-allel 1.0.3
# Setup input data, using data on local SSD
callset = h5py.File('data/2016-05-16/ag1000g.phase1.ar3.pass.3R.h5', mode='r')
genotype = callset['3R/calldata/genotype']
genotype
<HDF5 dataset "genotype": shape (13167162, 765, 2), type "|i1">
# check how the data were compressed
genotype.compression, genotype.compression_opts
('gzip', 3)
# how many cores on this computer?
multiprocessing.cpu_count()
8
# when I made the HDF5 file I set the chunks too small, so let's operate on bigger chunks
chunks = (genotype.chunks[0], genotype.chunks[1] * 20, genotype.chunks[2])
chunks
(6553, 200, 2)
import humanize
import numpy as np
print('chunk size:', humanize.naturalsize(np.product(chunks)))
chunk size: 2.6 MB
!cat /usr/local/bin/drop_caches
#!/bin/bash
# This must be run as sudo, so to avoid passwords this 
# script should be set in the sudoers file with NOPASSWD.
echo 1 > /proc/sys/vm/drop_caches
# ensure OS page cache is cleared 
!sudo drop_caches
# run the allele count computation via Dask
gd = allel.GenotypeDaskArray(genotype, chunks=chunks)
ac = gd.count_alleles(max_allele=3)
with ResourceProfiler(dt=1) as rprof:
    ac.compute()
visualize(rprof);

As you can see from the plot above, this computation uses just over one core (~130% CPU). The limiting factor is h5py, which I’ve used to pull the input data out of an HDF5 file. The h5py library is a totally awesome piece of software that I use every day, but HDF5 is not designed to support multi-threaded data access. On top of that, h5py doesn’t release the GIL (Python’s global interpreter lock), which means that while h5py is doing anything, no other thread can run, even if those threads want to do something unrelated to HDF5 I/O.
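One way to see the effect directly is to compare reading two slabs of the genotype dataset sequentially versus from two threads. This is just a rough, illustrative sketch (the row ranges are arbitrary and exact timings will vary): if h5py holds the GIL for the duration of each read, the threaded version gives little or no speedup.

import time
from concurrent.futures import ThreadPoolExecutor

def read_slab(dataset, start, stop):
    # read and decompress one slab of rows into a numpy array
    return dataset[start:stop]

# sequential reads
t0 = time.time()
read_slab(genotype, 0, 100000)
read_slab(genotype, 100000, 200000)
print('sequential:', time.time() - t0)

# threaded reads; use different row ranges so the OS page cache
# doesn't flatter the second measurement
t0 = time.time()
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(read_slab, genotype, 200000, 300000),
               pool.submit(read_slab, genotype, 300000, 400000)]
    [f.result() for f in futures]
print('threaded:', time.time() - t0)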

Dask + Zarr

Recently I’ve been working on Zarr, a new Python library for chunked, compressed, N-dimensional data. Previously I introduced Zarr and showed how it can be used to get fast access into large multi-dimensional arrays. The other thing Zarr can do is let you read from or write to an array from multiple threads or processes in parallel. Also, Zarr releases the GIL during compression and decompression, so other threads can carry on working. Here’s the allele count example again, but this time using a Zarr array as the input data source:

import zarr
zarr.__version__
'2.1.3'
# Set up a Zarr array, copying in the genotype data from the HDF5 file.
# N.B., use similar compression options to the HDF5 file for a fairer
# comparison, although other compressors might be faster.
# Also store on the same SSD that the HDF5 file above was read from.
genotype_zarr = zarr.open_like(genotype, path='data/2016-05-16/genotype.zarr', mode='w', 
                               chunks=chunks, compression='blosc',
                               compression_opts=dict(cname='zlib', clevel=3, shuffle=0))
genotype_zarr[:] = genotype
genotype_zarr
Array((13167162, 765, 2), int8, chunks=(6553, 200, 2), order=C)
  nbytes: 18.8G; nbytes_stored: 640.7M; ratio: 30.0; initialized: 8040/8040
  compressor: Blosc(cname='zlib', clevel=3, shuffle=0)
  store: DirectoryStore
# ensure OS page cache is cleared
!sudo drop_caches
# run the allele count computation via Dask
gdz = allel.model.dask.GenotypeDaskArray(genotype_zarr)
acz = gdz.count_alleles(max_allele=3)
with ResourceProfiler(dt=1) as rprof:
    acz.compute()
visualize(rprof);

This time I get over 700% CPU usage, and the computation is about 8 times faster, which is about what you’d expect given the higher CPU utilisation.

Zlib is a fairly slow compressor; what happens if we use something faster, like LZ4?

genotype_zarr_lz4 = zarr.open_like(genotype, path='data/2016-05-16/genotype.zarr.lz4', mode='w', 
                                   chunks=chunks, compression='blosc',
                                   compression_opts=dict(cname='lz4', clevel=5, shuffle=0))
genotype_zarr_lz4[:] = genotype_zarr
genotype_zarr_lz4
Array((13167162, 765, 2), int8, chunks=(6553, 200, 2), order=C)
  nbytes: 18.8G; nbytes_stored: 1.0G; ratio: 18.2; initialized: 8040/8040
  compressor: Blosc(cname='lz4', clevel=5, shuffle=0)
  store: DirectoryStore
# ensure OS page cache is cleared
!sudo drop_caches
# run the allele count computation via Dask
gdz = allel.model.dask.GenotypeDaskArray(genotype_zarr_lz4)
acz = gdz.count_alleles(max_allele=3)
with ResourceProfiler(dt=1) as rprof:
    acz.compute()
visualize(rprof);

This goes faster still, and I’m still getting nearly full CPU utilisation, which suggests the bottleneck remains compute rather than I/O, so I could probably push my SSD even harder.
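As an aside, the examples above only exercise the read side, but Zarr arrays can also be written from multiple threads in parallel. Here’s a minimal sketch (toy data, in-memory store, purely illustrative) using dask.array’s store function, where each chunk is computed and written by a worker thread:

import zarr
import dask.array as da
# a chunked dask array of random data
x = da.random.normal(size=(4000, 4000), chunks=(1000, 1000))
# an empty Zarr array with matching shape and chunks (in-memory store)
z = zarr.zeros(shape=x.shape, chunks=(1000, 1000), dtype=x.dtype)
# store() computes and writes chunks in parallel on the threaded scheduler;
# the writes are chunk-aligned, so no extra synchronisation is needed
da.store(x, z)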

Distributed Dask + Zarr

I’m currently focused on making better use of multi-core processors, but others like Matt Rocklin are working on frameworks for large-scale distributed computing. After I released Zarr v0.4.0 in April, Matt got in touch to suggest a reorganisation of the code so that Zarr arrays could be stored in distributed systems like S3 or HDFS. Earlier this week I released Zarr v1.0.0, which includes a new storage architecture to support this. Here is Matt on using the new version of Zarr with Dask and S3 on a 20 node (160 core) cluster…

Dask’s distributed scheduler looks seriously cool. It’s exciting to think that computations I’m currently coding to run in parallel via Dask on my multi-core desktop could in future be scaled up to a large compute cluster without any extra work at all.
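For flavour, here’s a hedged sketch of what that might look like; the scheduler address is hypothetical, it assumes the Zarr data lives somewhere all the workers can see (e.g. a shared filesystem or S3), and the details of wiring Dask to a cluster depend on the dask.distributed version:

import zarr
import allel
from dask.distributed import Client

# connect to an already-running dask scheduler (hypothetical address);
# the client registers itself as the default scheduler for compute()
client = Client('scheduler-address:8786')

# open the Zarr array read-only and run the same allele count as before,
# but this time the chunk-wise tasks execute on the cluster's workers
genotype_zarr = zarr.open_array('data/2016-05-16/genotype.zarr', mode='r')
gdz = allel.GenotypeDaskArray(genotype_zarr)
ac = gdz.count_alleles(max_allele=3).compute()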

Further reading

To go with the new Zarr release there is some new documentation, including a tutorial, API reference and storage specification. Please bear in mind that Zarr is still a young project, so if you do take it for a spin, any feedback is appreciated.


import datetime
print(datetime.datetime.now().isoformat())
2016-11-01T19:38:06.960872