In my previous post, we demonstrated when and how parallelization techniques can be used to improve code performance without modifying core functions.
As we seek greater performance in our code, we will run into situations where applying these techniques alone will not yield the result that we want. If better hardware is not available, our only option is to take a hard look at our code and modify our functions so that they can better leverage the resources that we have at our disposal without changing their underlying behavior. This process is part of a greater discipline referred to as code refactoring.
In this article, we will work through all of the problem-solving required to effectively refactor our example code in order to achieve greater performance through parallelization.
For reference, all of the code listed in this article was executed using a pair of Xeon E5-2640v2 processors, providing us with a total of 16 processing cores.
Table of Contents
- Script Overview
- Limitations of Parallelization Without Refactoring
- ReportGen: Our Big, Smelly Function
- Ensuring Input/Output Consistency
- Searching for Code Smell
- Assessing our Smelly Block for Refactoring Potential
- Refactoring our Code
- Closing Notes
Script Overview
Let’s take a look at the following Python script:
#import necessary modules
import sqlite3
import csv
import glob
from glob import glob
import pandas as pd
import time
import os
import pickle
def ReportGen(Report):
...
print(f'{ReportName} Completed')
### runtime ###
if __name__ == '__main__':
#define output location for our reports
csvoutputdir = 'csvoutput/'
# open pickled data from filesystem that we want to create reports with
with open('ReportGenList.pkl', 'rb') as file:
ReportList = pickle.load(file)
#start execution timer
starttime = time.time()
#process data
for Report in ReportList:
ReportGen(Report)
print('script complete')
print(time.time() - starttime)
I’ll reveal the contents of our ReportGen function later. For now, let’s focus on the overall structure and flow of the script.
After the script defines the output location for our reports, it loads some input data to feed into the ReportGen function loop. For this example, we are loading a static picklefile from our filesystem to provide this input data. Running print(ReportList)
after loading the data reveals the contents:
[['24h', 7, 'dailymarketdatacsvs/', 'WeeklyReport', 360, 86400, 1696291200,
id avgHighPrice highPriceVolume avgLowPrice lowPriceVolume
0 2 176.0 16953582 172.0 8562003
1 6 196979.0 81 187239.0 305
2 8 185920.0 81 182357.0 254
3 10 192134.0 92 185652.0 305
4 12 205941.0 79 197059.0 325
... ... ... ... ... ...
3612 28313 69426332.0 207 68557114.0 190
3613 28316 106373441.0 161 105383220.0 190
3614 28334 1662081.0 2111 1645718.0 4323
3615 28338 188022810.0 159 186432721.0 156
3616 28583 127016.0 88 95722.0 40
[25451 rows x 5 columns]],
['24h', 30, 'dailymarketdatacsvs/', 'MonthlyReport', 360, 86400, 1696291200,
id avgHighPrice highPriceVolume avgLowPrice lowPriceVolume
0 2 176.0 16953582 172.0 8562003
1 6 196979.0 81 187239.0 305
2 8 185920.0 81 182357.0 254
3 10 192134.0 92 185652.0 305
4 12 205941.0 79 197059.0 325
... ... ... ... ... ...
3610 28310 122027947.0 110 120378165.0 106
3611 28313 69227320.0 208 68443250.0 205
3612 28316 104550767.0 147 103697835.0 153
3613 28334 2081736.0 2869 2058064.0 5810
3614 28338 263432488.0 122 261193821.0 109
[108967 rows x 5 columns]],
['24h', 360, 'dailymarketdatacsvs/', 'YearlyReport', 360, 86400, 1696291200,
id avgHighPrice highPriceVolume avgLowPrice lowPriceVolume
0 2 1.760000e+02 16953582 1.720000e+02 8562003
1 6 1.969790e+05 81 1.872390e+05 305
2 8 1.859200e+05 81 1.823570e+05 254
3 10 1.921340e+05 92 1.856520e+05 305
4 12 2.059410e+05 79 1.970590e+05 325
... ... ... ... ... ...
3541 27241 1.839330e+08 212 1.803544e+08 203
3542 27269 8.820648e+06 30 8.148511e+06 17
3543 27272 2.180000e+03 8248 2.010000e+03 97244
3544 27277 1.491448e+09 273 1.486893e+09 316
3545 27355 4.814388e+08 124 4.737290e+08 58
[1293598 rows x 5 columns]]]
I’ve added some line breaks for clarity; our input contains 3 lists of identically-structured data, with each list containing all of the ingredients needed by our ReportGen function to generate a report.
Next, after starting a timer, the ReportGen function is executed for each list contained in ReportList. For ReportGen to execute successfully, a report name and a corresponding Pandas dataframe are required. In our example, the report names are WeeklyReport, MonthlyReport, and YearlyReport, with each corresponding Pandas dataframe containing a proportional quantity of rows; the dataframe for WeeklyReport contains data points spanning the past week, amounting to about 25,000 rows, while YearlyReport spans the past year, amounting to about 1.3 million rows.
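For orientation, here is how one entry of the printed structure unpacks (a quick sketch; the report name sits at index 3 and the Pandas dataframe is the last element of each inner list):
#inspect the first entry of ReportList
Report = ReportList[0]
print(Report[3])       # 'WeeklyReport'
print(len(Report[7]))  # 25451 rows in the weekly dataframe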
Finally, once all iterations of ReportGen are complete, our execution time is displayed and the script terminates. Running the above script returns:
...
WeeklyReport Completed
MonthlyReport Completed
YearlyReport Completed
script complete
46.227...
A truncated example of a generated report looks like this:
YearlyGen.csv:
'id','MedianLow','MedianHigh','MedianVolumeLow','MedianVolumeHigh','MeanLow','MeanHigh','MeanVolumeLow','MeanVolumeHigh','MinLow','MinHigh','MaxLow','MaxHigh'
2,160,164,8110064.5,20439520,160.69444444444446,164.85555555555555,7918714.452777778,20312851.71111111,145,149,186,190
6,185119,191536,348,108,185699.44568245125,193601.6061452514,351.8050139275766,112.23955431754875,161887,172579,215196,272433
8,184293,188319,331,94,184653.94428969358,188819.45961002784,329.6824512534819,97.42061281337047,172696,177184,206864,229456
10,184483,191122.5,350,102,186137.86908077996,193321.74581005587,348.71866295264624,108.07799442896936,169877,178063,438581,337600
12,191940,199444,338,98,192673.0974930362,201241.76536312848,340.1058495821727,100.37883008356546,174504,183494,297609,282826
28,395.5,436,49,27,523.45197740113,2111.3229461756373,55.325905292479106,34.05292479108635,18,1,9500,132063
...
[3898 rows x 13 columns]
Overall, our structure is very simple; the script iterates over a static input and provides a sequence of corresponding outputs, all leveraging a single function.
Limitations of Parallelization Without Refactoring
Without making any modifications to our still-unknown ReportGen function, we can improve our execution time by incorporating some of the parallelization techniques we learned in our previous article. Since our reports are mutually distinct, and since we have access to up to 16 processing cores, we can execute the ReportGen function concurrently with itself. Let’s modify our runtime to incorporate a simple worker pool. The changes are shown below:
...
### runtime ###
if __name__ == '__main__':
csvoutputdir = 'csvoutput/'
with open('ReportGenList.pkl', 'rb') as file:
ReportList = pickle.load(file)
starttime = time.time()
cpucount = 2
import multiprocessing as mp
with mp.Pool(cpucount) as pool:
pool.map(ReportGen, ReportList)
print('script complete')
print(time.time() - starttime)
Executing our modified script with 1, 2, 3, and 4 cores respectively reveals the following runtimes:
... cpucount = 1
... 46.728...
... cpucount = 2
... 34.574...
... cpucount = 3
... 24.634...
... cpucount = 4
... 24.336
While this reduction in time is not insignificant, we can clearly see that our performance is capped at about 3 cores. This makes sense given that 3 is the number of reports we are generating; any cores beyond that will simply sit idle. Furthermore, the cores assigned to the Weekly and Monthly reports will likely sit idle while the much larger YearlyReport continues to be processed by a single core.
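One quick way to confirm this imbalance (a rough sketch, separate from the final script) is to time each report on its own:
#time each report individually to see which one dominates the total runtime
for i, Report in enumerate(ReportList):
    t0 = time.time()
    ReportGen(Report)
    print(f'report {i} finished in {time.time() - t0:.1f}s')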
What if we want better performance? Without obtaining better hardware, at this point our only option is to turn our focus toward the ReportGen function.
ReportGen: Our Big, Smelly Function
Let’s take a long look at ReportGen, focusing on the comments:
def ReportGen(Report):
#extract our ReportName and the PandasDB needed for report generation
ReportName = Report[3]
PandasDB = Report[7]
#create our temporary sqlite database in-memory, define cursor
MasterDB = sqlite3.connect(":memory:")
cur = MasterDB.cursor()
#import our PandasDB into a new sqlite table
cur.executescript(f'''CREATE TABLE MasterTable('id' INT, 'avgHighPrice' INT,'highPriceVolume' INT,'avgLowPrice' INT,'lowPriceVolume' INT);''')
PandasDB.to_sql(f'MasterTable', MasterDB, if_exists="append", index=False)
#index the sqlite table
cur.execute(f'''CREATE INDEX indices ON MasterTable(id);''')
#create temp output table where all of our output results will be appended
cur.executescript('''CREATE TEMP TABLE FinalOutput('id' INT, 'MedianLow' INT,'MedianHigh' INT,'MedianVolumeLow' INT,'MedianVolumeHigh' INT,'MeanLow' INT,'MeanHigh' INT,'MeanVolumeLow' INT,'MeanVolumeHigh' INT,'MinLow' INT,'MinHigh' INT,'MaxLow' INT,'MaxHigh' INT);''')
#get unique id list
cur.execute('''SELECT DISTINCT id FROM MasterTable''')
currentids = cur.fetchall()
#iterate over each unique ID, perform arithmetic operations and append the result to FinalOutput table
for currentid in currentids:
cur.execute(f'''CREATE TEMP TABLE currentid AS SELECT * FROM MasterTable WHERE id = {currentid[0]}''')
#create separate tables for each value where null values have either been eliminated for price, or set to 0 for volume, so that we will get accurate median/mean/min/max figures later-on.
cur.executescript('''CREATE TEMP TABLE AvgLowPriceFixNull AS SELECT avgLowPrice FROM currentid WHERE avgLowPrice != ''; CREATE TEMP TABLE AvgHighPriceFixNull AS SELECT avgHighPrice FROM currentid WHERE avgHighPrice != ''; CREATE TEMP TABLE LowPriceVolumeFixNull AS SELECT lowPriceVolume FROM currentid; UPDATE LowPriceVolumeFixNull SET lowPriceVolume = 0 WHERE lowPriceVolume = ''; CREATE TEMP TABLE HighPriceVolumeFixNull AS SELECT highPriceVolume FROM currentid; UPDATE HighPriceVolumeFixNull SET highPriceVolume = 0 WHERE highPriceVolume = '';''')
#median calculations (this is very ugly because sqlite does not have a native median function)
cur.executescript('''CREATE TEMP TABLE MedianLowTable AS SELECT AVG(avgLowPrice) AS MedianLow FROM (SELECT avgLowPrice FROM AvgLowPriceFixNull ORDER BY avgLowPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgLowPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgLowPriceFixNull)); CREATE TEMP TABLE MedianHighTable AS SELECT AVG(avgHighPrice) AS MedianHigh FROM (SELECT avgHighPrice FROM AvgHighPriceFixNull ORDER BY avgHighPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgHighPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgHighPriceFixNull)); CREATE TEMP TABLE MedianLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MedianVolumeLow FROM (SELECT lowPriceVolume FROM LowPriceVolumeFixNull ORDER BY lowPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM LowPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM LowPriceVolumeFixNull)); CREATE TEMP TABLE MedianHighVolumeTable AS SELECT AVG(highPriceVolume) AS MedianVolumeHigh FROM (SELECT highPriceVolume FROM HighPriceVolumeFixNull ORDER BY highPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM HighPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM HighPriceVolumeFixNull));''')
#mean calculations
cur.executescript('''CREATE TEMP TABLE MeanLowTable AS SELECT AVG(avgLowPrice) AS MeanLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MeanHighTable AS SELECT AVG(avgHighPrice) AS MeanHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MeanLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MeanVolumeLow FROM LowPriceVolumeFixNull; CREATE TEMP TABLE MeanHighVolumeTable AS SELECT AVG(highPriceVolume) AS MeanVolumeHigh FROM HighPriceVolumeFixNull;''')
#minmax calculations
cur.executescript('''CREATE TEMP TABLE MinLowPriceTable AS SELECT min(avgLowPrice) AS MinLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MinHighPriceTable AS SELECT min(avgHighPrice) AS MinHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MaxLowPriceTable AS SELECT max(avgLowPrice) AS MaxLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MaxHighPriceTable AS SELECT max(avgHighPrice) AS MaxHigh FROM AvgHighPriceFixNull;''')
#insert all of the values for the current id into the FinalOutput table
cur.execute('''INSERT INTO FinalOutput SELECT DISTINCT currentid.id AS id, MedianLow, MedianHigh, MedianVolumeLow, MedianVolumeHigh, MeanLow, MeanHigh, MeanVolumeLow, MeanVolumeHigh, MinLow, MinHigh, MaxLow, MaxHigh FROM currentid CROSS JOIN MedianLowTable CROSS JOIN MedianHighTable CROSS JOIN MedianLowVolumeTable CROSS JOIN MedianHighVolumeTable CROSS JOIN MeanLowTable CROSS JOIN MeanHighTable CROSS JOIN MeanLowVolumeTable CROSS JOIN MeanHighVolumeTable CROSS JOIN MinLowPriceTable CROSS JOIN MinHighPriceTable CROSS JOIN MaxLowPriceTable CROSS JOIN MaxHighPriceTable;''')
#cleanup - purge temp tables before proceeding to next id
cur.executescript('''DROP TABLE IF EXISTS currentid; DROP TABLE IF EXISTS AvgLowPriceFixNull; DROP TABLE IF EXISTS AvgHighPriceFixNull; DROP TABLE IF EXISTS LowPriceVolumeFixNull; DROP TABLE IF EXISTS HighPriceVolumeFixNull; DROP TABLE IF EXISTS MedianLowTable; DROP TABLE IF EXISTS MedianHighTable; DROP TABLE IF EXISTS MedianLowVolumeTable; DROP TABLE IF EXISTS MedianHighVolumeTable; DROP TABLE IF EXISTS MeanLowTable; DROP TABLE IF EXISTS MeanHighTable; DROP TABLE IF EXISTS MeanLowVolumeTable; DROP TABLE IF EXISTS MeanHighVolumeTable; DROP TABLE IF EXISTS MinLowPriceTable; DROP TABLE IF EXISTS MinHighPriceTable; DROP TABLE IF EXISTS MaxLowPriceTable; DROP TABLE IF EXISTS MaxHighPriceTable;''')
#extract our results
cur.execute('''SELECT * FROM FinalOutput''')
rows = cur.fetchall()
#save the results to a csv file
with open(f'{csvoutputdir}{ReportName}.csv', 'w', newline='') as f:
writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC, quotechar="'")
# write headers to output
writer.writerow([i[0] for i in cur.description])
# write result rows to output
writer.writerows(rows)
#cleanup - purge FinalOutput and MasterTable before proceeding to next Report
cur.executescript('''DROP TABLE IF EXISTS FinalOutput; DROP TABLE IF EXISTS MasterTable;''')
print(f'{ReportName} Completed')
As you can probably tell, our deceptively simple script was hiding a relatively chunky function. To summarize what ReportGen does:
- A Pandas dataframe loaded from our picklefile is dumped into a temporary SQLite database
- A long list of calculations for each unique ID contained in the database is executed, with the results being appended to a new table of data
- Our new table of data is exported to our filesystem as a .csv file with the given ReportName
Ensuring Input/Output Consistency
So… how do we restructure this function to achieve better performance? Before getting into what we can change, let’s define what we cannot change:
- Our script must maintain support for any number of unique input values sharing the same structure as our test data.
- While our ReportList variable in our example contains static data (in this case, a picklefile loaded from our filesystem), our script should be able to accept and interpret any quantity of lists, each containing any unique data, so long as the list placement of the Report Name and the Pandas Database with associated headers remains consistent.
- Likewise, for each unique input, our output data (in this case, the .csv reports being generated) must remain the same.
These restrictions must remain in the back of our mind at all times. When refactoring, we want to improve our code without modifying the core behavior.
Searching for Code Smell
So, let’s start combing through the function, comment-by-comment to see if we can identify anything we might want to change:
#extract our ReportName and the PandasDB needed for report generation
ReportName = Report[3]
PandasDB = Report[7]
Since our ReportName and our PandasDB are the two key defining components of our report, we probably don’t want to make any changes here. Let’s move on to the next section:
#create our temporary sqlite database in-memory, define cursor
MasterDB = sqlite3.connect(":memory:")
cur = MasterDB.cursor()
#import our PandasDB into a new sqlite table
cur.executescript(f'''CREATE TABLE MasterTable('id' INT, 'avgHighPrice' INT,'highPriceVolume' INT,'avgLowPrice' INT,'lowPriceVolume' INT);''')
PandasDB.to_sql(f'MasterTable', MasterDB, if_exists="append", index=False)
#index the sqlite table
cur.execute(f'''CREATE INDEX indices ON MasterTable(id);''')
This block includes the full list of commands used to convert PandasDB into an indexed SQLite table. Since these steps are prerequisites for subsequent steps, we will only change this if future conditions require it.
#get unique id list
cur.execute('''SELECT DISTINCT id FROM MasterTable''')
currentids = cur.fetchall()
This block executes a SQLite query in order to obtain a list of unique “id” values contained within our converted SQLite table, with each unique ID being iterated over in the following for loop…
#iterate over each unique ID, perform arithmetic operations and append the result to FinalOutput table
for currentid in currentids:
cur.execute(f'''CREATE TEMP TABLE currentid AS SELECT * FROM MasterTable WHERE id = {currentid[0]}''')
#create separate tables for each value where null values have either been eliminated for price, or set to 0 for volume, so that we will get accurate median/mean/min/max figures later-on.
cur.executescript('''CREATE TEMP TABLE AvgLowPriceFixNull AS SELECT avgLowPrice FROM currentid WHERE avgLowPrice != ''; CREATE TEMP TABLE AvgHighPriceFixNull AS SELECT avgHighPrice FROM currentid WHERE avgHighPrice != ''; CREATE TEMP TABLE LowPriceVolumeFixNull AS SELECT lowPriceVolume FROM currentid; UPDATE LowPriceVolumeFixNull SET lowPriceVolume = 0 WHERE lowPriceVolume = ''; CREATE TEMP TABLE HighPriceVolumeFixNull AS SELECT highPriceVolume FROM currentid; UPDATE HighPriceVolumeFixNull SET highPriceVolume = 0 WHERE highPriceVolume = '';''')
#median calculations (this is very ugly because sqlite does not have a native median function)
cur.executescript('''CREATE TEMP TABLE MedianLowTable AS SELECT AVG(avgLowPrice) AS MedianLow FROM (SELECT avgLowPrice FROM AvgLowPriceFixNull ORDER BY avgLowPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgLowPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgLowPriceFixNull)); CREATE TEMP TABLE MedianHighTable AS SELECT AVG(avgHighPrice) AS MedianHigh FROM (SELECT avgHighPrice FROM AvgHighPriceFixNull ORDER BY avgHighPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgHighPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgHighPriceFixNull)); CREATE TEMP TABLE MedianLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MedianVolumeLow FROM (SELECT lowPriceVolume FROM LowPriceVolumeFixNull ORDER BY lowPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM LowPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM LowPriceVolumeFixNull)); CREATE TEMP TABLE MedianHighVolumeTable AS SELECT AVG(highPriceVolume) AS MedianVolumeHigh FROM (SELECT highPriceVolume FROM HighPriceVolumeFixNull ORDER BY highPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM HighPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM HighPriceVolumeFixNull));''')
#mean calculations
cur.executescript('''CREATE TEMP TABLE MeanLowTable AS SELECT AVG(avgLowPrice) AS MeanLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MeanHighTable AS SELECT AVG(avgHighPrice) AS MeanHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MeanLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MeanVolumeLow FROM LowPriceVolumeFixNull; CREATE TEMP TABLE MeanHighVolumeTable AS SELECT AVG(highPriceVolume) AS MeanVolumeHigh FROM HighPriceVolumeFixNull;''')
#minmax calculations
cur.executescript('''CREATE TEMP TABLE MinLowPriceTable AS SELECT min(avgLowPrice) AS MinLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MinHighPriceTable AS SELECT min(avgHighPrice) AS MinHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MaxLowPriceTable AS SELECT max(avgLowPrice) AS MaxLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MaxHighPriceTable AS SELECT max(avgHighPrice) AS MaxHigh FROM AvgHighPriceFixNull;''')
#insert all of the values for the current id into the FinalOutput table
cur.execute('''INSERT INTO FinalOutput SELECT DISTINCT currentid.id AS id, MedianLow, MedianHigh, MedianVolumeLow, MedianVolumeHigh, MeanLow, MeanHigh, MeanVolumeLow, MeanVolumeHigh, MinLow, MinHigh, MaxLow, MaxHigh FROM currentid CROSS JOIN MedianLowTable CROSS JOIN MedianHighTable CROSS JOIN MedianLowVolumeTable CROSS JOIN MedianHighVolumeTable CROSS JOIN MeanLowTable CROSS JOIN MeanHighTable CROSS JOIN MeanLowVolumeTable CROSS JOIN MeanHighVolumeTable CROSS JOIN MinLowPriceTable CROSS JOIN MinHighPriceTable CROSS JOIN MaxLowPriceTable CROSS JOIN MaxHighPriceTable;''')
#cleanup - purge temp tables before proceeding to next id
cur.executescript('''DROP TABLE IF EXISTS currentid; DROP TABLE IF EXISTS AvgLowPriceFixNull; DROP TABLE IF EXISTS AvgHighPriceFixNull; DROP TABLE IF EXISTS LowPriceVolumeFixNull; DROP TABLE IF EXISTS HighPriceVolumeFixNull; DROP TABLE IF EXISTS MedianLowTable; DROP TABLE IF EXISTS MedianHighTable; DROP TABLE IF EXISTS MedianLowVolumeTable; DROP TABLE IF EXISTS MedianHighVolumeTable; DROP TABLE IF EXISTS MeanLowTable; DROP TABLE IF EXISTS MeanHighTable; DROP TABLE IF EXISTS MeanLowVolumeTable; DROP TABLE IF EXISTS MeanHighVolumeTable; DROP TABLE IF EXISTS MinLowPriceTable; DROP TABLE IF EXISTS MinHighPriceTable; DROP TABLE IF EXISTS MaxLowPriceTable; DROP TABLE IF EXISTS MaxHighPriceTable;''')
… And here, we have our first example of something potentially worth refactoring:
- This block is computationally expensive; there are a lot of arithmetic operations contained within our SQLite queries.
- This is the only block of code within the ReportGen function containing a loop. Since a loop is present, we can infer that this block is potentially executed more than once for each iteration of ReportGen. Conversely, all other commands outside of this loop are only executed once per iteration of ReportGen.
So, let’s place a red flag here and come back once we’ve reviewed the rest of our code.
#extract our results
cur.execute('''SELECT * FROM FinalOutput''')
rows = cur.fetchall()
Once our for loop is finished, this block exports the contents of our completed report into the variable “rows”…
#save the results to a csv file
with open(f'{csvoutputdir}{ReportName}.csv', 'w', newline='') as f:
writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC, quotechar="'")
# write headers to output
writer.writerow([i[0] for i in cur.description])
# write result rows to output
writer.writerows(rows)
#cleanup - purge FinalOutput and MasterTable before proceeding to next Report
cur.executescript('''DROP TABLE IF EXISTS FinalOutput; DROP TABLE IF EXISTS MasterTable;''')
print(f'{ReportName} Completed')
… which we write, along with the report headers, to a .csv file before our SQLite tables are purged. While there doesn’t seem to be any immediate need to refactor these final pieces of our function, depending on how we tackle our flagged portion, we may need to come back and make some changes here.
So, in summary, we now know which blocks we will leave alone, which blocks might require changes, and which blocks we probably need to change within our function.
Assessing our Smelly Block for Refactoring Potential
Let’s go ahead and return to the portion that we flagged. Rather than jump in and immediately start making changes, we should assess whether changing this function would actually add value to our script and be worth our time. We can determine this with some data capture and code review, with the goal of answering the following three questions:
- Is a substantial proportion of our runtime consumed by this block?
- Are there a substantial number of loop iterations performed by this block each time the function is called?
- Could each iteration of the loop potentially run independently from other iterations of the same loop?
Our first question can be answered by adding a timer before and after the block, similar to the timer used to measure total execution time. Placing computetime = time.time()
before and print(time.time() - computetime)
after the loop within our serially-executed code returns the following result:
...
10.202
WeeklyReport Completed
10.968...
MonthlyReport Completed
17.140...
YearlyReport Completed
script complete
44.667...
Out of 44.667 seconds, this single loop consumes about 38.31 seconds, or approximately 85% of our total runtime. Since this loop accounts for the majority of our total runtime, refactoring it effectively offers substantial potential for performance improvement. This answers our first question and confirms that this loop, before anything else, should be the focal point of a refactoring attempt.
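For reference, the timer placement inside ReportGen looks roughly like this (a sketch; everything else in the function is unchanged):
#start a block timer immediately before the loop
computetime = time.time()
for currentid in currentids:
    ...  #the SQLite calculations shown earlier
#print elapsed time once the loop completes
print(time.time() - computetime)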
Moving on to our second question, we can easily check our quantity of loop iterations by printing the output of len(currentids) after currentids has been defined. This returns:
...
3817
WeeklyReport Completed
3879
MonthlyReport Completed
3898
YearlyReport Completed
...
We can see that, at a minimum with the test data we are using, the loop runs 3817 times. As a result, this portion of our function could theoretically be scaled across a number of separate threads or processes approaching that figure. With this information, our second question has been answered; a substantial number of loop iterations is performed each time the function is called, which gives us a large window for effective parallelization and justifies our desire to refactor.
Finally, let’s move on to our third and most difficult question: can each iteration of our loop (theoretically) run independently from the others? Determining this in our case will require taking a closer look at the contents of the loop and identifying the scope of data being manipulated with each iteration.
Let’s start with our first SQLite query after the loop has started:
cur.execute(f'''CREATE TEMP TABLE currentid AS SELECT * FROM MasterTable WHERE id = {currentid[0]}''')
Before the loop runs any calculations, a table is created from our MasterTable, filtered on the “id” column using the current loop iteration’s “currentid” value. Since every id defined within the “currentids” iterable is unique, we know that every instance of this query will yield a unique result, independent of all other iterations of the loop. So far, so good.
Next, let’s move on to the next comment:
#create separate tables for each value where null values have either been eliminated for price, or set to 0 for volume, so that we will get accurate median/mean/min/max figures later-on.
cur.executescript('''CREATE TEMP TABLE AvgLowPriceFixNull AS SELECT avgLowPrice FROM currentid WHERE avgLowPrice != ''; CREATE TEMP TABLE AvgHighPriceFixNull AS SELECT avgHighPrice FROM currentid WHERE avgHighPrice != ''; CREATE TEMP TABLE LowPriceVolumeFixNull AS SELECT lowPriceVolume FROM currentid; UPDATE LowPriceVolumeFixNull SET lowPriceVolume = 0 WHERE lowPriceVolume = ''; CREATE TEMP TABLE HighPriceVolumeFixNull AS SELECT highPriceVolume FROM currentid; UPDATE HighPriceVolumeFixNull SET highPriceVolume = 0 WHERE highPriceVolume = '';''')
We don’t need to get into the nitty-gritty of each SQLite query at this stage; we just need to validate which tables are being created and what data the creation commands reference. This can be done by checking every FROM and CREATE command to trace the origin and destination of all the data the SQLite scripts are manipulating. In this block, every single FROM command references the currentid table, while our CREATE commands introduce four new tables ending in “FixNull”. Since we have not introduced any data derived from any other iteration of our loop up to this point, everything is looking good so far.
Moving onto our next block…
#median calculations (this is very ugly because sqlite does not have a native median function)
cur.executescript('''CREATE TEMP TABLE MedianLowTable AS SELECT AVG(avgLowPrice) AS MedianLow FROM (SELECT avgLowPrice FROM AvgLowPriceFixNull ORDER BY avgLowPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgLowPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgLowPriceFixNull)); CREATE TEMP TABLE MedianHighTable AS SELECT AVG(avgHighPrice) AS MedianHigh FROM (SELECT avgHighPrice FROM AvgHighPriceFixNull ORDER BY avgHighPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgHighPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgHighPriceFixNull)); CREATE TEMP TABLE MedianLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MedianVolumeLow FROM (SELECT lowPriceVolume FROM LowPriceVolumeFixNull ORDER BY lowPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM LowPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM LowPriceVolumeFixNull)); CREATE TEMP TABLE MedianHighVolumeTable AS SELECT AVG(highPriceVolume) AS MedianVolumeHigh FROM (SELECT highPriceVolume FROM HighPriceVolumeFixNull ORDER BY highPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM HighPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM HighPriceVolumeFixNull));''')
#mean calculations
cur.executescript('''CREATE TEMP TABLE MeanLowTable AS SELECT AVG(avgLowPrice) AS MeanLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MeanHighTable AS SELECT AVG(avgHighPrice) AS MeanHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MeanLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MeanVolumeLow FROM LowPriceVolumeFixNull; CREATE TEMP TABLE MeanHighVolumeTable AS SELECT AVG(highPriceVolume) AS MeanVolumeHigh FROM HighPriceVolumeFixNull;''')
#minmax calculations
cur.executescript('''CREATE TEMP TABLE MinLowPriceTable AS SELECT min(avgLowPrice) AS MinLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MinHighPriceTable AS SELECT min(avgHighPrice) AS MinHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MaxLowPriceTable AS SELECT max(avgLowPrice) AS MaxLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MaxHighPriceTable AS SELECT max(avgHighPrice) AS MaxHigh FROM AvgHighPriceFixNull;''')
… We again want to follow the same steps taken for the last block; if the FROM commands within this block only reference the “currentid” table or its four “FixNull” derivations, we can infer that all new tables derived from these commands will continue to operate without regard to the results of other loop iterations. In this circumstance, we can see that every single FROM command either references a “FixNull” table directly or, in the case of our median queries, references a “FixNull” table by way of a subquery. Again: so far, so good. All of the actions performed by our for loop up to this point are unique to the currentid currently being iterated, and there is no intermingling of data between any of our loop iterations.
Now, onto the last section of the loop:
#insert all of the values for the current id into the FinalOutput table
cur.execute('''INSERT INTO FinalOutput SELECT DISTINCT currentid.id AS id, MedianLow, MedianHigh, MedianVolumeLow, MedianVolumeHigh, MeanLow, MeanHigh, MeanVolumeLow, MeanVolumeHigh, MinLow, MinHigh, MaxLow, MaxHigh FROM currentid CROSS JOIN MedianLowTable CROSS JOIN MedianHighTable CROSS JOIN MedianLowVolumeTable CROSS JOIN MedianHighVolumeTable CROSS JOIN MeanLowTable CROSS JOIN MeanHighTable CROSS JOIN MeanLowVolumeTable CROSS JOIN MeanHighVolumeTable CROSS JOIN MinLowPriceTable CROSS JOIN MinHighPriceTable CROSS JOIN MaxLowPriceTable CROSS JOIN MaxHighPriceTable;''')
#cleanup - purge temp tables before proceeding to next id
cur.executescript('''DROP TABLE IF EXISTS currentid; DROP TABLE IF EXISTS AvgLowPriceFixNull; DROP TABLE IF EXISTS AvgHighPriceFixNull; DROP TABLE IF EXISTS LowPriceVolumeFixNull; DROP TABLE IF EXISTS HighPriceVolumeFixNull; DROP TABLE IF EXISTS MedianLowTable; DROP TABLE IF EXISTS MedianHighTable; DROP TABLE IF EXISTS MedianLowVolumeTable; DROP TABLE IF EXISTS MedianHighVolumeTable; DROP TABLE IF EXISTS MeanLowTable; DROP TABLE IF EXISTS MeanHighTable; DROP TABLE IF EXISTS MeanLowVolumeTable; DROP TABLE IF EXISTS MeanHighVolumeTable; DROP TABLE IF EXISTS MinLowPriceTable; DROP TABLE IF EXISTS MinHighPriceTable; DROP TABLE IF EXISTS MaxLowPriceTable; DROP TABLE IF EXISTS MaxHighPriceTable;''')
This block introduces the very first resource that is shared between our loop iterations: the FinalOutput table.
Our first command takes all of the filtered and manipulated data derived from the currentid table and appends it as a single new row within FinalOutput. The second command drops all of the tables created within this loop iteration and completes some housekeeping before the next iteration begins. FinalOutput, however, is never dropped; it persists across all iterations of the loop, for every distinct currentid value.
Fortunately, in the context of our desire to refactor, the FinalOutput table only serves as a dumping ground for report results; the only SQLite command executed against FinalOutput within our loop is INSERT INTO, and there are no queries that reference FinalOutput using a FROM command. Because of this, the contents of the loop, while now landing in a shared table, are still independently derived.
With all of this review, we have finally answered our third question: since the results of every loop iteration are mutually distinct, the iterations can be executed independently of one another.
Refactoring our Code
We have identified what needs to be refactored and have confirmed that our refactoring hopefully shouldn’t be a huge waste of time. What do we do next?
Let’s start by establishing some goals. Our refactored code:
- Must allow faster execution, scaling with the number of processing cores that we have available to use
- Must maintain acceptable performance when executed using a single core
- Must respect input/output consistency, as described in the earlier section Ensuring Input/Output Consistency
Since functions serve as atomic units of parallelization, and since we want our smelly block to work concurrently with itself, let’s start by sticking this block into a new function called ReportGenWorker:
def ReportGen(Report):
#extract items needed to generate a report
ReportName = Report[3]
PandasDB = Report[7]
#create our temporary sqlite database in-memory, define cursor
MasterDB = sqlite3.connect(":memory:")
cur = MasterDB.cursor()
#import our PandasDB into a new sqlite table
cur.executescript(f'''CREATE TABLE MasterTable('id' INT, 'avgHighPrice' INT,'highPriceVolume' INT,'avgLowPrice' INT,'lowPriceVolume' INT);''')
PandasDB.to_sql(f'MasterTable', MasterDB, if_exists="append", index=False)
#index the sqlite table
cur.execute(f'''CREATE INDEX indices ON MasterTable(id);''')
#create temp output table where all of our output results will be appended
cur.executescript('''CREATE TEMP TABLE FinalOutput('id' INT, 'MedianLow' INT,'MedianHigh' INT,'MedianVolumeLow' INT,'MedianVolumeHigh' INT,'MeanLow' INT,'MeanHigh' INT,'MeanVolumeLow' INT,'MeanVolumeHigh' INT,'MinLow' INT,'MinHigh' INT,'MaxLow' INT,'MaxHigh' INT);''')
#get unique id list
cur.execute('''SELECT DISTINCT id FROM MasterTable''')
currentids = cur.fetchall()
#iterate over each unique ID, perform arithmetic operations and append the result to FinalOutput table
for currentid in currentids:
ReportGenWorker(currentid, cur)
#extract our results
cur.execute('''SELECT * FROM FinalOutput''')
rows = cur.fetchall()
#save the results to a csv file
with open(f'{csvoutputdir}{ReportName}.csv', 'w', newline='') as f:
writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC, quotechar="'")
# write headers to output
writer.writerow([i[0] for i in cur.description])
# write result rows to output
writer.writerows(rows)
#cleanup - purge FinalOutput before proceeding to next Report
cur.executescript('''DROP TABLE IF EXISTS FinalOutput; DROP TABLE IF EXISTS MasterTable;''')
print(f'{ReportName} Completed')
def ReportGenWorker(currentid, cur):
cur.execute(f'''CREATE TEMP TABLE currentid AS SELECT * FROM MasterTable WHERE id = {currentid[0]}''')
#create separate tables for each value where null values have either been eliminated for price, or set to 0 for volume, so that we will get accurate median/mean/min/max figures later-on.
cur.executescript('''CREATE TEMP TABLE AvgLowPriceFixNull AS SELECT avgLowPrice FROM currentid WHERE avgLowPrice != ''; CREATE TEMP TABLE AvgHighPriceFixNull AS SELECT avgHighPrice FROM currentid WHERE avgHighPrice != ''; CREATE TEMP TABLE LowPriceVolumeFixNull AS SELECT lowPriceVolume FROM currentid; UPDATE LowPriceVolumeFixNull SET lowPriceVolume = 0 WHERE lowPriceVolume = ''; CREATE TEMP TABLE HighPriceVolumeFixNull AS SELECT highPriceVolume FROM currentid; UPDATE HighPriceVolumeFixNull SET highPriceVolume = 0 WHERE highPriceVolume = '';''')
#median calculations (this is very ugly because sqlite does not have a native median function)
cur.executescript('''CREATE TEMP TABLE MedianLowTable AS SELECT AVG(avgLowPrice) AS MedianLow FROM (SELECT avgLowPrice FROM AvgLowPriceFixNull ORDER BY avgLowPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgLowPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgLowPriceFixNull)); CREATE TEMP TABLE MedianHighTable AS SELECT AVG(avgHighPrice) AS MedianHigh FROM (SELECT avgHighPrice FROM AvgHighPriceFixNull ORDER BY avgHighPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgHighPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgHighPriceFixNull)); CREATE TEMP TABLE MedianLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MedianVolumeLow FROM (SELECT lowPriceVolume FROM LowPriceVolumeFixNull ORDER BY lowPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM LowPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM LowPriceVolumeFixNull)); CREATE TEMP TABLE MedianHighVolumeTable AS SELECT AVG(highPriceVolume) AS MedianVolumeHigh FROM (SELECT highPriceVolume FROM HighPriceVolumeFixNull ORDER BY highPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM HighPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM HighPriceVolumeFixNull));''')
#mean calculations
cur.executescript('''CREATE TEMP TABLE MeanLowTable AS SELECT AVG(avgLowPrice) AS MeanLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MeanHighTable AS SELECT AVG(avgHighPrice) AS MeanHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MeanLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MeanVolumeLow FROM LowPriceVolumeFixNull; CREATE TEMP TABLE MeanHighVolumeTable AS SELECT AVG(highPriceVolume) AS MeanVolumeHigh FROM HighPriceVolumeFixNull;''')
#minmax calculations
cur.executescript('''CREATE TEMP TABLE MinLowPriceTable AS SELECT min(avgLowPrice) AS MinLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MinHighPriceTable AS SELECT min(avgHighPrice) AS MinHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MaxLowPriceTable AS SELECT max(avgLowPrice) AS MaxLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MaxHighPriceTable AS SELECT max(avgHighPrice) AS MaxHigh FROM AvgHighPriceFixNull;''')
#insert all of the values for the current id into the FinalOutput table
cur.execute('''INSERT INTO FinalOutput SELECT DISTINCT currentid.id AS id, MedianLow, MedianHigh, MedianVolumeLow, MedianVolumeHigh, MeanLow, MeanHigh, MeanVolumeLow, MeanVolumeHigh, MinLow, MinHigh, MaxLow, MaxHigh FROM currentid CROSS JOIN MedianLowTable CROSS JOIN MedianHighTable CROSS JOIN MedianLowVolumeTable CROSS JOIN MedianHighVolumeTable CROSS JOIN MeanLowTable CROSS JOIN MeanHighTable CROSS JOIN MeanLowVolumeTable CROSS JOIN MeanHighVolumeTable CROSS JOIN MinLowPriceTable CROSS JOIN MinHighPriceTable CROSS JOIN MaxLowPriceTable CROSS JOIN MaxHighPriceTable;''')
#cleanup - purge temp tables before proceeding to next id
cur.executescript('''DROP TABLE IF EXISTS currentid; DROP TABLE IF EXISTS AvgLowPriceFixNull; DROP TABLE IF EXISTS AvgHighPriceFixNull; DROP TABLE IF EXISTS LowPriceVolumeFixNull; DROP TABLE IF EXISTS HighPriceVolumeFixNull; DROP TABLE IF EXISTS MedianLowTable; DROP TABLE IF EXISTS MedianHighTable; DROP TABLE IF EXISTS MedianLowVolumeTable; DROP TABLE IF EXISTS MedianHighVolumeTable; DROP TABLE IF EXISTS MeanLowTable; DROP TABLE IF EXISTS MeanHighTable; DROP TABLE IF EXISTS MeanLowVolumeTable; DROP TABLE IF EXISTS MeanHighVolumeTable; DROP TABLE IF EXISTS MinLowPriceTable; DROP TABLE IF EXISTS MinHighPriceTable; DROP TABLE IF EXISTS MaxLowPriceTable; DROP TABLE IF EXISTS MaxHighPriceTable;''')
Aside from moving the loop body into the new function, the changes are small. In addition to passing currentid to the worker function, we need to pass our variable cur, as this is the cursor for our SQLite database connection.
No changes are needed anywhere else in our script, and executing this works exactly the same, producing the same output as our original, serially-executed script.
So… what should we change now? Let’s try throwing a pool of processes at our new function. Since we need to pass cur into the function along with each currentid, we’ll start by defining a list “WorkerData” that pairs each currentid with cur, which we’ll pass into the pool. The changes look like this:
...
import multiprocessing as mp
cpucount = 3
def ReportGen(Report):
...
WorkerData = [(currentid, cur) for currentid in currentids]
mp.Pool(cpucount).map(ReportGenWorker, WorkerData)
...
def ReportGenWorker(currentid, cur):
...
Attempting to execute this results in the error:
TypeError: cannot pickle 'sqlite3.Cursor' object
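This error isn’t specific to our pool setup; a minimal reproduction (a sketch, separate from our script) shows the same behavior:
import pickle
import sqlite3

#sqlite3 connection and cursor objects are tied to the process that created them
#and refuse to be serialized for transport to another process
cur = sqlite3.connect(":memory:").cursor()
pickle.dumps(cur)  #raises TypeError: cannot pickle 'sqlite3.Cursor' object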
So, it looks like SQLite cursor/connection objects cannot be directly passed to different processes. At this point, it might help to see what the SQLite documentation has to say about parallelization. Let’s start with SQLite’s official documentation on multithreading, followed by Python’s documentation on its SQLite library.
A cursory reading of this material provides us with some important information. It appears that SQLite does allow threads to share database connections when the threading mode is set to “serialized”. While this sounds promising, there are a few caveats to consider:
- As discussed in our last article, CPU-bound functions will (generally) perform better in parallel processes, rather than in parallel threads.
- This threading mode for SQLite is called “serialized” because that is exactly what it does; it takes all commands received from all connected threads and executes them in series, which is the complete opposite of the parallel execution that we want to achieve.
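If you’re curious which mode your own interpreter reports, the sqlite3 module exposes a DB-API thread-safety level (a quick check; the exact value depends on your Python version and how SQLite was compiled):
import sqlite3

#DB-API levels: 1 means threads may share the module but not connections;
#3 means connections (and cursors) may be shared, i.e. SQLite's "serialized" mode
print(sqlite3.threadsafety)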
As a result of these caveats, we’re going to need to think of a different approach. What are some of the obstacles SQLite has presented, and how might we overcome them?
- SQLite connections cannot be passed to separate processes
- Maybe we can convert the data into a different format that can be passed as an object to a worker process
- SQLite databases do not support true “parallel” operation.
- Maybe we could switch to using a client/server-based relational database capable of handling parallel queries, rather than SQLite.
- …Or maybe we can split the database into smaller, separate databases before running calculations.
While switching to a client/server relational database would definitely make sense if we wanted to host a persistent server to store our data, that kind of transition would probably far exceed the scope of what we are trying to accomplish with our script and introduce unnecessary bloat. We can keep this idea in our back pocket in case we need to resort to it; in the meantime, let’s focus on the two remaining ideas.
Let’s start by focusing on data conversion. Since we’ve established that SQLite database connections cannot be passed to worker processes, we will need to convert our data into a format that can be passed along. Unfortunately, this appears to be at odds with the fact that converting data between formats adds computational overhead to our script, which adds runtime and runs counter to our goals.
Fortunately, our source data is not a SQLite database; it is a Pandas dataframe. Instead, we can attempt to complete the early steps of the ReportGen function using the Pandas dataframe in its native form, passing that result into ReportGenWorker processes before any conversion to SQLite takes place.
If we return to our code review, the only thing sitting between the block responsible for creating the SQLite database and our for loop is the following block:
#get unique id list
cur.execute('''SELECT DISTINCT id FROM MasterTable''')
currentids = cur.fetchall()
Does Pandas have a similar function? It certainly does:
#get unique id list
currentids=PandasDB['id'].unique().tolist()
If we add this to our ReportGen function, along with a print(len(currentids)) call, we get the following result:
...
3817
WeeklyReport Completed
3879
MonthlyReport Completed
3898
YearlyReport Completed
...
… Which precisely matches the result received from the equivalent SQLite query and command. So, we absolutely can narrow the scope of our SQLite queries, replacing them instead with equivalent Pandas dataframe operations.
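If we wanted to be more rigorous than comparing counts, a quick sanity check (a sketch, reusing names from the original function) could confirm that both approaches produce the same set of ids:
#compare the Pandas-derived id list against the original SQLite DISTINCT query
cur.execute('''SELECT DISTINCT id FROM MasterTable''')
sqlite_ids = sorted(int(row[0]) for row in cur.fetchall())
pandas_ids = sorted(int(i) for i in PandasDB['id'].unique().tolist())
assert sqlite_ids == pandas_ids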
With some adjustments, we can shift all of our SQLite commands from the beginning of our ReportGen function to the beginning of the ReportGenWorker function…
def ReportGen(Report):
ReportName = Report[3]
PandasDB = Report[7]
currentids=PandasDB['id'].unique().tolist()
ReportResults = ReportGenWorker(currentids, PandasDB)
cur.execute('''SELECT * FROM FinalOutput''')
rows = cur.fetchall()
...
def ReportGenWorker(currentids, PandasDB):
MasterDB = sqlite3.connect(":memory:")
cur = MasterDB.cursor()
cur.executescript(f'''CREATE TABLE MasterTable('id' INT, 'avgHighPrice' INT,'highPriceVolume' INT,'avgLowPrice' INT,'lowPriceVolume' INT);''')
PandasDB.to_sql(f'MasterTable', MasterDB, if_exists="append", index=False)
cur.execute(f'''CREATE INDEX indices ON MasterTable(id);''')
cur.executescript('''CREATE TEMP TABLE FinalOutput('id' INT, 'MedianLow' INT,'MedianHigh' INT,'MedianVolumeLow' INT,'MedianVolumeHigh' INT,'MeanLow' INT,'MeanHigh' INT,'MeanVolumeLow' INT,'MeanVolumeHigh' INT,'MinLow' INT,'MinHigh' INT,'MaxLow' INT,'MaxHigh' INT);''')
for currentid in currentids:
...
cur.execute('''INSERT INTO FinalOutput SELECT DISTINCT currentid.id AS id, MedianLow, MedianHigh, MedianVolumeLow, MedianVolumeHigh, MeanLow, MeanHigh, MeanVolumeLow, MeanVolumeHigh, MinLow, MinHigh, MaxLow, MaxHigh FROM currentid CROSS JOIN MedianLowTable CROSS JOIN MedianHighTable CROSS JOIN MedianLowVolumeTable CROSS JOIN MedianHighVolumeTable CROSS JOIN MeanLowTable CROSS JOIN MeanHighTable CROSS JOIN MeanLowVolumeTable CROSS JOIN MeanHighVolumeTable CROSS JOIN MinLowPriceTable CROSS JOIN MinHighPriceTable CROSS JOIN MaxLowPriceTable CROSS JOIN MaxHighPriceTable;''')
...
… and while our script appears to start churning through data, we missed a critical detail and hit an error:
ReportGen(Report)
...
cur.execute('''SELECT * FROM FinalOutput''')
...
"NameError: name cur is not defined, Did you mean: 'chr'?"
While it is important to focus on how to get data into the ReportGenWorker function, we also need to be able to pull data out, which we cannot continue to do with SQLite. Fortunately, a few more alterations should help us accomplish this:
def ReportGen(Report):
ReportName = Report[3]
PandasDB = Report[7]
currentids=PandasDB['id'].unique().tolist()
ReportResults = ReportGenWorker(currentids, PandasDB)
#export our results to csv
ReportResults.to_csv(f'{csvoutputdir}{ReportName}.csv', sep=',', index=False, quoting=csv.QUOTE_NONNUMERIC, quotechar="'")
print(f'{ReportName} Completed')
def ReportGenWorker(currentids, PandasDB):
...
for currentid in currentids:
...
ReportResults = pd.read_sql_query("SELECT * FROM FinalOutput", MasterDB)
...
return(ReportResults)
Our ReportGenWorker function now converts the contents of the FinalOutput table into a Pandas dataframe, which is returned to the ReportGen function where it is written to a .csv file. With these changes, our script now successfully executes!
Before we celebrate; how do our reports look?
YearlyGen.csv:
'id','MedianLow','MedianHigh','MedianVolumeLow','MedianVolumeHigh','MeanLow','MeanHigh','MeanVolumeLow','MeanVolumeHigh','MinLow','MinHigh','MaxLow','MaxHigh'
2,160.0,164.0,8110064.5,20439520.0,160.69444444444446,164.85555555555555,7918714.452777778,20312851.71111111,145.0,149,186.0,190
6,185119.0,191536.0,348.0,108.0,185699.44568245125,193601.6061452514,351.8050139275766,112.23955431754875,161887.0,172579,215196.0,272433
8,184293.0,188319.0,331.0,94.0,184653.94428969358,188819.45961002784,329.6824512534819,97.42061281337047,172696.0,177184,206864.0,229456
10,184483.0,191122.5,350.0,102.0,186137.86908077996,193321.74581005587,348.71866295264624,108.07799442896936,169877.0,178063,438581.0,337600
12,191940.0,199444.0,338.0,98.0,192673.0974930362,201241.76536312848,340.1058495821727,100.37883008356546,174504.0,183494,297609.0,282826
28,395.5,436.0,49.0,27.0,523.45197740113,2111.3229461756373,55.325905292479106,34.05292479108635,18.0,1,9500.0,132063
...
[3898 rows x 13 columns]
These results look deceptively similar to our original reports; however, there is one small devil hiding in the details. If you look closely, you’ll notice that many of our values now carry a trailing .0 and are represented as floating-point values rather than round integers.
Now, why is this important? Any time you make even the most minuscule change to data that may be sent downstream to other functions, you introduce the possibility of cascading failure. Breaking downstream functionality is a cardinal sin that we should all strive never to commit, even in the name of more efficient execution.
While our goal was for our output to be identical, in the context of this exercise this change in our output data represents an acceptable deviation. If you review the output of the original script, you will notice that some values were already floating-point: for id 28, MedianLow is 395.5. Given the way Pandas handles datatypes, if any value in a column is a float, the entire column is promoted to float. Hence, our integers now carry a trailing .0. While my downstream functions will gladly accept this slightly different data, your mileage may vary depending on your application; it is always best to assume that bit-for-bit output parity is a hard requirement.
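A quick way to see this promotion at work (a small sketch) is to inspect the dtypes of the dataframe returned by ReportGenWorker before it is written out:
#columns containing any non-integral value (e.g. MedianLow = 395.5 for id 28)
#come back as float64, which drags every whole number in that column along with it
print(ReportResults.dtypes)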
Continuing on: with our latest changes, we have overcome SQLite’s limitation around passing data between processes. Our next problem to solve involves SQLite’s limitations around parallelization.
We expressed an idea earlier to “split the database into smaller, separate databases before running calculations”. We also confirmed in our earlier review that the results of our SQLite commands for each unique ID are mutually distinct, and that since we are processing data for thousands of unique IDs, we should be able to split the workload into as many separate processes as there are IDs for a given report.
While there are quite a few different ways to accomplish this, our goal will be for the ReportGen function to sort and split each Pandas dataframe into equally-sized chunks, with the quantity of chunks matching our quantity of processing cores. This will allow our workload to scale to as many or as few cores as we would like while limiting idle time.
Let’s start by sorting and indexing our dataframe within our ReportGen function:
PandasDB.sort_values(by='id', axis=0, inplace=True)
PandasDB.set_index(keys=['id'], drop=False,inplace=True)
Since we are going to be splitting our dataframe based on the ‘id’ column, we want to ensure that our table is sorted by id before we attempt to break the data apart, as failing to do so may result in missing data. This is because the split operation that we will be using later on is very rudimentary, in that it “splits” a dataframe, as it is currently sorted, into chunks. We didn’t need to do this with SQLite, because SQLite SELECT queries do not care where values sit and will select every row that meets a condition regardless of how the rows are organized within a table. Our indexing command will likewise help improve the performance of subsequent commands.
Next, we’ll use the following command from the NumPy module after we’ve defined the currentids list:
import numpy as np
...
currentidsarrays = np.array_split(currentids, cpucount)
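As a toy illustration (separate from our report data) of how this call chunks a list:
import numpy as np

#splitting 10 ids across 3 cores yields chunks of 4, 3 and 3
print(np.array_split(list(range(10)), 3))
#[array([0, 1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]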
The np.array_split
command takes a list (currentids) and splits it into a given number (cpucount) of approximately equal-sized arrays. In our case, we take currentids, which contains every value that we are going to iterate against within ReportGenWorker, and divide it into as many arrays as we have cores available to use, represented by cpucount. You can think of currentidsarrays as a collection of “cookie cutters” which will help us cut our data into equally-sized chunks in this next block:
split_dataframes = []
for currentidsarray in currentidsarrays:
startingid = int(currentidsarray[0])
endingid = int(currentidsarray[-1])
split_dataframe = PandasDB.loc[(PandasDB.id >= startingid) & (PandasDB.id <= endingid)]
split_dataframes = split_dataframes + [[currentidsarray] + [split_dataframe]]
We start by defining an empty list, “split_dataframes” which we will use to store the results of our for loop. For the sake of continuing our analogy, this is our baking tray.
Within the for loop, we start by capturing the first value and last value of the current array of ids that we are iterating over. These effectively represent the outer edges of each cookie cutter.
We then feed these values into our .loc command, which queries our Pandas dataframe and captures our target range of values (cutting the cookie). The result is stored as a “split_dataframe”, which we append to the list we defined earlier (placing our cookies on the baking tray), along with its corresponding split array of ids (the cookie cutter). Since you ordinarily wouldn’t bake a cookie cutter, the analogy starts to fall apart here, but we do need this data for an upcoming command.
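Here is a toy version of that range-based .loc filter (again, not our report data), just to make the “cookie cut” concrete:
import pandas as pd

#keep only the rows whose id falls between 2 and 6 inclusive
df = pd.DataFrame({'id': [2, 2, 6, 8, 10], 'price': [1, 2, 3, 4, 5]})
chunk = df.loc[(df.id >= 2) & (df.id <= 6)]
print(chunk)  #three rows: id 2, 2 and 6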
Next, we’ll spawn a pool of workers which will execute the ReportGenWorker function for all of our split dataframes:
with mp.Pool(cpucount) as p:
dataframe_list = p.map(ReportGenWorker, split_dataframes)
Moving onto the ReportGenWorker function, we’ll make some small changes so that it can correctly process the iterables within our split_dataframes list:
def ReportGenWorker(values):
#define variables
currentidsarray = values[0]
split_dataframe = values[1]
#connect to TempDB and import split_dataframe
TempDB = sqlite3.connect(":memory:")
cur = TempDB.cursor()
cur.executescript('''CREATE TABLE MasterTable('id' INT, 'avgHighPrice' INT,'highPriceVolume' INT,'avgLowPrice' INT,'lowPriceVolume' INT);''')
cur.executescript('''CREATE TEMP TABLE FinalOutput('id' INT, 'MedianLow' INT,'MedianHigh' INT,'MedianVolumeLow' INT,'MedianVolumeHigh' INT,'MeanLow' INT,'MeanHigh' INT,'MeanVolumeLow' INT,'MeanVolumeHigh' INT,'MinLow' INT,'MinHigh' INT,'MaxLow' INT,'MaxHigh' INT);''')
split_dataframe.to_sql('MasterTable', TempDB, if_exists="append", index=False)
cur.execute('''CREATE INDEX indices ON MasterTable(id);''')
for currentid in currentidsarray:
...
Our ReportGenWorker function otherwise hasn’t changed very much; each running process handles a smaller subset of data, rather than all of the data for a single report. We no longer need to consider any concurrency issues with SQLite, because each connection lives and dies within a distinct, mutually-exclusive process before returning a split result to the parent process:
...
#insert all of the values for the current id into the FinalOutput table
split_result = pd.read_sql_query("SELECT * FROM FinalOutput", TempDB)
#cleanup
...
return(split_result)
All of our split_results are added to dataframe_list. Once all of our workers are finished, back in our ReportGen function, we recombine and sort the items in this list:
#concatenate dataframe_list so that we can combine the work performed by the worker pool
dataframe_concat = pd.concat(dataframe_list)
#sort the combined results by id in case chunks came back out of order
dataframe_concat = dataframe_concat.sort_values('id')
… And finally, we export to .csv
#export to CSV
dataframe_concat.to_csv(f'{csvoutputdir}{ReportName}.csv', sep=',', index=False, quoting=csv.QUOTE_NONNUMERIC, quotechar="'")
print(f'{ReportName} Completed')
Our complete script looks like this:
#import necessary modules
import sqlite3
import csv
import glob
from glob import glob
import pandas as pd
import time
import os
import pickle
import multiprocessing as mp
import numpy as np
def ReportGen(Report):
#extract items needed to generate a report
ReportName = Report[3]
PandasDB = Report[7]
#sort and index pandas dataframe
PandasDB.sort_values(by='id', axis=0, inplace=True)
PandasDB.set_index(keys=['id'], drop=False,inplace=True)
#get unique id list
currentids=PandasDB['id'].unique().tolist()
#split list of ids into as many arrays as we have CPUs defined and add them to a list "currentidsarrays"
currentidsarrays = np.array_split(currentids, cpucount)
#likewise, split our pandas dataframe into as many dataframes as we have arrays of ids and add them to a list "split_dataframes"
split_dataframes = []
for currentidsarray in currentidsarrays:
startingid = int(currentidsarray[0])
endingid = int(currentidsarray[-1])
split_dataframe = PandasDB.loc[(PandasDB.id >= startingid) & (PandasDB.id <= endingid)]
split_dataframes = split_dataframes + [[currentidsarray] + [split_dataframe]]
#spawn a worker pool, providing each worker with a corresponding split dataframe, with results being placed in a list "dataframe_list"
with mp.Pool(cpucount) as p:
dataframe_list = p.map(ReportGenWorker, split_dataframes)
#concatenate dataframe_list so that we can combine the work performed by the worker pool
dataframe_concat = pd.concat(dataframe_list)
#sort by id in case the worker results came back out of order
dataframe_concat = dataframe_concat.sort_values('id')
#export to CSV
dataframe_concat.to_csv(f'{csvoutputdir}{ReportName}.csv', sep=',', index=False, quoting=csv.QUOTE_NONNUMERIC, quotechar="'")
print(f'{ReportName} Completed')
def ReportGenWorker(values):
#define variables
currentidsarray = values[0]
split_dataframe = values[1]
#connect to TempDB and import split_dataframe
TempDB = sqlite3.connect(":memory:")
cur = TempDB.cursor()
cur.executescript('''CREATE TABLE MasterTable('id' INT, 'avgHighPrice' INT,'highPriceVolume' INT,'avgLowPrice' INT,'lowPriceVolume' INT);''')
cur.executescript('''CREATE TEMP TABLE FinalOutput('id' INT, 'MedianLow' INT,'MedianHigh' INT,'MedianVolumeLow' INT,'MedianVolumeHigh' INT,'MeanLow' INT,'MeanHigh' INT,'MeanVolumeLow' INT,'MeanVolumeHigh' INT,'MinLow' INT,'MinHigh' INT,'MaxLow' INT,'MaxHigh' INT);''')
split_dataframe.to_sql('MasterTable', TempDB, if_exists="append", index=False)
cur.execute('''CREATE INDEX indices ON MasterTable(id);''')
for currentid in currentidsarray:
cur.execute(f'''CREATE TEMP TABLE currentid AS SELECT * FROM MasterTable WHERE id = {currentid}''')
#create separate tables for each value, where null values have either been eliminated (for price) or set to 0 (for volume), so that we will get accurate median/mean/min/max figures later on.
cur.executescript('''CREATE TEMP TABLE AvgLowPriceFixNull AS SELECT avgLowPrice FROM currentid WHERE avgLowPrice != ''; CREATE TEMP TABLE AvgHighPriceFixNull AS SELECT avgHighPrice FROM currentid WHERE avgHighPrice != ''; CREATE TEMP TABLE LowPriceVolumeFixNull AS SELECT lowPriceVolume FROM currentid; UPDATE LowPriceVolumeFixNull SET lowPriceVolume = 0 WHERE lowPriceVolume = ''; CREATE TEMP TABLE HighPriceVolumeFixNull AS SELECT highPriceVolume FROM currentid; UPDATE HighPriceVolumeFixNull SET highPriceVolume = 0 WHERE highPriceVolume = '';''')
#median calculations (this is very ugly because sqlite does not have a native median function)
cur.executescript('''CREATE TEMP TABLE MedianLowTable AS SELECT AVG(avgLowPrice) AS MedianLow FROM (SELECT avgLowPrice FROM AvgLowPriceFixNull ORDER BY avgLowPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgLowPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgLowPriceFixNull)); CREATE TEMP TABLE MedianHighTable AS SELECT AVG(avgHighPrice) AS MedianHigh FROM (SELECT avgHighPrice FROM AvgHighPriceFixNull ORDER BY avgHighPrice LIMIT 2 - (SELECT COUNT(*) FROM AvgHighPriceFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM AvgHighPriceFixNull)); CREATE TEMP TABLE MedianLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MedianVolumeLow FROM (SELECT lowPriceVolume FROM LowPriceVolumeFixNull ORDER BY lowPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM LowPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM LowPriceVolumeFixNull)); CREATE TEMP TABLE MedianHighVolumeTable AS SELECT AVG(highPriceVolume) AS MedianVolumeHigh FROM (SELECT highPriceVolume FROM HighPriceVolumeFixNull ORDER BY highPriceVolume LIMIT 2 - (SELECT COUNT(*) FROM HighPriceVolumeFixNull) % 2 OFFSET (SELECT (COUNT(*) - 1) / 2 FROM HighPriceVolumeFixNull));''')
#mean calculations
cur.executescript('''CREATE TEMP TABLE MeanLowTable AS SELECT AVG(avgLowPrice) AS MeanLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MeanHighTable AS SELECT AVG(avgHighPrice) AS MeanHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MeanLowVolumeTable AS SELECT AVG(lowPriceVolume) AS MeanVolumeLow FROM LowPriceVolumeFixNull; CREATE TEMP TABLE MeanHighVolumeTable AS SELECT AVG(highPriceVolume) AS MeanVolumeHigh FROM HighPriceVolumeFixNull;''')
#minmax calculations
cur.executescript('''CREATE TEMP TABLE MinLowPriceTable AS SELECT min(avgLowPrice) AS MinLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MinHighPriceTable AS SELECT min(avgHighPrice) AS MinHigh FROM AvgHighPriceFixNull; CREATE TEMP TABLE MaxLowPriceTable AS SELECT max(avgLowPrice) AS MaxLow FROM AvgLowPriceFixNull; CREATE TEMP TABLE MaxHighPriceTable AS SELECT max(avgHighPrice) AS MaxHigh FROM AvgHighPriceFixNull;''')
#insert all of the values for the current id into the FinalOutput table
cur.execute('''INSERT INTO FinalOutput SELECT DISTINCT currentid.id AS id, MedianLow, MedianHigh, MedianVolumeLow, MedianVolumeHigh, MeanLow, MeanHigh, MeanVolumeLow, MeanVolumeHigh, MinLow, MinHigh, MaxLow, MaxHigh FROM currentid CROSS JOIN MedianLowTable CROSS JOIN MedianHighTable CROSS JOIN MedianLowVolumeTable CROSS JOIN MedianHighVolumeTable CROSS JOIN MeanLowTable CROSS JOIN MeanHighTable CROSS JOIN MeanLowVolumeTable CROSS JOIN MeanHighVolumeTable CROSS JOIN MinLowPriceTable CROSS JOIN MinHighPriceTable CROSS JOIN MaxLowPriceTable CROSS JOIN MaxHighPriceTable;''')
#cleanup - purge temp tables before proceeding to next id
cur.executescript('''DROP TABLE IF EXISTS currentid; DROP TABLE IF EXISTS AvgLowPriceFixNull; DROP TABLE IF EXISTS AvgHighPriceFixNull; DROP TABLE IF EXISTS LowPriceVolumeFixNull; DROP TABLE IF EXISTS HighPriceVolumeFixNull; DROP TABLE IF EXISTS MedianLowTable; DROP TABLE IF EXISTS MedianHighTable; DROP TABLE IF EXISTS MedianLowVolumeTable; DROP TABLE IF EXISTS MedianHighVolumeTable; DROP TABLE IF EXISTS MeanLowTable; DROP TABLE IF EXISTS MeanHighTable; DROP TABLE IF EXISTS MeanLowVolumeTable; DROP TABLE IF EXISTS MeanHighVolumeTable; DROP TABLE IF EXISTS MinLowPriceTable; DROP TABLE IF EXISTS MinHighPriceTable; DROP TABLE IF EXISTS MaxLowPriceTable; DROP TABLE IF EXISTS MaxHighPriceTable;''')
#read the completed FinalOutput table back into a pandas dataframe
split_result = pd.read_sql_query("SELECT * FROM FinalOutput", TempDB)
#cleanup
cur.executescript('''DROP TABLE IF EXISTS currentid; DROP TABLE IF EXISTS AvgLowPriceFixNull; DROP TABLE IF EXISTS AvgHighPriceFixNull; DROP TABLE IF EXISTS LowPriceVolumeFixNull; DROP TABLE IF EXISTS HighPriceVolumeFixNull; DROP TABLE IF EXISTS MedianLowTable; DROP TABLE IF EXISTS MedianHighTable; DROP TABLE IF EXISTS MedianLowVolumeTable; DROP TABLE IF EXISTS MedianHighVolumeTable; DROP TABLE IF EXISTS MeanLowTable; DROP TABLE IF EXISTS MeanHighTable; DROP TABLE IF EXISTS MeanLowVolumeTable; DROP TABLE IF EXISTS MeanHighVolumeTable; DROP TABLE IF EXISTS MinLowPriceTable; DROP TABLE IF EXISTS MinHighPriceTable; DROP TABLE IF EXISTS MaxLowPriceTable; DROP TABLE IF EXISTS MaxHighPriceTable;''')
return(split_result)
### runtime ###
if __name__ == '__main__':
#define starting variables
csvoutputdir = 'csvoutput/'
cpucount = 3
# open pickled data from filesystem that we want to create reports with
with open('ReportGenList.pkl', 'rb') as file:
ReportList = pickle.load(file)
print(ReportList)
#start execution timer
starttime = time.time()
#process data
for Report in ReportList:
ReportGen(Report)
print('script complete')
print(time.time() - starttime)
… And this script executes successfully! Let’s check whether it achieves each of our goals.
As far as input/output consistency is concerned, our refactored code succeeds; neither our input nor our output data has changed, aside from the aforementioned float type conversions, which we’ve determined are an acceptable deviation.
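If you’d like to perform this comparison yourself, a minimal sketch might look like the following; it assumes the pre-refactor reports were kept in a hypothetical csvoutput_original/ directory for reference:
import pandas as pd
from pandas.testing import assert_frame_equal

for name in ['WeeklyReport', 'MonthlyReport', 'YearlyReport']:
    old = pd.read_csv(f'csvoutput_original/{name}.csv', quotechar="'")
    new = pd.read_csv(f'csvoutput/{name}.csv', quotechar="'")
    #check_dtype=False tolerates the float type conversions we accepted earlier
    assert_frame_equal(old.reset_index(drop=True), new.reset_index(drop=True), check_dtype=False)
    print(f'{name}: outputs match')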
Next, let’s take a look at performance.
cpucount = 1
...
46.954...
cpucount = 2
...
24.468...
cpucount = 3
...
17.398...
Single-core performance is only marginally slower than our original, serially-executed script and well within our expectation of acceptable performance. Multi-core performance, on the other hand, far exceeds that of our first parallelization attempt made prior to any refactoring: 2-core performance has improved by 10 seconds, and 3-core performance by 7 seconds.
Furthermore, our script can now make effective use of more than 3 cores:
cpucount = 4
...
13.367...
cpucount = 14
...
6.811
cpucount = 15
...
6.562...
cpucount = 16
...
6.274...
Even as we approach the 16-core limit of our hardware, performance continues to improve, albeit marginally.
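If you’d like to reproduce this scaling curve on your own hardware, a minimal benchmarking sketch could look like the following; it reuses the ReportGen and ReportGenWorker functions from the complete script above (along with its imports) and assumes the same ReportGenList.pkl input:
#replacement runtime block that times the full report run at several pool sizes
if __name__ == '__main__':
    csvoutputdir = 'csvoutput/'
    with open('ReportGenList.pkl', 'rb') as file:
        ReportList = pickle.load(file)
    for cpucount in [1, 2, 4, 8, 16]:
        starttime = time.time()
        for Report in ReportList:
            ReportGen(Report)
        print(f'{cpucount} cores: {time.time() - starttime:.3f} seconds')
As in the original runtime block, csvoutputdir and cpucount are module-level variables that ReportGen picks up when it runs.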
With this result, we have achieved all of our goals, and our refactoring has been a success!
Closing Notes
We started our exercise with a script that could only scale in parallel, inefficiently, against the quantity of reports being generated. By identifying and breaking down an oversized function into smaller, discrete units, we improved our design, ending up with a script that can scale parallel operations in a load-balanced manner against thousands of cores, irrespective of report count. With our example data paired with our available hardware, we experienced a 4x improvement in execution speed.
While code refactoring may be difficult, it is often necessary in order to apply effective parallelization techniques and attain faster code execution.
Did you find this article helpful? Are you interested in reading more? Feel free to reach me via the Contact page. Thank you for reading!