
I'm writing a Python 2 program that pulls a large amount of JSON data from a remote CouchDB database. The data is indexed by timestamp: I pass a startkey and an endkey, and the database returns all rows in between. I was able to slice the request into multiple partial requests (creating multiple param dicts, stored in threadedParamsList) and send them concurrently using grequests, which gave a significant performance boost. However, I then need to unpack the responses and merge all the rows back together, which seems to add quite a bit of overhead:

import grequests

#timing code omitted
returnList = (grequests.get(url=url, auth=auth, params=params) for params in threadedParamsList)
data = grequests.map(returnList)
#end of request phase
data = map(lambda response: response.json()['rows'], data)
data = reduce(lambda a, b: a + b, data)
#end of map phase
data = sorted(data, key=lambda entry: entry["key"])
#end of sort phase
#:threads: 20    rows: 3857522   time: 37.351s   [16.183 request + 17.717 map + 3.451 sort]
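
For context, threadedParamsList is just the overall [startkey, endkey] range cut into equal windows, one param dict per slice. A simplified sketch of the idea (makeParamsList is an illustrative stand-in, not my actual code):

def makeParamsList(startkey, endkey, count):
    # illustrative: split [startkey, endkey] into `count` contiguous
    # windows, producing one CouchDB param dict per slice
    step = (endkey - startkey) / count
    paramsList = []
    for i in range(count):
        lo = startkey + i * step
        # the last window absorbs any rounding remainder
        hi = endkey if i == count - 1 else lo + step
        paramsList.append({'startkey': lo, 'endkey': hi})
    return paramsList

threadedParamsList = makeParamsList(startkey, endkey, 20)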

Ideally, I want the map() and reduce() steps (i.e. just appending to a central shared list) to also run concurrently; the order of the data doesn't matter much until the end.

I tried setting up a response hook on each grequest to do the unpacking and then append the data to a global list:

from time import time, strftime

finalData = []

def unpackAndStore(response, *args, **kwargs):
    global finalData
    tic = time()
    print("Beginning unpackAndStore... [{0}]".format(strftime("%X")))
    data = response.json()['rows']
    print("Got {0} rows of data [{1}]".format(len(data), strftime("%X")))
    finalData += data
    print("Added {0} rows in {1}s [{2}]".format(len(data), time()-tic, strftime("%X")))

#elsewhere...

print("Starting request... [{0}]".format(strftime("%X")))
returnList = (grequests.get(url=url, auth=auth, params=params, hooks={'response': unpackAndStore}) for params in threadedParamsList)
grequests.map(returnList)

finalData does get populated as expected; however, each hook seems to block the others and execute sequentially, as far as I can tell from my output:

Starting threaded request... [11:12:49]
Beginning unpackAndStore... [11:12:51]
Beginning unpackAndStore... [11:12:51]
Beginning unpackAndStore... [11:12:51]
Got 73325 rows of data [11:12:53]
Added 73325 rows in 2.28699994087s [11:12:53]
Got 73353 rows of data [11:12:54]
Added 73353 rows in 3.19600009918s [11:12:54]
Got 74048 rows of data [11:12:55]
Added 74048 rows in 4.04800009727s [11:12:55] (cont...)

I also noticed that the response hook seems to fire almost immediately after the request is sent (i.e. not after the full response body is received, as I expected). I couldn't find anything in the requests documentation suggesting that responses are fetched lazily.

From this I figured I needed to run each request-plus-hook in its own worker, so I tried concurrent.futures (the Python 2 backport) with the plain requests library instead, but it behaves the same way:

import requests
import concurrent.futures

finalData = []

# unpackAndStore() is unchanged from the grequests attempt above

def getData(params, *args, **kwargs):
    return requests.get(url=url, auth=auth, params=params, hooks={'response': unpackAndStore})

#elsewhere...

print("Starting request... [{0}]".format(strftime("%X")))
with concurrent.futures.ThreadPoolExecutor(max_workers=count) as executor:
    executor.map(getData, threadedParamsList)

The output:

Starting threaded request... [11:20:18]
Beginning unpackAndStore... [11:20:21]
Beginning unpackAndStore... [11:20:21]
Beginning unpackAndStore... [11:20:21]
Got 639227 rows of data [11:20:43]
Added 639227 rows in 22.4889998436s [11:20:43]
Got 691855 rows of data [11:20:54]
Added 691855 rows in 33.6400001049s [11:20:55]
Got 801441 rows of data [11:21:11]
Added 801441 rows in 50.9379999638s [11:21:12]

I've also tried:

  • Setting up both getData() and unpackAndStore() to be created and returned by factory functions, then passing the executor into unpackAndStore() so it could spawn its own thread
  • Using concurrent.futures.as_completed() to unpack the data as responses arrive (a rough sketch of this attempt follows the list)
  • Using the same approach to create a second thread (and doubling max_workers) that runs unpackAndStore()
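
For reference, the as_completed() attempt looked roughly like this (reconstructed, so details may differ; getResponse is a stand-in name):

import concurrent.futures
import requests

finalData = []

def getResponse(params):
    # plain request, no hook; unpacking happens below as futures complete
    return requests.get(url=url, auth=auth, params=params)

with concurrent.futures.ThreadPoolExecutor(max_workers=count) as executor:
    futures = [executor.submit(getResponse, params) for params in threadedParamsList]
    for future in concurrent.futures.as_completed(futures):
        # parse and append each response body as soon as it arrives
        finalData += future.result().json()['rows']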

All three of these had the same result: the work in unpackAndStore() appears to block the rest of the threads from executing.

Is there any way to parse the data and piece it back into a single list concurrently, or is this an inherently blocking operation in Python (2)? As far as I can tell the requests themselves do go out concurrently; I don't believe there's anything CouchDB-specific that would slow concurrent requests down.
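
To make the goal concrete, this is roughly the shape of concurrent unpacking I have in mind: pushing the JSON parsing into separate processes to get around the GIL (an untested sketch; parseRows is a made-up helper, and I don't know whether shipping the response bodies between processes would eat up the gains):

import json
from multiprocessing import Pool

def parseRows(text):
    # hypothetical worker: parse one raw response body in a child process
    return json.loads(text)['rows']

# responses fetched concurrently as before, via grequests.map(...)
texts = [response.text for response in data]

pool = Pool(processes=4)
try:
    # parse the bodies in parallel, then flatten and sort in the parent
    rowLists = pool.map(parseRows, texts)
finally:
    pool.close()
    pool.join()

finalData = [row for rows in rowLists for row in rows]
finalData.sort(key=lambda entry: entry["key"])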
