Dask Delayed — How to Parallelize Your Python Code With Ease

Dask Delayed — How to Parallelize Your Python Code With Ease

Parallelize any function with a single decorator

We all know Python isn’t the fastest programming language. Its <em>Global Interpreter Lock</em> __ (GIL) mechanism allows only one thread to execute Python bytecode at once. You can avoid this limitation by changing the interpreter or implementing process-based parallelism techniques.

I’ve talked about parallelism in Python in the past, so make sure to check these articles if you’re not familiar with the topic:

These methods work like a charm, but there’s a simpler alternative — parallel processing with the Dask library.

If you’re not familiar with Dask, it’s basically a Pandas equivalent for large datasets. It’s an oversimplification, so please read more about the library here.

This article is structured as follows:

  • Problem Description
  • Test: Running Tasks Sequentially
  • Test: Running Tasks in Parallel with Dask
  • Conclusion

You can download the source code for this article here.

Problem Description

The goal is to connect to jsonplaceholder.typicode.com— a free fake REST API.

You’ll connect to several endpoints and obtain data in the JSON format. There’ll be six endpoints in total. Not a whole lot, and Python will most likely complete the task in seconds. Not too great for demonstrating parallelism capabilities, so we’ll spice things up a bit.

In addition to fetching API data, the program will also sleep for a second between making requests. As there are six endpoints, the program should do nothing for six seconds — but only when the calls are executed sequentially.

The following code snippet imports the required libraries, declares a list of URLs, and a function for obtaining data from a single URL:

import time
import requests
from dask import delayed, compute

URLS = [
    'https://jsonplaceholder.typicode.com/posts',
    'https://jsonplaceholder.typicode.com/comments',
    'https://jsonplaceholder.typicode.com/albums',
    'https://jsonplaceholder.typicode.com/photos',
    'https://jsonplaceholder.typicode.com/todos',
    'https://jsonplaceholder.typicode.com/users'
]

def fetch_single(url: str) -> None:
    print(f'Fetching: {url}...')
    req = requests.get(url)
    time.sleep(1)
    print(f'Fetched {url}!')
    return req.content

Let’s test the execution time without parallelism first.

Test: Running Tasks Sequentially

The following code snippet fetches the data sequentially inside a Jupyter notebook. If you’re not in a notebook environment, please remove the %%time magic command:

%%time

fetch_normal = []

for url in URLS:
    single = fetch_single(url)
    fetch_normal.append(single)

After executing this cell, you’ll see a similar output:

Image 1 — Sequential execution (image by author)

Image 1 — Sequential execution (image by author)

Nothing surprising here — Python fetches data from the API endpoints in the declared order, and it took around 8 seconds to finish, primarily due to the sleep() calls.

As it turns out, these API calls are independent and can be called in parallel. Let’s see how to do that next.

Test: Running Tasks in Parallel with Dask

We’ll need to alter the code slightly. The first thing to do is wrap our fetch_single function with a delayed decorator. Once outside the loop, we also have to call the compute function from Dask on every item in the fetch_dask array, since calling delayeddoesn’t do the computation.

Here’s the entire code:

%%time

fetch_dask = []

for url in URLS:
    single = delayed(fetch_single)(url)
    fetch_dask.append(single)
    
results_dask = compute(*fetch_dask)

The alternative to wrapping the function with a delayed decorator is using the @delayed notation above the function declaration. Feel free to use either.

Anyway, the execution results are shown in the image below:

Image 2 — Parallel execution with Dask Delayed (image by author)

Image 2 — Parallel execution with Dask Delayed (image by author)

As you can see, the print ordering is different. That’s because Dask was instructed to start all of the tasks separately. The total execution time was just under 1.5 seconds, with 1 second being used for sleep.

Nice improvement, overall.

The question remains —are the returned results identical? Well, yes and no. The values obtained in the sequential example are in a list, whereas the ones obtained after calling compute are in a tuple.

The following image verifies that:

Image 3 — Data type comparison (image by author)

Image 3 — Data type comparison (image by author)

As a result, we can’t compare the data structures directly, but we can make the comparison after converting the second one to a list:

Image 4 — Content comparison (image by author)

Image 4 — Content comparison (image by author)

The final answer is yes — you’ll get identical results with both approaches, but the parallelized one takes a fraction of the time.


Conclusion

Implementing parallelism to your applications or data science pipelines requires a lot of thought. Luckily, the implementation in code is trivial, as only two functions are needed.

The good news is — you can use Dask to parallelize almost anything. From basic dataset loadings, statistical summaries to model training — Dask can handle it.

Let me know if you want a more advanced data science-based tutorial on Dask.

Stay connected