CPU blues
Notes on reducing I/O bottlenecks in parallel computations with Dask and Zarr.
When I’m analysing data I tend to keep one eye on the system monitor at the top of my screen. The height of the green bar tells me how much RAM I’m using and the height of the blue bar tells me how much CPU…
I’ll kick off a computation then look up to see what’s happening. Most of the time the blue bar chugs away at 1/8 height, which means that 1 of the 8 logical cores on my computer is fully utilised and the others are idle. I spend a fair amount of time waiting for computations to finish and it started to bug me that 7/8 CPU capacity was going spare. Many of the computations I run are simple and could be parallelized, so I started looking into ways of adapting my analysis code to make better use of multiple CPUs.
Dask + HDF5
One solution I really like is Dask. Using dask.array it’s simple to parallelize a wide range of operations over numerical arrays, using either multiple threads or multiple processes. To evaluate Dask I wrote an alternative Dask-backed implementation of some of the basic genetic data transformations I use every day. I then ran these on some data from the Ag1000G project using Dask’s multi-threaded scheduler, hoping to see my idle CPU fire up to maximum capacity…
The library versions and input data for this benchmark:
dask 0.11.1
scikit-allel 1.0.3
<HDF5 dataset "genotype": shape (13167162, 765, 2), type "|i1">
compression: gzip, level 3
chunks: (6553, 200, 2)
chunk size: 2.6 MB
To approximate a cold start, the OS page cache is dropped before each benchmark run, so that input data is read from disk rather than from memory:
#!/bin/bash
# This must be run as sudo, so to avoid passwords this
# script should be set in the sudoers file with NOPASSWD.
echo 1 > /proc/sys/vm/drop_caches
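To give a concrete idea of what is being benchmarked, here is a minimal sketch of a Dask-backed computation over the HDF5 data. It is not the exact code from the post: the file path is made up, and the simple reduction below stands in for the allele count transformation mentioned above.
# Wrap the HDF5 genotype dataset as a Dask array and run a reduction
# using Dask's multi-threaded scheduler (the default for dask.array).
import h5py
import dask.array as da

callset = h5py.File('ag1000g.phase1.ar3.pass.3R.h5', mode='r')  # hypothetical path
genotype = callset['3R/calldata/genotype']

# One Dask chunk per HDF5 chunk.
gd = da.from_array(genotype, chunks=genotype.chunks)

# Count called genotype values per variant (missing calls are typically
# coded as -1 in this kind of data).
called_per_variant = (gd >= 0).sum(axis=2).sum(axis=1).compute()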
Watching the system monitor during this run, the computation uses just over 1 core (~130% CPU). The limiting factor is h5py, which I’ve used to pull the input data out of the HDF5 file. The h5py library is a totally awesome piece of software that I use every day, but HDF5 is not designed to support multi-threaded data access. Also, h5py doesn’t release the GIL, a Python technicality which means other threads cannot run while h5py is doing anything, even if the other threads want to do something unrelated to HDF5 I/O.
Dask + Zarr
Recently I’ve been working on Zarr, a new Python library for chunked, compressed, N-dimensional data. Previously I introduced Zarr and showed how it can be used to get fast access into large multi-dimensional arrays. The other thing Zarr can do is let you read or write to an array from multiple threads or processes in parallel. Also, Zarr releases the GIL during compression and decompression, so other threads can carry on working. Here’s the allele count example again, but this time using a Zarr array as the input data source:
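First, the genotype data needs to be copied out of HDF5 into a Zarr array on disk. A minimal sketch of how that might look, assuming the zarr.array creation function and Blosc codec behave as in zarr 2.x (the output path is made up, not the author's exact code):
# Copy the HDF5 genotype dataset into a Zarr DirectoryStore, keeping the
# same chunk shape and using a Blosc+zlib level 3 compressor (comparable
# to the gzip level 3 used in the HDF5 file).
import zarr

store = zarr.DirectoryStore('ag1000g.phase1.ar3.pass.3R.zarr')  # hypothetical path
genotype_z = zarr.array(genotype,  # the h5py dataset opened earlier
                        chunks=genotype.chunks,
                        compressor=zarr.Blosc(cname='zlib', clevel=3, shuffle=0),
                        store=store)
genotype_z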
With zarr 2.1.3, the resulting array looks like this:
Array((13167162, 765, 2), int8, chunks=(6553, 200, 2), order=C)
nbytes: 18.8G; nbytes_stored: 640.7M; ratio: 30.0; initialized: 8040/8040
compressor: Blosc(cname='zlib', clevel=3, shuffle=0)
store: DirectoryStore
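Running the computation against the Zarr array is essentially the same as before, just with a different input source; roughly (assumed path again):
# Open the Zarr array read-only and run the same Dask reduction as above.
import zarr
import dask.array as da

genotype_z = zarr.open_array('ag1000g.phase1.ar3.pass.3R.zarr', mode='r')  # hypothetical path
gd = da.from_array(genotype_z, chunks=genotype_z.chunks)
called_per_variant = (gd >= 0).sum(axis=2).sum(axis=1).compute()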
This time I get over 700% CPU usage. Also the computation is about 8 times faster, which is about what you’d expect given the higher CPU usage.
Zlib is a fairly slow compressor; what happens if we use something faster, like LZ4?
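Swapping compressors just means re-encoding the data with a different Blosc configuration; something like this sketch (the output path is again illustrative):
# Re-encode the genotype data with Blosc+LZ4 at level 5 instead of Blosc+zlib.
import zarr

genotype_lz4 = zarr.array(genotype_z,
                          chunks=genotype_z.chunks,
                          compressor=zarr.Blosc(cname='lz4', clevel=5, shuffle=0),
                          store=zarr.DirectoryStore('ag1000g.phase1.ar3.pass.3R.lz4.zarr'))  # hypothetical path
genotype_lz4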
Array((13167162, 765, 2), int8, chunks=(6553, 200, 2), order=C)
nbytes: 18.8G; nbytes_stored: 1.0G; ratio: 18.2; initialized: 8040/8040
compressor: Blosc(cname='lz4', clevel=5, shuffle=0)
store: DirectoryStore
This goes even faster, and I’m still getting nearly full CPU utilisation, so the CPU is still the bottleneck and I could probably push my SSD even harder.
Distributed Dask + Zarr
I’m currently focused on making better use of multi-core processors, but others like Matt Rocklin are working on frameworks for large-scale distributed computing. After I released Zarr v0.4.0 in April, Matt got in touch to suggest a reorganization of the code so that Zarr arrays could be stored in distributed systems like S3 or HDFS. Earlier this week I released Zarr v1.0.0, which includes a new storage architecture to support this. Here is Matt on using the new version of Zarr with Dask and S3 on a 20-node (160-core) cluster…
Dask’s distributed scheduler looks seriously cool. It’s exciting to think that computations I’m currently coding to run in parallel via Dask on my multi-core desktop could in future be scaled up to a large compute cluster without any extra work at all.
Further reading
To go with the new Zarr release there is some new documentation, including a tutorial, API reference and storage specification. Please bear in mind that Zarr is still a young project, so if you do take it for a spin, any feedback is appreciated.