In our last article, we validated that the MMORPG EvE Online fulfills all of our prerequisites for data analytics and modeling.
In this article, we will:
- Review the EvE Swagger Interface (ESI), which will serve as our primary source of market-related data for EvE Online.
- Review and contextualize ESI endpoints and static data sources
- Set goals and work through an iterative process of developing a Python script to collect and save ESI data to a local SQLite database, including the application of adequate exception handling, data congruence verification, scheduling, flow control, and performance optimizations.
ESI Overview
At first glance, ESI can be a bit daunting; at the time of this writing, ESI provides 206 unique endpoints, which is a far cry from the small handful of endpoints we have used previously from the OSRS Wiki. This scope and variety provide developers with the power to make extremely useful tools for EvE Online players, with authenticated endpoints leveraging OAuth2 to allow integration with players’ in-game assets. I’ve personally used authenticated endpoints to conduct automated inventory management for my in-game trading characters in the past, and I encourage anybody interested in EvE Online as a player, observer, or developer to review and capitalize on the wealth of data and features offered by ESI.
For the purpose of data analytics concerning the in-game economy, everything we need from ESI can be obtained from the unauthenticated endpoints present under the “Market” tab:

Endpoints containing a padlock require authentication, while the rest are available to the public. The site provides a neat interface for testing each endpoint, so let’s take a look at each of the unauthenticated endpoints and see what they can do for us.
/markets/{region_id}/history/
This endpoint requires a “region_id” and “type_id”. For some context on these values:
- Markets in EvE Online are provided by stations that exist in solar systems. Groups of solar systems are organized into regions, and in the context of trade, markets are within the exclusive domain of their respective region and cannot be accessed from any other region. As a result of these game mechanics, regions present the largest discrete organizational unit of market data. Likewise, most ESI market endpoints require that a “region_id” be provided, which is simply a unique numerical identifier for a region. For instance, the region containing Jita is “The Forge” with a region_id of 10000002.
- “type_id” is a unique numerical identifier for an in-game object, including any of the ~17,000 unique items which can be traded on markets.
Once a region_id and type_id have been specified, the endpoint returns some data:
- “date”: reporting date for the current set of statistics
- “average”, “highest”, and “lowest”: Provide a range of executed prices
- “order_count”: Number of buy/sell orders transacted on the reporting date
- “volume”: Unit quantity bought/sold
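As a quick illustration, here is a minimal sketch of calling this endpoint with the requests library; the region_id (10000002 for The Forge) and type_id (34, Tritanium) are example parameters, and no error handling is included:

import requests

## minimal /history call for The Forge (region_id 10000002) and Tritanium (type_id 34)
url = 'https://esi.evetech.net/latest/markets/10000002/history/?datasource=tranquility&type_id=34'
r = requests.get(url, headers={'User-Agent': 'Enter a descriptive user-agent!'})
## each element of the response is one day of statistics
for day in r.json()[-3:]:
    print(day['date'], day['average'], day['order_count'], day['volume'])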
However, this endpoint comes with a few quirks:
- There is no indication of how “average” is calculated (median or mean?)
- A single API call appears to return the entire stored history of data, which spans over 400 days, and there is no option to narrow this scope.
- Given the type_id requirement, we would need to call the endpoint as many as 17,000 times in order to get the full scope of market history for just a single region.
- This endpoint is only updated once per day, so there is low recency
- While most ESI endpoints feature no rate limit, this endpoint is currently rate-limited to 300 queries per minute.
Despite these quirks, this endpoint provides extremely useful historical market data. In the context of our work involving OSRS, it is functionally identical to the /24h endpoint provided by the OSRS Wiki API. Let’s move on to the next one:
/markets/{region_id}/orders/
This endpoint only requires a region_id. Calling this returns the following data:
- “order_id”: The individual Buy or Sell order being reported against
- “is_buy_order”: True for Buy orders, false for Sell orders
- “duration”: How long (in days) the order is valid for.
- “issued”: Date and time the order was originated, or relisted
- “location_id”: The station where the order was originated.
- “system_id”: The solar system where the order was originated
- “min_volume”: Minimum unit quantity required to either partially or completely clear the order
- “price”: Per-unit price set by the order
- “range”: Distance a counter-party must be within to execute a trade against the order
- “type_id”: Type of commodity the order is for
- “volume_remain”: Quantity of remaining units
- “volume_total”: Starting quantity of units, notwithstanding any units which have already been bought or sold.
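To make the record shape concrete, here is what a single element of the returned JSON looks like once parsed into Python. The field names come from the list above, while the values are purely illustrative (the location and system IDs shown correspond to Jita 4-4 and Jita, and type_id 34 is Tritanium):

## illustrative shape of one order record from /markets/{region_id}/orders/ (values are made up)
example_order = {
    'order_id': 6000000000000,
    'is_buy_order': False,
    'duration': 90,
    'issued': '2024-01-01T12:00:00Z',
    'location_id': 60003760,
    'system_id': 30000142,
    'min_volume': 1,
    'price': 5.0,
    'range': 'region',
    'type_id': 34,
    'volume_remain': 1000,
    'volume_total': 1000,
}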
This endpoint is where we really start to see the difference in data analytics potential between OSRS and EvE Online; while OSRS provides zero transparency of public open market orders, EvE Online provides absolute transparency. With this endpoint, it is possible to scrape every active public market order and obtain a nearly-complete picture of live behavior within the game.
Like the history endpoint, there are some quirks, limitations, and features:
- The API will return a maximum of 1000 individual orders per query; complete data sets exceeding this amount can be obtained by iterating over “pages” of data. For some context, the order book for the region The Forge at the time of this writing includes 367 pages, or 366,000+ individual orders.
- This endpoint is cached for 300 seconds (5 minutes) before it is refreshed with new in-game data.
- Historical data is not available; only the currently-cached order book can be obtained.
- Orders on player-owned markets which have been made private cannot be obtained using this query
Despite the quirks and limitations, this endpoint provides extremely granular market data with decent recency and will prove invaluable for our reporting purposes. Let’s move on to our next endpoint:
/markets/{region_id}/types/
This endpoint simply returns a list of type_ids associated with the indicated region. While this might be useful if we just need a list of IDs to feed into calls for the /history endpoint, it is otherwise limited in its usefulness; this information is already implicitly provided by the /orders endpoint, which also offers better recency (a 5-minute cache versus 10 minutes for /types).
/markets/groups/ & /markets/groups/{market_group_id}/
Like /types, the /groups endpoint is very simple and just returns a list of group_ids. A “group” is simply a way the game categorizes type_ids, usually by in-game function or purpose. While groups are invaluable when making in-game purchases as a consumer, they are largely irrelevant to trade strategists who do not care about the function or purpose of goods. As a result, this endpoint provides no value for our reporting goals. Likewise, the /{market_group_id} endpoint simply returns human-readable information about a specified group, which we also do not need.
Onto the final unauthenticated market endpoint:
/markets/prices/
This endpoint does not require any data to call and returns the following information:
- “type_id”: The type of commodity being reported against
- “average_price”: Average trade price per-unit
- “adjusted_price”: Average trade price per-unit, adjusted/sanitized by the game publishers.
In-game, these prices serve some useful functions including, but not limited to:
- Calculating players’ approximate in-game net worth; since players can have assets scattered throughout many regions, it helps to have region-agnostic price figures for the game to reference when making this calculation.
- Sanity-checking market prices; if a player attempts to place an order far lower or greater than these prices, a warning might appear informing the player before the order can be executed, serving as a means of protecting players from flagrant price gouging or low-ball market orders.
That said, this endpoint and its associated price values come with some severe limitations:
- The prices are ambiguously-derived; it is unclear how the game developers calculate inter-regional average pricing, let alone the “adjusted price” value.
- Recency is poor; this endpoint is updated only once per day.
As a result, this data is just about as useful as the “guide price” for items in OSRS; while they might provide users with ballpark pricing figures, for the purposes of data analytics they are too ambiguous and lack the necessary relevance, granularity, specificity and recency needed to provide any useful insight.
Bonus: Fuzzworks Static Data Dumps
While it looks like we will primarily be using the /orders and /history endpoints from ESI, we still need some additional data. None of these endpoints provide important in-game attributes like English item names or the per-unit cubic volume of goods; despite seldom changing, this static data is needed to make useful reports since it enables readability and provides important contextual information. The OSRS equivalent is the /mapping endpoint; there is no equivalent endpoint for EvE Online.
Instead, CCP provides a Static Data Export (SDE) containing this information in a ~100MB .zip file which they routinely update. Since this source data is clunky, many members of the EvE Online community provide their own SDEs derived from this source. FuzzySteve provides a versatile library which we will incorporate into our data collection and reporting strategy.
Project Scope
Now that we have a good understanding of what ESI can provide and the endpoints that we want to use, we can get started outlining our data collection goals.
Setting Collection Goals
Setting a scope and defining specific goals is often the most difficult part of any project; we want all of the data, but this desire is often at odds with the limitations of the API and our own storage space.
So, let’s start by taking our endpoints to their logical extremes:
- /orders provides the most raw data with the greatest degree of granularity. Since we cannot directly collect data from the past, we must routinely collect and save this data in order to build a continuous stream of order history. The longer we keep fetching, the wider our data set becomes, and the shorter our fetching interval, the more granular our data set becomes, with a best possible granularity of 5 minutes.
- /history on the other hand provides less granularity and less data, but makes up for this by providing over a year’s worth of history for a given type_id and region. As a result, we should fetch this data a maximum of once per day; unlike /orders, there is no risk of losing access to historical data if this interval is missed.
So, how much space will /orders consume over time? A single call against The Forge returns JSON that is 306KB in size. With 367 pages, the entire live order book for The Forge is 112MB… And that is just one region. If we want to collect the same data for all other major trading hubs, we are looking at approximately 250MB. Collecting data at every 5-minute interval would result in the generation of 3GB of data per hour, or 72GB per day.
While these rough theoretical figures over-estimate the actual storage requirements, and while many different strategies could be employed to mitigate this growth, it can help at this point to stop and consider what we actually need. In some business-related contexts where data is far more valuable than the storage medium required to contain it, the appropriate answer will be to grab and keep everything. In my context involving a 21 year-old MMORPG, the calculus is not as favorable; my homelab machine contains a 256GB SSD which is already responsible for other workloads, including my OSRS reporting which at the time of this writing is hardly consuming 5GB of space… and suffice to say, if I can generate useful reports with *only* 5GB of data, I should be able to make do with not having everything for EvE.
So, what if we lose some granularity and collect data every hour instead? That brings us to 6GB per day, which in my context is far more reasonable; I should be able to keep about one to two weeks’ worth of data to start, which should be adequate for a handful of reporting techniques. We’ll build our data collection scripts in a manner where granularity and retention can be improved in the future in case we ever find some missing digits in our hardware budget.
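For transparency, here is a quick back-of-envelope check of these figures; the 306KB page size, the 367-page count, and the 250MB all-hubs figure are the rough measurements quoted above:

## rough storage estimates for the /orders endpoint
page_kb = 306                        # measured size of one JSON page for The Forge
pages = 367                          # page count for The Forge at the time of writing
print(page_kb * pages / 1000)        # ~112 MB per snapshot of The Forge

all_hubs_mb = 250                    # rough estimate across all major trading hubs
print(all_hubs_mb * 12 * 24 / 1000)  # 5-minute snapshots: ~72 GB per day
print(all_hubs_mb * 24 / 1000)       # hourly snapshots: ~6 GB per day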
Now, what about /history? A test call returns JSON that is 56KB in size. If we extrapolate against all ~17,000 unique items in the game, this comes out to a little under 1GB. More realistically, this will be much smaller; a lot of that space is taken up by response headers, and most unique items feature low trade volume with correspondingly less data to record. This also isn’t likely to grow much, since that 56KB already includes over a year of market history for the given region and type_id. Unlike the /orders endpoint, we don’t have to worry much about losing access to historical data, so we’ll omit regions other than The Forge for now and determine in the future if they need to be included; we always have the option of spot-checking this endpoint for other regions if we only need to fetch information for a handful of type_ids in our reports.
So our goals, more or less, will be to:
- Maintain at least 7 days’ worth of order data for all major trading regions with 1 hour of granularity
- Maintain a local copy of market history for The Forge, refreshed daily.
- Allow increased retention and granularity in the future if desired.
Excellent; now that we roughly understand our parameters, let’s see how ESI can meet them.
ESI Documentation: Limitations and Best Practices
Before we start hammering the API and get our IP banned, it is important to read the manual. ESI provides comprehensive documentation here outlining a lot of useful information including an introduction, guidelines, frequently asked questions, and more.
For our purposes, here are the key things we need to take into account for our project:
- All endpoints feature an error limit. At the moment, this is set at 100 errors per minute. If this rate is exceeded, we will be blocked from making any subsequent queries for a minute, with repeat offenders facing the possibility of an IP ban.
- While most endpoints (including /orders) do not feature any rate limit, some do. For our purposes, the market history endpoint currently has a rate limit of 300 calls per minute which CCP implemented in-order to curtail abuse following an extended outage. Repeatedly exceeding this rate or otherwise abusing this endpoint could result in an IP ban.
- We should include a descriptive user-agent in our request headers with contact information.
- We should avoid attempting to capture data before/during the cache refresh interval.
- Long, sustained and slow request intervals are preferred over short, sudden and heavy bursts.
At a glance, ESI is extremely permissive for a public API; as we recall, the OSRS Wiki API enforced a rate limit of 1 request per second. Unlike OSRS however, there is a lot more data that we need to capture for EvE, requiring substantially more API calls.
With our goals and limitations in the back of our minds, let’s get started with some scripting.
Data Collection
Let’s create a new Python script and set some starting parameters:
import sqlite3
import time

## define headers
headers = {
    'User-Agent': 'Enter a descriptive user-agent!',
}

## specify regions we wish to capture data for and the associated region_id
regionlist = [
    ['TheForge', 10000002],
    ['Domain', 10000043],
    ['Essence', 10000064],
    ['Metropolis', 10000042],
    ['SinqLaison', 10000032],
    ['Heimatar', 10000030],
    ['TashMurkon', 10000020],
]

## connect to sqlite DB and initialize cursor
marketdatadb = sqlite3.connect("marketdata.sqlite")
cur = marketdatadb.cursor()

## generate normalized time value for timestamping the data we collect (rounded-down to nearest 5-minute interval)
unixtime = int(round(time.time() / 300) - 0.5) * 300
We’ll start by defining our user-agent, which we’ll use for any calls made against ESI per their documentation.
Next, we’ll define a list of regions we wish to report against. Starting with The Forge, this list includes most of the major trading regions in the game and their associated region_id. We can add more later if we want but for now this should be a good start.
Then, we’ll connect to a local SQLite database and define a cursor so that we can get started storing the data we collect.
Finally, we’ll generate a time value ‘unixtime’ which we will use to timestamp any data we collect during this script execution interval.
Now we’re ready to get started tackling the /orders endpoint:
Calling /orders
Since the data provided by /orders is delineated into pages, we need to develop a way to call every page and concatenate the data before writing it to our database. Here is my early attempt, with explanatory comments included inline:
# Import some more modules
import requests
import pandas as pd
import time

for region in regionlist:
    urlprefix = 'https://esi.evetech.net/latest/markets/' + str(region[1]) + '/orders/?datasource=tranquility&order_type=sell&page='
    currentpage = 1
    dataframes = []
    url = urlprefix + str(currentpage)
    r = requests.get(url, headers=headers)
    dataframes.append(pd.DataFrame(r.json()))
    # We will iterate over each page until we hit a 404, indicating the end of the page series.
    while not r.status_code == 404:
        currentpage = currentpage + 1
        url = urlprefix + str(currentpage)
        r = requests.get(url, headers=headers)
        if r.status_code != 404:
            try:
                # until we reach a 404, we will append any data we receive to our dataframes list
                currentpagedata = pd.DataFrame(r.json())
                dataframes.append(currentpagedata)
            except:
                # if an exception occurs we can print the result for review and move on
                print('eyy we got a problem, look at this json')
                print(r.json())
    # Concatenate all of our data into a single pandas dataframe, append our normalized timestamp, write to database, and create an index if there isn't one already
    regiondata = pd.concat(dataframes, axis=0)
    regiondata['timestamp'] = unixtime
    regiondata.to_sql(region[0], marketdatadb, if_exists='append', index=False)
    cur.executescript(f'''CREATE INDEX IF NOT EXISTS {region[0]}Index ON {region[0]}(timestamp, order_id)''')
While this script does successfully write market order data to our database, there are quite a few issues in the context of our goals and limitations:
- During testing, this script takes over 10 minutes to execute, which far exceeds the 5-minute cache interval for the endpoint.
- Using a 404 to delineate the end of a page series works as intended; however, we should avoid generating errors whenever possible since there is an error limit.
- There is also no rate limiting when an error occurs, which is problematic since exceeding the limit of 100 errors per minute could get our IP banned from ESI.
- Exception handling is too rudimentary and needs improvement; if there is a temporary network issue or other exception-throwing event, we need to attempt to retry fetching data.
- There are no defined functions and everything is just piled into the runtime; in order to effectively scale our code to encompass other endpoints and to fix the aforementioned issues, it will help to break our script up into discrete and portable functions.
As a result, this script is unacceptable in its current form and must be improved before we put it to continuous use. Let’s work through possible solutions for each of these problems:
Starting with the cache interval: if a single data collection routine spans multiple API refresh intervals, at best we will experience incongruent data with missing and/or duplicate fields, and at worst our script will crash or malfunction. This is something we never needed to worry about with the OSRS Wiki API, since a single call always returned a discrete set of data. For EvE Online, on the other hand, we need to make 300+ separate calls in order to obtain one set of order data, and for that data to remain discrete and distinct from past and future data, our calls all need to be made against an unchanged cache.
So, how do we turn a 10+ minute routine into a sub-5 minute routine?
Our script currently calls the /orders endpoint serially, with a new request being initiated only after the previous request has completed. This typically wouldn’t present an issue, however my homelab is in California and ESI is hosted at an AWS facility in Ireland, resulting in a lot of network latency with long delays between individual API calls. Fortunately we can overcome the bottleneck of latency by simply sending more requests at a time, which we can achieve by implementing some simple parallelization techniques which I’ve covered in an earlier set of articles.
Since network latency is an example of an I/O bottleneck, we aren’t concerned with throwing more computational power at the problem; we just need more threads issuing concurrent requests. However, in order to send concurrent requests, we need a discrete and divisible workload, which our cute 404 loop-termination logic does not provide: with that approach, we only know how many pages there are after the final page has already been gathered.
Fortunately, upon closer inspection of the response header for /orders, the page count is provided through the value ‘X-pages’. With our initial call, we can get the page count and obtain what we need to permit parallelization for subsequent calls. The EvE Online developer blog published an article covering pagination for ESI calls a few years ago where they leverage a module called grequests to attain concurrency. For our purposes, we can just use parallel threads to the same effect.
The response header also provides an ‘expires’ value indicating the next time the cache will refresh, which we can use to help ensure data congruence by confirming that the data we collect was all gathered within the same refresh interval.
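As a quick illustration, both header values can be inspected with a single call; this is just a probe sketch and not part of the collection script:

import requests

## probe the first page of The Forge's order book and inspect the pagination and cache headers
url = 'https://esi.evetech.net/latest/markets/10000002/orders/?datasource=tranquility&order_type=all&page=1'
r = requests.get(url, headers={'User-Agent': 'Enter a descriptive user-agent!'})
print(r.headers['X-Pages'])   # total number of pages available for this region
print(r.headers['expires'])   # when the current cache interval ends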
Next, since errors cause exceptions, our error limiting and exception handling concerns go hand-in-hand; we need to write our script in a manner where failed calls don’t just log the error, but permit retries, which will prevent gaps from occurring in our data sets whenever a momentary drop in our connection causes a call to fail. We also need to ensure that when exceptions do occur, an appropriate delay is incorporated in order to ensure that the error limit can never be reached.
So, with a lot of things to think about and change, let’s scrap our first script and write something better!
Refactoring Attempt #1:
First, let’s take a look at our new runtime:
...
## runtime
if __name__ == '__main__':
    for region in regionlist:
        regionstatus = 'incomplete'
        while regionstatus == 'incomplete':
            regionstatus = FetchOrderData(region)
This is now much simpler and easier to read than our original runtime; it is clear that our script simply iterates over each region, executing the FetchOrderData function until we get a response other than ‘incomplete’. Great! Now let’s take a look at how the FetchOrderData function addresses our page counting and data congruence concerns, with detailed comments included inline:
...
from multiprocessing.pool import ThreadPool
import sys

def FetchOrderData(region):
    # start a timer for this region for logging purposes
    RegionStartTime = time.time()
    # set up our URL prefix based off of the current region being reported against
    urlprefix = 'https://esi.evetech.net/latest/markets/' + str(region[1]) + '/orders/?datasource=tranquility&order_type=all&page='
    # fetch the first page for our region, including response data followed by 'X-pages' and 'expires' header data
    url = urlprefix + str(1)
    firstpage = OrderDataCall(url)
    # generate a list of URLs based off of 'X-pages'
    urllist = []
    for pagenumber in range(2, (firstpage[1] + 1)):
        urllist.append(urlprefix + str(pagenumber))
    # call every page of data in a thread pool
    with ThreadPool(5) as p:
        pagelist = p.map(OrderDataCall, urllist)
    # check each page of data to ensure a matching 'expires' header value
    for page in pagelist:
        if page[2] != firstpage[2]:
            # if the 'expires' header values do not match for every page, our data set is incongruous, so we should discard this data and start over
            print(f'retrying {region[0]} due to incongruous data')
            return('incomplete')
    # if the 'expires' header values all match, we can proceed with conversion and database operations
    pagedatalist = [page[0] for page in pagelist]
    pagedatalist.append(firstpage[0])
    regiondata = pd.concat(pagedatalist, axis=0)
    regiondata['timestamp'] = unixtime
    regiondata.to_sql(region[0], marketdatadb, if_exists='append', index=False)
    # purge any data more than 10 days old from the database for the reported region
    cur.executescript(f'''DELETE FROM {region[0]} WHERE timestamp < {unixtime - 864000}''')
    # build index if it doesn't exist
    cur.executescript(f'''CREATE INDEX IF NOT EXISTS {region[0]}Index ON {region[0]}(timestamp, order_id)''')
    # return 'complete' string; region data collection is complete
    print(f'done with region {region[0]} in {time.time() - RegionStartTime} seconds')
    return('complete')
...
And finally, our OrderDataCall function, which addresses our exception handling and error rate limiting concerns, while also illustrating how the response header values are obtained and passed back to FetchOrderData. We’ve also added a simple timeout to prevent the script from getting locked up if a requested page is no longer available:
def OrderDataCall(url):
    status = 'not done'
    calltimer = int(time.time())
    while status == 'not done':
        try:
            r = requests.get(url, headers=headers)
            if r.status_code == 200:
                # if a call succeeds, we will gather and return the requested data as a list
                pagedata = pd.DataFrame(r.json())
                pagecount = int(r.headers['X-pages'])
                pageexpires = str(r.headers['expires'])
                page = [pagedata, pagecount, pageexpires]
                status = 'done'
            else:
                # if any status code other than a 200 is received, or if any exception occurs due to a temporary network issue, the function will sleep. A sleep interval of 4 seconds per error for 5 threads results in a maximum error rate of 75 per minute, which is well within the 100 per minute threshold.
                time.sleep(4)
        except:
            time.sleep(4)
        # timeout after 5 minutes
        if int(time.time()) - 300 > calltimer:
            print(f'failed to fetch {url} in 5 minutes; killing and restarting procedure for current region')
            return([0, 0, 0])
    return(page)
When executed, we get the following output:
...
done with region TheForge in 72.95544290542603 seconds
done with region Domain in 35.52826428413391 seconds
done with region Essence in 12.998199224472046 seconds
done with region Metropolis in 23.157774925231934 seconds
retrying SinqLaison due to incongruous data
done with region SinqLaison in 24.105119943618774 seconds
done with region Heimatar in 17.379561185836792 seconds
done with region TashMurkon in 10.661999464035034 seconds
… And 85MB of congruent data from the /orders endpoint has been successfully written to our database!

While our main concerns have been addressed, there is another small improvement we should make; while we have successfully mitigated the risk of introducing bad data into our database, this method of discarding bad data will generate unnecessary calls and under the right conditions could cause the script to perpetually fail. We can solve this by incorporating some rudimentary logic which will force the FetchOrderData function to wait until the endpoint has refreshed if there is insufficient time remaining in the current interval.
We’ll start by modifying our OrderDataCall function in order to extract the ‘date’ value from the response header, which provides the current date and time according to ESI.
def OrderDataCall(url):
    ...
                pageexpires = str(r.headers['expires'])
                pagedate = str(r.headers['date'])
                page = [pagedata, pagecount, pageexpires, pagedate]
    ...
This value will then be used within FetchOrderData to calculate the time remaining within the /orders endpoint cache interval by subtracting it from ‘expires’. If sufficient time is available, the function will proceed as usual and fetch the remaining pages of data using the thread pool. If there is insufficient time, the function will wait out the remainder of the cache interval and restart once the endpoint cache has been refreshed:
import datetime
...
def FetchOrderData(region):
    ...
    firstpage = OrderDataCall(url)
    pageexpires = datetime.datetime.timestamp(datetime.datetime.strptime((firstpage[2].split(', ')[1].split(' GMT')[0]), '%d %b %Y %H:%M:%S'))
    pagedate = datetime.datetime.timestamp(datetime.datetime.strptime((firstpage[3].split(', ')[1].split(' GMT')[0]), '%d %b %Y %H:%M:%S'))
    SecondsUntilExpiry = pageexpires - pagedate
    if SecondsUntilExpiry < (0.5 * firstpage[1]):
        print(f'waiting for cache to refresh in {SecondsUntilExpiry} seconds before retrying {region[0]}')
        time.sleep(SecondsUntilExpiry + 1)
        return('incomplete')
    ...
...
Since the ‘expires’ and ‘date’ response header values do not adhere to ISO 8601 date formatting, we cannot perform arithmetic without first performing some ugly string conversions followed by ugly datetime conversions until we get a pair of unix timestamps, with the difference revealing our desired SecondsUntilExpiry. If there are more than two times as many pages to fetch as there are seconds remaining for the current cache, the sleep command will run the timer out and return ‘incomplete’ so that we can attempt the process again on a fresh endpoint cache interval.
For The Forge, this sets our threshold at about 180 seconds. Since it takes about 70-90 seconds for this data to fetch given our current environment and settings, this seems like a measured enough approach for our needs, striking a balance between giving the function enough time to succeed while also not needlessly waiting longer than we need to.
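As an aside, Python's standard library can parse HTTP-date headers directly, which would avoid the manual string splitting; this is just an alternative sketch (a drop-in for the two strptime lines above), not what our script uses:

from email.utils import parsedate_to_datetime

## alternative to the manual splitting: parse the HTTP-date headers directly
pageexpires = parsedate_to_datetime(firstpage[2]).timestamp()
pagedate = parsedate_to_datetime(firstpage[3]).timestamp()
SecondsUntilExpiry = pageexpires - pagedate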
While there are certainly more improvements we could make, at this point and for our purposes we now have a script which:
- Is capable of fetching region order data within a 5-minute cache interval
- Does not generate errors under normal use
- Is incapable of exceeding the ESI error limit
- Possesses adequate exception handling, permitting retries when necessary
- Checks data congruence before writing to the database
- Respects the API by not severely hammering it or making excessively wasteful calls if the current cache refresh interval is about to expire
- Is now organized into discrete functions with a simple runtime
As a result, this script fulfills all of our goals and respects ESI limitations for the /orders endpoint! Let’s move on to the /history endpoint next:
Calling /history
In order to start fetching historical market data from the /history endpoint, we need a list of type_ids to use as a reference. While we could use the /types endpoint to obtain this, doing so would require about 17 calls. Rather than build functions for that purpose, we can just query our database for the information already gathered from /orders:
## runtime
if __name__ == '__main__':
    ...
    pd_typeid = pd.read_sql('SELECT DISTINCT type_id FROM TheForge ORDER BY type_id ASC;', marketdatadb)
    id_list = pd_typeid['type_id'].tolist()
    ...
As long as our database is populated with data from /orders before we attempt to fetch data from /history, we can use this approach instead of querying /types; the resulting id_list contains 16,732 unique type_ids which we can now use to start querying /history.
The most important thing to remember about /history is the rate limit of 300 queries every 60 seconds. Given the quantity of unique type_ids we have to call against, the absolute minimum amount of time needed to fetch this entire set of data is approximately 56 minutes. A single thread won’t come close to this limit, so we should implement a strategy that combines multi-threading with rate limiting in order to get as close to 1-hour completion as possible.
After doing some testing, it is also important to note that more than a handful of our queries against /history will either return no data or outright fail on us. While we do want to capture as much data as possible, some type_ids simply will not provide a valid response or data. This most commonly occurs if an item can be listed for trade but has experienced zero volume… in which case it probably won’t be relevant for data analytics purposes anyway. As a result, our exception handling should attempt to retry fetching data, but only one or two times; we are already subject to a rate limit and we definitely don’t want to find ourselves error-limited due to repeated calls for data that doesn’t exist.
Let’s start by adding some lines to our runtime:
if __name__ == '__main__':
    ...
    pdidlist = pd.read_sql('SELECT DISTINCT type_id FROM TheForge ORDER BY type_id ASC;', marketdatadb)
    idlist = pdidlist['type_id'].tolist()
    with ThreadPool(5) as p:
        returned_dataframes = p.map(FetchHistoryData, idlist, chunksize = 1)
    WriteHistoryData(returned_dataframes)
Once we have our idlist, these values get passed to the FetchHistoryData function via a pool of 5 threads. The idea behind setting a chunksize of 1 is that we can better ensure that all five of our threads are always populated with work to do and that no single thread presents a bottleneck. Jason at SuperFastPython has a great article on benchmarking different chunksize values here. The improvement for our use case is probably marginal but might be worth evaluating for other projects.
Once the worker pool is finished generating the returned_dataframes list, the WriteHistoryData function performs database write operations which finishes the script.
Great; let’s take a look at the FetchHistoryData function next, with descriptive comments again included inline:
def FetchHistoryData(type_id):
    url = 'https://esi.evetech.net/latest/markets/10000002/history/?datasource=tranquility&type_id=' + str(type_id)
    # Start timer and attempt counter. Since this endpoint is rate-limited, we need to keep track of execution time and sleep between API calls if necessary to operate within the limit
    CycleStartTime = time.time()
    attempts = int(0)
    # If we fail to acquire any data, we will pass this empty dataframe back to the main thread since it can be presumed that the given type_id has no market history.
    TypeIDHistory = pd.DataFrame()
    # We will make a maximum of 2 attempts before giving up
    while attempts < 2:
        try:
            r = requests.get(url, headers=headers)
        except:
            # If we get a connection closed or other error, we will increase our attempt counter by 1, wait if needed, and return to the while loop
            print(f'fetching historical data for {type_id} failed due to connection/network issue')
            attempts = attempts + 1
            CycleStartTime = RateLimitFunction(CycleStartTime)
        else:
            if r.status_code == 200:
                # if a connection succeeded, we will save the result to a dataframe and escape the while loop. Succeeded connections can include completely empty dataframes which indicate that an item has experienced no trade in recent history.
                TypeIDHistory = pd.DataFrame(r.json())
                attempts = 2
            else:
                # if we get anything other than a 200 status, we will increase our attempt counter by 1, wait if needed, and return to the while loop
                print(f'fetching historical data for {type_id} failed due to {r.status_code}')
                attempts = attempts + 1
                CycleStartTime = RateLimitFunction(CycleStartTime)
    # if no data could be gathered in 2 attempts, our empty dataframe will be passed to the main thread
    # append type_id to the dataframe as a new column.
    TypeIDHistory['type_id'] = type_id
    # wait if needed before passing TypeIDHistory to the main thread
    RateLimitFunction(CycleStartTime)
    return(TypeIDHistory)
This function performs all of our API calls while also providing exception handling with a retry counter. If data could be successfully acquired, this will be passed back to the main thread and added to a list of pandas dataframes.
This function frequently calls a rudimentary rate limit function:
def RateLimitFunction(CycleStartTime):
    if 1.1 > (time.time() - CycleStartTime):
        time.sleep(abs(1.1 - (time.time() - CycleStartTime)))
    return(time.time())
Since the /history rate limit is 300 calls per minute, and since we have 5 threads running concurrently, FetchHistoryData should only be allowed to call the API as frequently as once per second. The RateLimitFunction enforces this limit by comparing CycleStartTime against the current time and waiting-out the remaining duration, returning the current time as a response which is used to reset CycleStartTime. This ensures that each thread running FetchHistoryData cannot make an API call any faster than once every 1.1 seconds. The 0.1-second buffer keeps us well outside of the limit at all times while still ensuring a call rate near the maximum allowed.
This rudimentary method of rate limiting is far from perfect; it loses time whenever a single call takes more than 1.1 seconds to execute. While there are more sophisticated rate-limiting techniques available, typically involving additional modules, the limitations of this method are minor and do not push execution time far beyond the theoretical minimum.
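For reference, one of those more sophisticated techniques is a token bucket shared across threads, which reclaims time lost to slow calls by permitting short bursts. This is a hypothetical sketch (the class and its parameters are not part of our script), assuming we still want to stay under 300 calls per minute:

import threading
import time

class TokenBucket:
    # shared token bucket: 'rate' tokens are added per second, up to 'capacity'
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()
        self.lock = threading.Lock()
    def acquire(self):
        # block until a token is available, then consume it
        while True:
            with self.lock:
                now = time.monotonic()
                self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
            time.sleep(0.05)

## e.g. ~270 calls per minute across all threads, with a small burst allowance
history_bucket = TokenBucket(rate=4.5, capacity=5)
## each worker thread would call history_bucket.acquire() immediately before requests.get(...)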
Finally, once FetchHistoryData is finished fetching data in its ThreadPool, WriteHistoryData writes the resulting data to our database:
def WriteHistoryData(returned_dataframes):
    # concatenate our list of dataframes and write them to our database
    dataframe_concat = pd.concat(returned_dataframes)
    dataframe_concat['timestamp'] = unixtime
    dataframe_concat.to_sql('TheForgeHistory', marketdatadb, if_exists='replace', index=False)
    # build index
    cur.executescript(f'''CREATE INDEX IF NOT EXISTS TheForgeHistoryIndex ON TheForgeHistory(timestamp, type_id)''')
    print('Historical market data processing completed')
With our functions and runtime all tied together, our total execution time sits at about 65 minutes…
...
fetching historical data for 78637 failed due to 404
fetching historical data for 78666 failed due to 404
fetching historical data for 78640 failed due to 404
fetching historical data for 78640 failed due to 404
fetching historical data for 78637 failed due to 404
fetching historical data for 78666 failed due to 404
Historical market data processing completed
… with approximately 165MB of data written to our database:

Out of over 16,000 items, about a dozen were omitted, reflecting a ~0.1% failure rate. This is perfectly acceptable for our use case and we would be hard-pressed to do any better.
With /orders and /history completed, let’s move on to our static data:
Calling InvTypes
FuzzySteve maintains a useful repository of SDE downloads for EvE Online. For our purposes, we will use ‘InvTypes’ which includes the English name for all type_ids in the game, as well as other useful attributes which we will use in our future reporting.
Since this data is all contained in a single .csv file, this can be fetched and added to our database without much fuss within a simple function, executed after everything else in our runtime:
...
def FetchInvTypes():
    try:
        url = 'https://www.fuzzwork.co.uk/dump/latest/invTypes.csv'
        df = pd.read_csv(url)
        df['timestamp'] = unixtime
        df.to_sql('InvTypes', marketdatadb, if_exists='replace', index=False)
        cur.executescript(f'''CREATE INDEX IF NOT EXISTS InvTypesIndex ON InvTypes(timestamp, typeID)''')
        print('InvTypes collected successfully')
    except:
        print('Fetching InvTypes failed; skipping')
...
## runtime
...
    FetchInvTypes()
Since the URL provides a single file containing everything we need, our fetching mechanism can be simple. Additionally, recency for this data isn’t all that important; it only updates when the game developers introduce new object types, which at the time of this writing hasn’t occurred in over three months. As long as our database already has an old copy, it isn’t a big deal if this function fails, so we will use lightweight exception handling. We can limit ourselves to calling this function maybe once per week to spare FuzzySteve’s bandwidth.
With fetching methods successfully developed for each of our data sources, we now need to set up a scheduling mechanism:
Scheduling
As things are currently written, all of our runtime elements will run each time we execute our script. This needs to be changed, since the granularity we desire for each data source is different:
- For /orders, we need to collect data once every hour, with the option to choose a faster or slower interval in the future if desired.
- For /history, we need to collect data once every day
- For InvTypes, once every week
The simplest solution would be to split each of these routines into separate scripts, each with its own system-level execution schedule defined via cron. However, since we are using SQLite as our database, which does not support write concurrency, this approach would be difficult to implement without changing our database type. While switching database types might be a fun exercise, we’re going to stick with SQLite for now.
Instead, we are going to implement logic within the script to govern when each portion of our runtime is allowed to execute, based on existing or absent database values. Fortunately, we append a timestamp to every record in our database, derived from the ‘unixtime’ variable generated each time our script is executed. By comparing ‘unixtime’ from the currently-running script with the newest timestamps present in each table, we can determine when each data fetching routine last ran and use this information to decide whether specific data collection functions should be executed.
We’ll start by writing a function that returns the largest timestamp present in a given table.
...
def ValidateMaxTimestamp(table):
    try:
        MaxTimestamp = pd.read_sql(f'SELECT max(timestamp) AS timestamp FROM {table}', marketdatadb)
        MaxTimestamp = int(MaxTimestamp['timestamp'].values[0])
    except:
        MaxTimestamp = int(0)
    return(MaxTimestamp)
...
An exception occurring here would indicate that there is no data, causing the function to simply return an integer of 0.
This function can then be called at the start of our runtime to fetch the largest timestamps corresponding to each of our three data sources:
#runtime
if __name__ == '__main__':
    OrderAge = ValidateMaxTimestamp('TheForge')
    HistoryAge = ValidateMaxTimestamp('TheForgeHistory')
    InvTypesAge = ValidateMaxTimestamp('InvTypes')
    ...
Which we can then use to dictate execution throughout the rest of our runtime:
    ...
    # execute FetchOrderData if current order data is at least an hour old
    if unixtime >= OrderAge + 3600:
        for region in regionlist:
            regionstatus = 'incomplete'
            while regionstatus == 'incomplete':
                regionstatus = FetchOrderData(region)
    else:
        print('Existing order data is current; skipping order data fetch')
    # execute FetchHistoryData if current history data is at least a day old AND the cache isn't about to expire
    currentUTC = str(datetime.datetime.now(datetime.timezone.utc).strftime("%H%M"))
    if unixtime >= HistoryAge + 86400 and (int(currentUTC) > 1110 or 900 > int(currentUTC)):
        pdidlist = pd.read_sql('SELECT DISTINCT type_id FROM TheForge ORDER BY type_id ASC;', marketdatadb)
        idlist = pdidlist['type_id'].tolist()
        with ThreadPool(5) as p:
            returned_dataframes = p.map(FetchHistoryData, idlist, chunksize = 1)
        WriteHistoryData(returned_dataframes)
    else:
        print('Existing history data is current; skipping history data fetch')
    # execute FetchInvTypes if current InvTypes data is at least a week old
    if unixtime >= InvTypesAge + 604800:
        FetchInvTypes()
    else:
        print('Existing InvTypes data is current; skipping InvTypes data fetch')
As a result of this change, no matter how many times we execute our script, new data will only be collected if the age of our existing data warrants a refresh. We have also added a condition which prevents FetchHistoryData from executing if the cache for that endpoint is close to refreshing. Unlike the /orders endpoint, we can forego checking dates in headers since the /history endpoint refreshes at a consistent time every day (1105 UTC).
Unfortunately, this rudimentary scheduling logic reveals a limitation which undermines one of our project goals; since /history takes over an hour to execute, and since only one instance of our script will run at a time, our routine for collecting data from /orders will stall and fall behind whenever data from /history is currently being downloaded. While this is a pretty minor problem if we only desire one hour of granularity from /orders, since we want to be able to improve granularity in the future, our current scheduling logic is deficient since it will result in large gaps in our /orders tables any time /history needs to be refreshed. Since we cannot change the API rate limit for /history, we have to change our script.
Off the top of my head, there are two different approaches which could be taken to overcome this limitation:
- We can fetch data from /history incrementally in smaller chunks; if we could collect data from as many as 1000 type_ids at a time, the underlying functions would only take 5 minutes to execute, eliminating this bottleneck for /orders.
- We can incorporate more multithreading and queueing techniques, allowing /orders and /history data fetching routines to occur concurrently
After some initial testing, I discovered that while the former approach could potentially work, going this route would require extensive modifications to our existing functions, including adding more complicated database read/write operations. This would also result in mixed recency; some type_ids would have newer history written to the database than others, which could make generating reports down the line more troublesome.
On the other hand, multi-threading could be implemented by putting operations for /orders, /history and InvTypes into their own dedicated threads, allowing them to all operate independently with minimal modifications. However, we would need to add new code in order to manage additional threads and to prevent race conditions. We will also need to break up some functions into smaller pieces in order to serialize database write operations, accounting for SQLite’s aforementioned inability to handle concurrent writes.
Since we know that our existing functions all independently operate to our standards, and since I prefer to avoid modifying things that already work well, we’ll take the multi-threading path.
Refactoring Attempt #2:
First, in addition to purging our initial database connection and cursor commands from the very beginning of our script, we are making some additions:
# modules
...
from queue import Queue
import threading
from threading import Thread
# specify the interval we wish to use for collecting order data (in seconds, minimum 300)
OrderDataInterval = int(3600)
These modules provide multi-threading and queuing functionality beyond what we have already used with ThreadPools. We are also defining a global variable OrderDataInterval which can be adjusted and used to dictate how frequently data from /orders needs to be fetched.
Now we’ll move on to our runtime, which has changed substantially; descriptive comments are included inline:
### runtime
if __name__ == '__main__':
    # create database file if it does not already exist, enable write-ahead logging for read/write concurrency
    marketdatadb = sqlite3.connect("marketdata.sqlite")
    marketdatadb.execute('pragma journal_mode=wal')
    marketdatadb.close()
    # connect to database (READ-ONLY)
    marketdatadb = sqlite3.connect("file:marketdata.sqlite?mode=ro", uri=True)
    cur = marketdatadb.cursor()
    # pull largest timestamps from our database corresponding with each data source
    OrderAge = FetchMaxTimestamp('TheForge')
    HistoryAge = FetchMaxTimestamp('TheForgeHistory')
    InvTypesAge = FetchMaxTimestamp('InvTypes')
    # start our queue and daemon for fetching market order data
    FetchOrderDataQueue = Queue(maxsize = 0)
    threading.Thread(target=FetchOrderDataInvoke, daemon=True).start()
    # start our queue and daemon for fetching market history data
    FetchHistoryDataQueue = Queue(maxsize = 0)
    threading.Thread(target=FetchHistoryDataInvoke, daemon=True).start()
    # start our queues and daemon for writing any fetched data to the database
    HistoryDataWriteQueue = Queue(maxsize = 0)
    OrderDataWriteQueue = Queue(maxsize = 0)
    InvTypesDataWriteQueue = Queue(maxsize = 0)
    threading.Thread(target=DataWriter, daemon=True).start()
    # fetch data from the /orders endpoint for all regions if data is out-of-date. Continue progress only after any fetched data has been written to the database
    FetchOrderDataQueue.put(regionlist)
    FetchOrderDataQueue.join()
    OrderDataWriteQueue.join()
    # fetch data from the /history endpoint for TheForge if data is out-of-date. If triggered, an idlist will be generated and queued
    currentUTC = str(datetime.datetime.now(datetime.timezone.utc).strftime("%H%M"))
    if unixtime >= HistoryAge + 86400 and (int(currentUTC) > 1110 or 900 > int(currentUTC)):
        print('Historical market data is out-of-date; downloading latest data now from ESI')
        pdidlist = pd.read_sql('SELECT DISTINCT type_id FROM TheForge ORDER BY type_id ASC;', marketdatadb)
        idlist = (pdidlist['type_id'].tolist())
        FetchHistoryDataQueue.put(idlist)
        FetchHistoryStatus = 'incomplete'
    else:
        print('Existing history data is current; skipping history data fetch')
        FetchHistoryStatus = 'complete'
    # fetch current InvTypes data if existing InvTypes is at least a week old.
    if unixtime >= InvTypesAge + 604800:
        print('InvTypes is out-of-date; downloading now from ESI')
        FetchInvTypes()
    else:
        print('Existing InvTypes data is current; skipping InvTypes data fetch')
    # progress only after any /history and InvTypes data has been written to the database
    FetchHistoryDataQueue.join()
    HistoryDataWriteQueue.join()
    InvTypesDataWriteQueue.join()
    # allow any ongoing /orders data fetch and write operations to complete before terminating the script
    FetchOrderDataQueue.join()
    OrderDataWriteQueue.join()
Previously, our primary thread and runtime were responsible for calling functions directly in sequential order, with multi-threading only being used to perform concurrent web calls through a thread pool. Now, our primary thread has offloaded much of this responsibility to daemons which reside in separate threads; the primary role of the main thread is now flow control and the prevention of race conditions via the use of queue .put() and .join() commands. For more information about the queue module, my first article on parallelization and multithreading provides a great introduction.
We’ve also taken an opportunity here to enable Write-Ahead Logging for our database which allows read concurrency even while a write operation is underway, which we will need as we build separate reporting scripts in future articles.
So, let’s walk through our runtime and see what the daemons are working on after they’ve been spawned. After populating FetchOrderDataQueue with a .put() command, the FetchOrderDataInvoke daemon is set into motion:
## /orders Endpoint Functions/Services
# daemon
def FetchOrderDataInvoke():
    while True:
        # send order data age and region list to the FetchOrderDataRegions function
        regionlist = FetchOrderDataQueue.get()
        DaemonOrderAge = OrderAge
        RegionListandAge = [DaemonOrderAge, regionlist]
        DaemonOrderAge = FetchOrderDataRegions(RegionListandAge)
        FetchOrderDataQueue.task_done()
        OrderDataWriteQueue.join()
        while True:
            # continue this loop indefinitely, pinging FetchOrderDataRegions every minute until the script ends
            time.sleep(60)
            FetchOrderDataQueue.put([DaemonOrderAge, regionlist])
            DaemonOrderAge = FetchOrderDataRegions(FetchOrderDataQueue.get())
            FetchOrderDataQueue.task_done()
            OrderDataWriteQueue.join()
Once FetchOrderDataQueue has been populated by our main thread, our daemon can proceed past the .get() command and execute FetchOrderDataRegions one time. FetchOrderDataQueue.task_done() signals to the queue that the fetched data has been processed, while OrderDataWriteQueue.join() waits for a signal from our DataWriter function before the daemon can proceed. Since /history is dependent on /orders data being present in order to acquire the requisite type_id list, we need one fetch iteration for /orders to complete before we can move on to /history. Our main thread accomplishes this with .join() commands, which prevent what would otherwise be a race condition.
After one iteration of FetchOrderDataRegions, the daemon will run perpetually within the nested while loop, which will continue to execute FetchOrderDataRegions any time a minute has passed since the most recent execution concluded. The continued population and depopulation of the queue here allows us to again use .join() commands in our main thread in order to prevent premature termination of our script if the /orders daemon is still in the process of fetching data after our /history fetching routine has concluded.
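If the put/get/task_done/join handshake feels abstract, this tiny standalone example (unrelated to ESI) shows the mechanism; join() only unblocks once task_done() has been called for every item that was put on the queue:

from queue import Queue
import threading
import time

q = Queue()

def worker():
    while True:
        item = q.get()        # blocks until something is put on the queue
        time.sleep(1)         # simulate some work on the item
        q.task_done()         # signal that this item has been fully processed

threading.Thread(target=worker, daemon=True).start()
q.put('some work')
q.join()                      # blocks here until task_done() has been called for every put()
print('worker finished; safe to continue')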
FetchOrderDataRegions is also a new function, derived from our old runtime, with detailed comments included inline:
...
def FetchOrderDataRegions(RegionListandAge):
    DaemonOrderAge = RegionListandAge[0]
    regionlist = RegionListandAge[1]
    # 'OrderUnixTime' provides a normalized current time, which will help us determine if our /orders data is old enough to warrant fetching new data.
    OrderUnixTime = int(round(time.time() / 300) - 0.5) * 300
    # if our existing /orders data is older than our OrderDataInterval, initiate a fetch routine
    if OrderUnixTime >= DaemonOrderAge + OrderDataInterval:
        for region in regionlist:
            regionstatus = 'incomplete'
            while regionstatus == 'incomplete':
                regionstatus = FetchOrderData(region)
        print('fetching order data for all regions completed')
        # returning this value changes 'DaemonOrderAge' in the parent daemon to match the time collected at the beginning of this function. This effectively resets our fetch interval timer.
        return(OrderUnixTime)
    else:
        print('order data is up-to-date; skipping fetch operation')
        # if no data was fetched, 'DaemonOrderAge' will remain unchanged in the parent daemon.
        return(DaemonOrderAge)
By increasing or decreasing the OrderDataInterval defined at the start of our script, we can decrease or increase the frequency at which the FetchOrderDataRegions function invokes FetchOrderData for all regions. The result is that we now have a simple mechanism for setting our granularity for /orders data.
Our other functions, OrderDataCall and FetchOrderData, haven’t really changed much. For the former, we have doubled our exception delay from 4 to 8 seconds to account for /history and /orders concurrency, in order to ensure the ESI error limit is never reached in any circumstance. In the case of FetchOrderData, we have removed all of our database write operations, which have been replaced with a put command: OrderDataWriteQueue.put(RegionAndData)
These database write operations for /orders have all been moved to a new WriteOrderData function, which is called by our new DataWriter daemon:
...
# this daemon serializes all database write operations
def DataWriter():
    ## write data as it appears in any of our queues
    while True:
        if OrderDataWriteQueue.qsize() > 0:
            WriteOrderData(OrderDataWriteQueue.get())
            OrderDataWriteQueue.task_done()
        if HistoryDataWriteQueue.qsize() > 0:
            WriteHistoryData(HistoryDataWriteQueue.get())
            HistoryDataWriteQueue.task_done()
        if InvTypesDataWriteQueue.qsize() > 0:
            WriteInvTypesData(InvTypesDataWriteQueue.get())
            InvTypesDataWriteQueue.task_done()
        time.sleep(1)

def WriteOrderData(RegionAndData):
    ## connect to sqlite DB and initialize cursor
    marketdatadb = sqlite3.connect("marketdata.sqlite")
    cur = marketdatadb.cursor()
    RegionAndData[1].to_sql(RegionAndData[0], marketdatadb, if_exists='append', index=False)
    # purge any data more than 10 days old from the database for the reported region
    cur.executescript(f'''DELETE FROM {RegionAndData[0]} WHERE timestamp < {unixtime - 864000}''')
    # build index if it doesn't exist
    cur.executescript(f'''CREATE INDEX IF NOT EXISTS {RegionAndData[0]}Index ON {RegionAndData[0]}(timestamp, order_id)''')
    # log completion for the reported region
    print(f'done writing order data for region {RegionAndData[0]} to database')

def WriteHistoryData(returned_dataframes):
    ...

def WriteInvTypesData(df):
    ...
DataWriter is a daemon residing in a dedicated thread which checks every second to see if there is anything in our write queues. If anything is found, the data’s corresponding write function is executed before signaling .task_done() back to the respective queue. This function overcomes SQLite’s write concurrency limitation by ensuring that write operations to our database are sequential and will never overlap. Since writing to the database is now solely within the domain of this daemon, and since SQLite does support read concurrency, we can limit our primary thread to read-only access to the database so that it can conduct the read operations required at the start of our runtime.
WriteOrderData likewise simply opens the database and writes data in the same manner that FetchOrderData did before we refactored our code, with WriteHistoryData and WriteInvTypesData doing the same for /history and InvTypes data, respectively.
Previously, we didn't need to worry much about thread safety since all writes to our database were implicitly serialized. Now that multiple threads generate data destined for a single database, this explicit write serialization provides adequate thread safety for our use-case, while also giving our functions more granularity, which should make future additions and changes easier to implement. While serialized write operations might present a bottleneck in many contexts, that isn't a concern here: we will never be able to gather data from ESI faster than it can be written to a single SQLite database.
Continuing through our runtime, once /orders has completed one iteration, the main thread initiates /history data fetching. Rather than fetching /history data itself, the main thread passes this work to a dedicated daemon thread, defined by a very simple function:
# daemon
def FetchHistoryDataInvoke():
    while True:
        idlist = FetchHistoryDataQueue.get()
        with ThreadPool(5) as p:
            returned_dataframes = p.map(FetchHistoryData, idlist, chunksize = 1)
        HistoryDataWriteQueue.put(returned_dataframes)
        FetchHistoryDataQueue.task_done()
Aside from this, adjusting our exception delay to respect the error rate limit, and moving database write operations into WriteHistoryData under DataWriter, there are no other changes to note.
Since fetching data for InvTypes is performed in a single call, we don’t need a fetch queue. Besides splitting this function into separate fetch and write functions, nothing else has changed here either.
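As a rough sketch of what this split looks like, the pair of functions below fetches the InvTypes export in one call and hands it to DataWriter for writing. The download URL, table name, and if_exists behaviour are placeholders and assumptions, not a copy of the published script.

import sqlite3
import pandas as pd

# placeholder URL; the published script sources InvTypes elsewhere
INVTYPES_CSV_URL = 'https://example.com/invTypes.csv'

def FetchInvTypesData():
    # single call: download the full InvTypes table as a dataframe and queue
    # it for the DataWriter daemon
    df = pd.read_csv(INVTYPES_CSV_URL)
    InvTypesDataWriteQueue.put(df)

def WriteInvTypesData(df):
    # replace the existing InvTypes table wholesale each time it is refreshed
    marketdatadb = sqlite3.connect("marketdata.sqlite")
    df.to_sql('InvTypes', marketdatadb, if_exists='replace', index=False)
    marketdatadb.close()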
Finally, our script concludes with a series of five .join() commands, providing a final bit of flow control that prevents any of our fetch operations from being terminated prematurely and, by giving every write operation time to complete, also protects against database corruption.
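As an illustration (not a line-for-line copy of the published script), these joins look something like the following:

# block until every queued fetch and write operation has been processed;
# each .join() returns only once task_done() has been called for every item
# put on that queue. Only the four queues named in this article are shown;
# the published script joins five in total.
FetchHistoryDataQueue.join()
OrderDataWriteQueue.join()
HistoryDataWriteQueue.join()
InvTypesDataWriteQueue.join()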
The end result is our completed script, which has been published here with one slight modification: logging is sent to a text file rather than directly to the terminal. The data provided by this script will serve us well for our upcoming articles on data analytics; if any subsequent changes are made, I'll be sure to post an addendum here with notes.
Conclusion
Developing tools can be a long and iterative process, and it can be difficult to ascertain when something is “good enough” to put into use. This becomes completely impossible if concrete goals are not set.
While our original ESI data collection design failed to achieve our goals, examining how it failed allowed us to reach those goals through mindful refactoring. As a result of this effort, we now have a comprehensive and robust ESI data collection script which we will leverage for EvE Online data analytics.
In our upcoming articles, we will investigate different use-cases for this data and get started developing automated report generation workflows.
Thank you for reading!
*Virtual Markets, Part 8: Station Trading is now live!
August 26 2024 Addendum
Since originally publishing this article, I’ve made a handful of revisions to the script, including bugfixes and feature updates. Let’s run through them all:
Feature Updates
First, the script now fetches historical endpoint data for all indicated regions rather than just The Forge, as this data has proven invaluable in my subsequent reporting strategies. While this results in many hours of runtime whenever the data is refreshed, it is a non-issue thanks to the steps taken in the second refactor.
Next, in addition to fetching InvTypes, the script now also fetches the 'IndustryMaterials' static data export from FuzzySteve, which includes recipe tables for in-game products that will be used when we discuss price floors in an upcoming article.
Exception Handling for DataWriter Functions
In one instance while working on some reporting tools, I discovered that my local database had not been modified in a few days and was becoming stale. Upon closer inspection, the database generation script had stalled.
After reviewing logs and performing some root cause analysis, I traced the problem to the WriteOrderData function, where a SQLite write operation had failed to execute in a timely manner and threw an I/O exception. This left the main thread sitting idle indefinitely, preventing subsequent script iterations from running and leaving us with a stale database.
Now, what would prevent an otherwise functional write command from succeeding? There is adequate storage space available for our container and the underlying storage device is in good health. The answer lies with a totally unrelated virtual machine residing on the same hardware as the LXC container running our script; for some inexplicable reason, when this virtual machine is powered on it occasionally causes the entire host machine to freeze and become completely unresponsive for a few seconds, and one of these freezes happened to coincide with a database write operation.
While I could investigate and attempt to resolve the misbehaving virtual machine, this scenario highlights an extremely important general aspect of pooled/shared computing: we seldom have complete control over hardware. In the broader context of cloud computing, where resources are shared between many disparate workloads and organizations, we can never assume that the underlying hardware will be 100% performant. While cloud providers typically offer excellent uptime and service reliability, it is inevitable that at some point a service will fail.
So, what can we do within our script to address the inevitable fallibility of hardware? While addressing massive outages requires investing in multi-regional, high-availability infrastructure, we can still handle brief, momentary outages like those we have been experiencing by incorporating more exception handling. In much the same way that we use exception handling to retry API calls to ESI, we can apply the same techniques when attempting to write data to our database.
The following is a modified version of our WriteOrderData function which now includes some rudimentary exception handling:
def WriteOrderData(RegionAndData):
    WriteOperation = 'incomplete'
    while WriteOperation == 'incomplete':
        try:
            ## connect to sqlite DB and initialize cursor
            marketdatadb = sqlite3.connect("marketdata.sqlite")
            cur = marketdatadb.cursor()
            # purge any data more than 10 days old from the database for the reported region and create an index if the corresponding table exists
            cur.execute(f'''SELECT count(*) FROM sqlite_master WHERE type='table' AND name='{RegionAndData[0]}';''')
            TableExists = int(cur.fetchone()[0])
            if TableExists > 0:
                cur.executescript(f'''DELETE FROM {RegionAndData[0]} WHERE timestamp < {unixtime - 864000}''')
                cur.executescript(f'''CREATE INDEX IF NOT EXISTS {RegionAndData[0]}Index ON {RegionAndData[0]}(timestamp, order_id)''')
            # append new order data to corresponding table on database
            RegionAndData[1].to_sql(RegionAndData[0], marketdatadb, if_exists='append', index=False)
            # commit changes and close database
            marketdatadb.commit()
            marketdatadb.close()
        except:
            # if any exception occurs while accessing the database, sleep for 5 seconds and try again.
            app_log.info(f'exception occurred while writing orderbook data for {RegionAndData[0]}; retrying in 5 seconds')
            time.sleep(5)
        else:
            # if the write operation succeeded, mark it complete to exit the retry loop.
            app_log.info(f'done collecting and writing order data for region {RegionAndData[0]}')
            WriteOperation = 'complete'
All of our original database operations are now contained within a try block. If any exception occurs inside this block, the function waits 5 seconds before retrying. The encompassing while loop is only exited once every database operation in the try block has completed successfully.
Furthermore, the database operations have been rearranged slightly to prevent an undesirable edge case where new data could be appended to the database more than once if an exception occurred during index building or while purging old records; new data is now written to the database as the final step.
Similar try blocks have also been added to our other database write functions, and testing so far has proven successful; the script functions exactly as before, but with some added resiliency against momentary hardware problems, whether they occur on our own infrastructure or on a cloud resource such as an S3 bucket.
Revising Cache Refresh Interval Timer
While reviewing script logs, I noticed that there have been periods where the FetchOrderData function repeatedly failed to obtain a congruent set of order data, specifically for The Forge, often resulting in dozens of retry attempts. While the operation does eventually succeed, perpetually requesting and failing to obtain a congruent set of data is neither performant nor respectful toward ESI.
These incidents most frequently occur while concurrently fetching historical endpoint data. Fetching all ~360 pages of data for The Forge typically takes about 1-2 minutes to complete, but during these occasional fits the capture duration can approach 4 minutes.
There are a few different ways we could approach this issue. The simplest would be to increase our concurrent connection count beyond its current value of 5; however, this would produce more "spiky" traffic that is less respectful toward the API, so we will reserve it as a last resort.
Another option would be to deploy a mechanism that halts historical data fetch operations whenever new order book data is being downloaded. This could be accomplished by serializing our API calls in much the same way we serialize database write operations, with priority given to calls against the order book endpoint. While this is a much better idea than increasing our concurrent connection count, it would require some substantial changes to our code, so we will couch this idea for now; a rough sketch of what it might look like follows below.
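Purely for illustration, here is one way the couched idea could be sketched out: a single daemon drains a priority queue of API requests, with order book calls given a lower (more urgent) priority number than historical calls. None of this exists in the published script, and all names below are hypothetical.

import queue, threading, itertools

# conceptual sketch only; not implemented in the published script.
# every API call would be funneled through a single daemon, with order book
# requests given a lower priority number (more urgent) than historical ones.
APICallQueue = queue.PriorityQueue()
_sequence = itertools.count()  # tiebreaker so equal-priority items never compare functions

def APICallDaemon():
    while True:
        priority, _, request_fn, args, result_q = APICallQueue.get()
        # perform the request on behalf of the caller and hand the response
        # back on the caller's private reply queue
        result_q.put(request_fn(*args))
        APICallQueue.task_done()

def SerializedAPICall(request_fn, args, priority):
    reply = queue.Queue()
    APICallQueue.put((priority, next(_sequence), request_fn, args, reply))
    return reply.get()

threading.Thread(target=APICallDaemon, daemon=True).start()

Callers would submit their request function, its arguments, and a priority through SerializedAPICall and block until the daemon hands back the response on their private reply queue.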
A third approach involves modifying the logic within the FetchOrderData function that forces it to wait and restart if the refresh interval is close to expiring. The relevant snippet currently looks like this:
...
# if the cache will refresh in less time than we might need to download the full set of data at a rate of 1 page every half second, we will wait for the cache to expire and try again.
if SecondsUntilExpiry < (0.5 * firstpage[1]):
    app_log.info(f'waiting for cache to refresh in {SecondsUntilExpiry} seconds before retrying {region[0]}')
    time.sleep(SecondsUntilExpiry + 1)
    return('incomplete')
...
The existing logic assumes that one page can be fetched every half second. While this is typically the case, network latency is seldom constant, so we should give our order data fetch operations a bit more headroom:
...
# If the cache will refresh in less time than we might need to download the full set of data at a rate of 1 page every second, we will wait for the cache to expire and try again. This waiting threshold is capped at 295 seconds, which is just shy of the 5-minute refresh interval.
if SecondsUntilExpiry < min(firstpage[1], 295):
    app_log.info(f'waiting for cache to refresh in {SecondsUntilExpiry} seconds before retrying {region[0]}')
    time.sleep(SecondsUntilExpiry + 1)
    return('incomplete')
...
Following this change, the function will now restart if there are fewer seconds remaining than there are pages to fetch. On its own, this would be problematic for The Forge, since that region almost always has far more than 300 pages. To address this, the min function caps the wait-and-restart threshold at 295 seconds, just 5 seconds shy of the 300-second refresh interval.
The result is that data fetching operations for The Forge will always begin at the start of a refresh interval, when the maximum amount of time is available to gather congruent data. While doubling the per-page time allowance might introduce a small delay when fetching data under normal network conditions, this is marginal compared to the delays encountered when fetch operations for The Forge repeatedly fail.
With this change, we should see far fewer instances of incongruent data for The Forge being gathered, with a corresponding reduction in incessant retries. If issues do persist, we can return to and implement our couched idea of API call serialization.
… And that concludes our addendum for August 2024! The published copy of the script now includes these changes. If any future changes are made, they’ll be recorded here as part of a new update. Thank you for reading!