An Introduction to Parallelization – Multithreading and Multiprocessing in Python

One of my hobbies include developing data analytics workflows. More specifically, I like to acquire and process market data from MMORPGs in order to generate useful and concise reports to guide speculation in in-game markets.

Historically, my creations have always operated serially. While the function and speed of my scripts has never really posed any functional issues, I’ve lately taken a greater interest in improving my scripts, including making them run faster.

I primarily work with Python and make heavy use of Pandas and SQLite libraries. In my quest for faster execution, I’ve managed making some improvements through refactoring and appropriate use of indices. While Python has a reputation for being a slow language, since the bulk of my computationally-intensive code uses libraries which leverage other, faster languages, Python itself hasn’t proven to be much of a bottleneck itself (at least for now). This became especially evident when I realized that executing my scripts after compiling them with Cython revealed no performance improvement. While there are likely further improvements I can make with serial execution, I came to the conclusion that what I really needed was parallelization.

Let’s start by taking a look at some code representing a simplified model of my batch processing workflow which has not been parallelized at all:

#python version 3.11.3

import time

starttime = time.time()

EndpointList = ['24h', '1h', '5m'] 
IntervalList = ['30', '60', '90']

def DataCollect(Endpoint, IntervalList):
	time.sleep(0.05 * float(IntervalList[-1]))
	EndpointIntervalList = []
	for Interval in IntervalList:
		EndpointIntervalList = EndpointIntervalList + [[Endpoint, Interval]]
	return(EndpointIntervalList)

def DataImport(EndpointIntervalList):
	EndpointIntervalListWithData = []
	time.sleep(0.01 * float(IntervalList[-1]))
	for EndpointInterval in EndpointIntervalList:
		EndpointInterval = EndpointInterval + [f'Pandas Dataframe For {EndpointInterval[0]}']
		EndpointIntervalListWithData = EndpointIntervalListWithData + [EndpointInterval]
	return(EndpointIntervalListWithData)

def ReportGen(EndpointIntervalReport):
	time.sleep(0.1 * float(EndpointIntervalReport[1]))
	print(f'report {EndpointIntervalReport} completed')
	

### runtime ###
if __name__ == '__main__':
	for Endpoint in EndpointList:
		EndpointIntervalList = DataCollect(Endpoint, IntervalList)
		EndpointIntervalListWithData = DataImport(EndpointIntervalList)
		for EndpointIntervalReport in EndpointIntervalListWithData:
			ReportGen(EndpointIntervalReport)
		
	print('script completed')
	print(time.time() - starttime)

The three functions defined in the above script represent discrete actions:

  • DataCollect downloads data from an API Endpoint to the local filesystem and returns a list of values that we need to generate reports for
  • DataImport merges the downloaded data into a table, which is then appended to each value in the aforementioned list and returned
  • ReportGen consumes each set of values in the list, generating a report

The sleep commands within each function serve as a representation of their execution time:

  • For DataCollect, the sleep command represents an external limiting factor, or more specifically the product of an API Endpoint’s call limit and the largest interval defined in IntervalList. In the above example, it takes 4.5 seconds for DataCollect to perform one iteration
  • For DataImport, the sleep command represents the product of our machine’s i/o capabilities and the largest interval defined in IntervalList. In the above example, it takes 0.9 seconds for DataImport to perform one iteration
  • For DataCollect, the sleep command is the product of our machine’s single-threaded CPU speed and the Interval for the report currently being processed. In the above example, this can either be 3, 6, or 9 seconds depending on the interval (30, 60, 90) specified.

With this information, and given that this version of the script runs serially, we should be able to guess the execution time pretty accurately. Since there are three endpoints defined, we know that DataCollect and DataImport will be invoked three times each, while DataCollect will be invoked 9 times, bringing us to a total execution time of 70.2 seconds.

Executing the script returns:

report ['24h', '30', 'Pandas Dataframe For 24h'] completed
report ['24h', '60', 'Pandas Dataframe For 24h'] completed
report ['24h', '90', 'Pandas Dataframe For 24h'] completed
report ['1h', '30', 'Pandas Dataframe For 1h'] completed
report ['1h', '60', 'Pandas Dataframe For 1h'] completed
report ['1h', '90', 'Pandas Dataframe For 1h'] completed
report ['5m', '30', 'Pandas Dataframe For 5m'] completed
report ['5m', '60', 'Pandas Dataframe For 5m'] completed
report ['5m', '90', 'Pandas Dataframe For 5m'] completed
script completed
70.20245575904846

As we will find, we can bring this execution time down much further by incorporating multithreading and multiprocessing techniques.

When Should We Use Parallelization?

Before we can implement parallelization techniques, we need to take a look at each discrete function in our code and ask ourselves two critical questions:

  • Can this function run in-parallel with itself without adverse effect?
  • Can this function run in-parallel with other functions without adverse effect?

Since we will not be making any changes to the three core functions described earlier, let’s assess each for their parallelization potential:

  • DataCollect is invoked 3 times, with the operation speed modeled after an external factor: an imaginary API call limit.
    • In a real-world script, parallelizing this function with itself would cause us to exceed the API call limit, resulting in 403 errors and causing the entire script to fail. As a result of this adverse effect, No more than one invocation of this function may run at a time.
    • On the other hand, since this function has negligible impact on local machine resources while it is running, this function should be invoked concurrently with other functions.
  • DataImport is invoked 3 times, with the operation speed modeled after filesystem i/o.
    • In a real-world script, parallelizing this function with itself would not lead to an improvement in execution because the function is already bound by filesystem i/o. Furthermore, since our subsequent function ReportGen is dependent on each iteration of DataImport successfully completing, in a worst-case scenario, parallelizing DataImport with itself could result in slower overall execution time: No more than one invocation of this function may run at a time.
    • Conversely, since DataCollect and ReportGen are not adversely affected by another process taxing our filesystem i/o, this function should be invoked concurrently with other functions.
  • ReportGen is invoked 9 times, with the operation speed modeled after single-threaded CPU performance.
    • In a real-world script, assuming there is sufficient RAM available, parallelizing this function with itself will speed-up execution roughly in-proportion with the number of CPU cores that we have access to, with diminishing returns given that we have a mixture of reports requiring different execution duration. Furthermore, even in the absence of additional CPU cores, since there are no downstream functions dependent on each iteration of ReportGen completing and returning a result, we should run as many invocations of this function at a time as we have CPU and RAM resources available.
    • Likewise, this function should be invoked concurrently with other functions because DataImport and DataCollect are not adversely affected by other processes taxing our CPU.

It is important to note here that parallelization does not always need to involve spreading a computational load across multiple CPUs. For instance, if an application involves accessing multiple, discrete filesystems, we can run filesystem operations concurrently to leverage their combined i/o. Likewise, if an application pulls data from many different APIs, we can run API calls concurrently to leverage all available external resources. Parallelization involves making the best use of all of the resources we have at our disposal to execute tasks more efficiently.

So, in-summary, all three of our functions can run in-parallel with each other, and ReportGen can run in-parallel with itself. Our goal will be to capitalize on all of these qualities to reduce our execution time as much as possible in incremental steps.

Running separate functions concurrently via Multithreading

Let’s start by adding a thread for our ReportGen function. Since we won’t be changing our core functions, we’ll jump straight into what we need to spawn a new thread. Changes are highlighted in bold.

...
from queue import Queue
import threading
from threading import Thread

def ReportGenInvoke():
	while True:
		ReportGen(ReportGenQueue.get())
		ReportGenQueue.task_done()


### runtime ###

if __name__ == '__main__':
	ReportGenQueue = Queue(maxsize = 0)
	threading.Thread(target=ReportGenInvoke, daemon=True).start()

	for Endpoint in EndpointList:
		EndpointIntervalList = DataCollect(Endpoint, IntervalList)
		EndpointIntervalListWithData = DataImport(EndpointIntervalList)
		for EndpointIntervalReport in EndpointIntervalListWithData:
			ReportGenQueue.put(EndpointIntervalReport)

	ReportGenQueue.join()

	print('script completed')
	print(time.time() - starttime)

First, we need to import some modules. In this example, we are importing Queue, threading, and Thread. The Thread and threading modules provide us with multi-threading capabilities, while the Queue module provides us with a convenient means of passing tasks between threads.

Next, we’ll define ReportGenQueue in our runtime, which will provide us with the following commands:

  • .put places an item in a queue
  • .get pulls an item from a queue
  • .task_done signals that we are finished using the data last received from .get
  • .join prevents our script from prematurely ending due to a race condition by blocking progression of the main thread until our tally of .task_done iterations matches that of .put.

Next, we’ll define our threading.Thread command and its target function ReportGenInvoke. The threading.Thread command spawns a worker daemon in a separate thread which targets ReportGenInvoke, effectively allowing it to persist and run independently from the main thread, continuously checking and fetching items from the queue using .get, invoking ReportGen when any new data appears, and invoking .task_done once ReportGen is finished running.

Finally, we have two other small changes to our runtime. This includes replacing the line used to invoke ReportGen to now include a .put command to add items to the queue, as well as the addition of a .join command to tie everything together before ending the script.

Executing this script returns:

...
script completed
59.413721799850464

By using a separate thread to handle all invocations of our ReportGen function, we now see an 11-second improvement over our single-threaded script.

How about three threads? Let’s see what having a dedicated thread for our DataImport function does for our total execution time:

...
from queue import Queue
import threading
from threading import Thread

def DataImportInvoke():
	while True:
		EndpointIntervalList = DataImportQueue.get()
		EndpointIntervalListWithData = DataImport(EndpointIntervalList)
		for EndpointIntervalReport in EndpointIntervalListWithData:
			ReportGenQueue.put(EndpointIntervalReport)
		DataImportQueue.task_done()

def ReportGenInvoke():
	while True:
		ReportGen(ReportGenQueue.get())
		ReportGenQueue.task_done()


### runtime ###

if __name__ == '__main__':
	DataImportQueue = Queue(maxsize = 0)
	ReportGenQueue = Queue(maxsize = 0)
	threading.Thread(target=DataImportInvoke, daemon=True).start()
	threading.Thread(target=ReportGenInvoke, daemon=True).start()

	for Endpoint in EndpointList:
		EndpointIntervalList = DataCollect(Endpoint, IntervalList)
		DataImportQueue.put(EndpointIntervalList)

	DataImportQueue.join()
	ReportGenQueue.join()

	print('script completed')
	print(time.time() - starttime)

In the above snippet, the steps taken for ReportGen have now been applied toward DataImport. This includes an additional queue, daemon, and function. Our data effectively passes from DataImportQueue directly to ReportGenQueue. Some components from our runtime have been shifted to our new function; all that our runtime needs to do now is invoke DataCollect for each Endpoint and pass the output to DataImportQueue; our second and third threads handle the rest in the background and the script terminates once the corresponding .join statements are fulfilled.

Now that we have three threads configured, what type of performance can we expect?

…
script completed
59.411784410476685

In this case, our performance gain was effectively zero. This isn’t completely unexpected as our DataImport function takes less time to complete than any of our other functions and does not present enough of a bottleneck in this context for us to realize any improvement to the script as a whole. We will circle back to this three-threaded example toward the end of our exercise.

Now, what about our DataCollect function? We won’t actually see any performance improvement at all adding a fourth thread, because at this point DataCollect is the only function being executed in our primary thread.

Since we have exhausted our options that involve parallelizing our functions with each other, our next step will be running our ReportGen function in-parallel with itself.

Threads versus Processes

Up-until now, we have been using a multi-threading technique to run functions concurrently. Our next example leverages multi-processing.

While multi-threading and multi-processing both seek to accomplish the same end-goal, they go about achieving that goal in very different ways. Generally speaking, in Python, multi-processing is best suited for CPU-bound functions, while multi-threading is best suited for functions that are not CPU-bound. Because discussions concerning the differences between processes and threads can get a bit esoteric, my recommendation for anybody looking to better-parallelize their project would be to research and test both approaches to determine which technique provides the best result.

In our model, since ReportGen represents a CPU-bound function, we will be using multi-processing techniques to run ReportGen concurrently with itself. While our choice doesn’t make any real difference within our model, the function that ReportGen is modeled after does scale and perform better when spread across processes rather than threads, with the acceptable expense of greater memory overhead.

Running the same function concurrently with itself via Multiprocessing

A very simple drop-in multiprocessing solution for our function can be achieved with a worker pool. Let’s modify our two-threaded example to incorporate this:

...
from queue import Queue
import threading
from threading import Thread
from multiprocessing import Pool

def ReportGenInvoke():
	while True:
		EndpointIntervalListWithData = ReportGenQueue.get()
		with Pool(cpucount) as p:
			p.map(ReportGen, EndpointIntervalListWithData)
		ReportGenQueue.task_done()


### runtime ###

if __name__ == '__main__':
	cpucount = 1
	ReportGenQueue = Queue(maxsize = 0)	
	threading.Thread(target=ReportGenInvoke, daemon=True).start()

	for Endpoint in EndpointList:
		EndpointIntervalList = DataCollect(Endpoint, IntervalList)
		EndpointIntervalListWithData = DataImport(EndpointIntervalList)
		ReportGenQueue.put(EndpointIntervalListWithData)

	ReportGenQueue.join()

	print('script completed')
	print(time.time() - starttime)

We start by importing the Pool module, defining a cpucount variable in our runtime, and modifying our ReportGenInvoke function to use a worker Pool instead of our original for loop. This effectively allows our iterable list EndpointIntervalListWithData to be passed to a pool of processes, with the number of concurrent processes for each invocation limited by the cpucount variable.

Let’s see how this script performs with different cpucount values:

cpucount = 1
...
script completed
59.5568...
cpucount = 2
...
script completed
41.5192...
cpucount = 3
...
script completed
32.5154...
cpucount = 4
...
script completed
32.5190...

… Our script is now running over twice as fast as our original script, with performance tapping-out at 3 cores.

Our worker pool of processes seems great, right? Unfortunately, there are a few adverse effects to consider with this approach:

  • A pool of 3 workers is spawned each time ReportGenInvoke pulls data from its queue. While this does not pose a problem with our current Endpoint and Interval variables, if we were to make any changes to our ReportGen function that caused it to take longer to process, we could end-up with more than 3 concurrently-running ReportGen processes. In a worst-case scenario involving our real-world use-case, this added overhead would cause execution to slow-down, causing our running process count to accumulate until we exceed our system’s available RAM, causing the program to crash. This problem is exacerbated if we modify our starting variables to pull data from more endpoints or for more report intervals.
  • Pool workers are often sitting idle. Since ReportGen has a variable execution duration ranging from 3 to 9 seconds, workers who receive shorter tasks spend the rest of their life idling, waiting for the 9-second task to complete.
  • Performance does not scale with any quantity of CPU cores beyond the number of endpoints we are collecting data for, which in our example is 3 cores. Since we are running 9 discrete instances of ReportGen in our example, we should be able to effectively leverage a larger pool of CPU cores to attain a faster result if we have the resources to do so.

So, how do we overcome these problems and implement a multi-processing approach without these adverse effects? This question initially had me stumped for a little bit; after some research and testing, I discovered that there are quite a few different approaches to this type of problem. For my next example, I’ll be using the multiprocessing module, adapting some techniques highlighted in a StackExchange discussion I came across:

...
import multiprocessing as mp

def ReportGenInvoke(ReportGenQueue):
	while True:
		EndpointIntervalReport = ReportGenQueue.get(block=True)
		if EndpointIntervalReport == 'Completed':
			break
		ReportGen(EndpointIntervalReport)


### runtime ###

if __name__ == '__main__':
	cpucount = 1
	ReportGenQueue = mp.Queue()
	ReportGenPool = mp.Pool(cpucount, ReportGenInvoke,(ReportGenQueue,))

	for Endpoint in EndpointList:
		EndpointIntervalList = DataCollect(Endpoint, IntervalList)
		EndpointIntervalListWithData = DataImport(EndpointIntervalList)
		for EndpointIntervalReport in EndpointIntervalListWithData:
			ReportGenQueue.put(EndpointIntervalReport)
		
	for i in range(cpucount):
		ReportGenQueue.put('Completed')

	ReportGenQueue.close()
	ReportGenQueue.join_thread()

	ReportGenPool.close()
	ReportGenPool.join()
	
	print('script completed')
	print(time.time() - starttime)

We start by importing the multiprocessing module, shortened to mp. We can see in our runtime that we are using the versions of Queue and pool provided by multiprocessing to handle ReportGenQueue and ReportGenInvoke in-order to capitalize on some features not present in the standard version of queue.

Next, we have completely redefined our ReportGenInvoke function. Rather than iterating over this function in a separate thread via a daemon, we have instead spawned a worker pool using mp.pool within our runtime, with the quantity of workers in the pool being defined by our cpucount variable. Once this pool is initialized in our runtime, the workers reach the .get command… and patiently wait for the queue to become populated with data before executing ReportGen with the data.

With the process pools used in the earlier example, worker processes automatically terminate once the final worker in the pool has completed a task. In our new example, our worker processes will persist unless they’re explicitly told to stop. While this isn’t a problem in the narrow context of our model, if we intend to use this model as modular part of a larger program, we would not want any processes persisting any longer than necessary. This problem is solved by using a “poison pill”; in our ReportGenInvoke function, there is a check to confirm that the .get command did not return a value ‘Complete’ before executing ReportGen and returning to the top to wait for another item in the queue. If the worker does receive the ‘Complete’ string, during the following check, the process will break and terminate.

We do not want this poison pill to enter the queue until we know that the ReportGen queue has obtained all of the data it needs to process. We also want to make sure that we send as many poison pills as there are running workers. This first requirement is fulfilled by virtue of the order of operations within our runtime; the last valid piece of report data put into ReportGenQueue enters the queue before our poison pills are generated. Our second requirement is achieved by releasing as many ‘Complete’ poison pills into ReportGenQueue as our cpucount indicates, which will match our number of workers.

Finally, the join, join_thread, and close commands gracefully terminate our ReportGen Queue and Pool before closing the script and returning the execution time.

So, what type of performance do we get with this script? Let’s start with 3 cores:

cpucount = 3
...
script completed
28.2939...
cpucount = 4
...
script completed
25.8674...
cpucount = 5
...
script completed
25.2942...
cpucount = 6
...
script completed
25.3038

As we can see, unlike our previous pool, not only do we have better performance with 3 cores, we continue to see performance gains with the addition of extra cores up-until a core count of 5.

We mentioned earlier that we would circle back to our third, DataImportInvoke thread… Let’s see how our script runs with this component re-incorporated into our script:

...
from queue import Queue
import threading
from threading import Thread
import multiprocessing as mp

def DataImportInvoke():
	while True:
		EndpointIntervalList = DataImportQueue.get()
		EndpointIntervalListWithData = DataImport(EndpointIntervalList)
		for EndpointIntervalReport in EndpointIntervalListWithData:
			ReportGenQueue.put(EndpointIntervalReport)
		DataImportQueue.task_done()

def ReportGenInvoke(ReportGenQueue):
	while True:
		EndpointIntervalReport = ReportGenQueue.get(block=True)
		if EndpointIntervalReport == 'Completed':
			break
		ReportGen(EndpointIntervalReport)


### runtime ###

if __name__ == '__main__':
	cpucount = 1
	DataImportQueue = Queue(maxsize = 0)
	threading.Thread(target=DataImportInvoke, daemon=True).start()
	ReportGenQueue = mp.Queue()
	ReportGenPool = mp.Pool(cpucount, ReportGenInvoke,(ReportGenQueue,))

	for Endpoint in EndpointList:
		EndpointIntervalList = DataCollect(Endpoint, IntervalList)
		DataImportQueue.put(EndpointIntervalList)
		
	DataImportQueue.join()

	for i in range(cpucount):
		ReportGenQueue.put('Completed')

	ReportGenQueue.close()
	ReportGenQueue.join_thread()

	ReportGenPool.close()
	ReportGenPool.join()
	
	print('script completed')
	print(time.time() - starttime)

In-order for our poison pill to work correctly now, it must be applied after DataImportQueue.join(), as this now demarcates when the last bit data has entered ReportGenQueue.

Executing with five cores returns:

cpucount = 5
...
script completed
23.4934...

Rather than there being no improvement as we experienced earlier, we have now shaved a few seconds off of our total execution time. This is due entirely to the significant improvements made in our script overall by processing ReportGen in-parallel with itself, which now comprises a much shorter proportion of our total runtime. This improvement has revealed a new bottleneck with our DataImport function… which has now been solved via multithreading.

Conclusion

The power of parallelization is evident in the sheer reduction of our execution time. In our case, we saw a reduction from 70 seconds to 23.5 seconds, representing an almost 200% improvement.

Now, what if we need even faster execution? In our next article, we discuss the importance of refactoring in the context of parallelization.

Thank you for reading!