Ruby Asynchronous HTTP and EM::Iterator

I recently needed to issue a number of long-running HTTP GETs that took about 20 minutes to complete when performed serially, one after the other. In spite of the concurrency limits imposed by the Global Interpreter Lock, Ruby is well-equipped to handle long-running I/O.

Using the methods documented below, I was able to squash that 20 minutes down to less than 5 minutes. I probably could have done better if the server had been able to handle the full load of all requests concurrently.
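
To see why waiting on I/O can overlap even under the lock, here is a minimal sketch that uses plain threads and sleep as a stand-in for slow requests. It is not the EventMachine approach used in the rest of this post, and DELAY and the thread count are made-up values for illustration.

```ruby
require 'benchmark'

DELAY = 0.2 # seconds; hypothetical stand-in for one slow HTTP GET

# Each thread only waits, the way a request waits on the network.
# Waiting does not hold the interpreter lock, so the waits overlap.
elapsed = Benchmark.realtime do
  threads = 3.times.map { Thread.new { sleep DELAY } }
  threads.each(&:join)
end

puts format('3 simulated requests took %.2fs (serial would be about %.2fs)', elapsed, 3 * DELAY)
```

The total should land close to one DELAY rather than three, which is the same effect I was after with the real downloads.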

Part of the reason I am writing this is the amount of tire-kicking I needed to do to get this working, even with the documentation that comes with em-http-request. (Thank you for posting your lovely gem... Your documentation is close but not quite there for me.)

Simple Example - Single Request

Because I prefer a number of examples with increasing complexity when I am tackling a new library, that's what you'll get here. This first example issues only a single HTTP request, but it confirms that we understand enough to get a single GET working.

require 'eventmachine'
require 'em-http-request'
require 'logger'
require 'pry'

##
# Single request
@start_time = Time.now
@logger = Logger.new($stdout)
http = nil
status_code = nil

def runtime
  Time.now - @start_time
end

EventMachine.run {
  url = 'http://ipv4.download.thinkbroadband.com/5MB.zip'
  # Create the HTTP request and issue a GET, which returns the client object we attach callbacks to
  http = EventMachine::HttpRequest.new(url).get
  # __method__ is nil at the top level, so the first bracket in the log line is empty
  @logger.debug("[#{__method__}] [url=#{url}] [SUBMITTED] [runtime=#{runtime}]")
  # set up callbacks and errbacks to deal with normal and errored completion
  http.callback do
    status_code = http.response_header.status
    @logger.debug("[#{__method__}] [url=#{url}] [CALLBACK #{status_code}] [runtime=#{runtime}]")
    p http.response_header
    EM.stop
  end
  http.errback do
    status_code = http.response_header.status
    @logger.debug("[#{__method__}] [url=#{url}] [ERRBACK #{status_code}] [runtime=#{runtime}]")
    p http.response_header
    EM.stop
  end
}

# launch Pry in case we want to do any REPL-y things
binding.pry

It produces output that looks like this:

$ ruby bin/em-http/em-http-000.rb 
D, [2016-07-24T17:12:39.386698 #42654] DEBUG -- : [] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=0.197928]
D, [2016-07-24T17:12:42.238997 #42654] DEBUG -- : [] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK 200] [runtime=3.050257]
{"SERVER"=>"nginx", "DATE"=>"Sun, 24 Jul 2016 21:12:39 GMT", "CONTENT_TYPE"=>"application/zip", "CONTENT_LENGTH"=>"5242880", "LAST_MODIFIED"=>"Mon, 02 Jun 2008 15:30:42 GMT", "CONNECTION"=>"close", "ETAG"=>"\"48441222-500000\"", "ACCESS_CONTROL_ALLOW_ORIGIN"=>"*", "ACCEPT_RANGES"=>"bytes"}

So far, so good. We download from a URL and it is successful with a status code of 200.

Next Example: Crude Async Concurrency

The next example checks that our understanding is solid enough to deal with two concurrent requests. One thing we have to handle is ensuring that EM.stop is only called after all of the jobs have completed. So in this example, we add a request queue collection and a method that calls EM.stop once all jobs are done.

require 'eventmachine'
require 'em-http-request'
require 'logger'
require 'pry'

##
# concurrent requests without concurrency limits
FIVE = 'http://ipv4.download.thinkbroadband.com/5MB.zip'
TEN = 'http://ipv4.download.thinkbroadband.com/10MB.zip'
@request_queue = []
@start_time = Time.now
@logger = Logger.new($stdout)

def runtime
  Time.now - @start_time
end

def new_http_request(url)
  # every job gets added to the +@request_queue+
  request_hash = {}
  @request_queue << request_hash
  http = EventMachine::HttpRequest.new(url).get
  @logger.debug("[#{__method__}] [url=#{url}] [SUBMITTED] [runtime=#{runtime}]")
  request_hash[:http] = http
  # create an anonymous function and bind it to both callback and errback
  finish = lambda do |http|
    request_hash[:response] = http.response
    status_code = http.response_header.status
    @logger.debug("[#{__method__}] [url=#{url}] [CALLBACK/ERRBACK #{status_code}] [runtime=#{runtime}]")
    # instead of issuing an EM.stop after each job,
    # ...we need to wait for all jobs in +@request_queue+ to finish before we stop
    stop_when_all_finished
  end
  http.callback(&finish)
  http.errback(&finish)
end

##
# Check states for all requests in +@request_queue+ and issue EM.stop if true
def stop_when_all_finished
  @logger.debug("[#{__method__}] [states=#{@request_queue.map {|r| r[:http].state}}]")
  EM.stop if @request_queue.all? {|r| r[:http].state.eql?(:finished)}
end

EventMachine.run {
  new_http_request(FIVE)
  new_http_request(TEN)
}

binding.pry

It's getting a bit more complex, but it is still quite grokkable. We can see that both requests are submitted at the same time, but the larger file takes longer to download.

$ ruby bin/em-http/em-http-010-async.rb 
D, [2016-07-24T16:47:21.628277 #42542] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=0.18803]
D, [2016-07-24T16:47:21.631735 #42542] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [SUBMITTED] [runtime=0.191508]
D, [2016-07-24T16:47:26.295605 #42542] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK/ERRBACK 200] [runtime=4.855387]
D, [2016-07-24T16:47:26.295678 #42542] DEBUG -- : [stop_when_all_finished] [states=[:finished, :body]]
D, [2016-07-24T16:47:30.669914 #42542] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [CALLBACK/ERRBACK 200] [runtime=9.229693]
D, [2016-07-24T16:47:30.669999 #42542] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished]]
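
The state check in stop_when_all_finished only knows about requests that are already in @request_queue. When the total number of jobs is known up front, a simple countdown is another way to decide when to stop. Here is a minimal sketch in plain Ruby; CompletionTracker is a hypothetical helper of my own, not part of EventMachine or em-http-request, and the block is where EM.stop would go.

```ruby
# Hypothetical helper for illustration only.
class CompletionTracker
  def initialize(total, &on_all_done)
    @remaining = total
    @on_all_done = on_all_done
  end

  # Call once per finished job, whether it succeeded or failed.
  def job_done
    @remaining -= 1
    @on_all_done.call if @remaining.zero?
  end
end

stopped = false
tracker = CompletionTracker.new(2) { stopped = true } # EM.stop would go here

tracker.job_done
stopped_after_first = stopped
tracker.job_done

puts "after first: #{stopped_after_first}, after second: #{stopped}"
# prints "after first: false, after second: true"
```

In the example above, the shared callback/errback lambda would call tracker.job_done instead of inspecting every request's state.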

Final Example: Async Concurrency with Concurrency Limits

If we have about 20 requests, we probably don't want them all going to a server and blowing it up. EventMachine provides an elegant mechanism, EM::Iterator, to apply a concurrency constraint so that we can limit the number of active requests. In this final example, I issue six requests and I apply a concurrency limit of 2.

require 'eventmachine'
require 'em-http-request'
require 'logger'
require 'pry'

##
# concurrent requests with EM::Iterator concurrency limit of 2
FIVE = 'http://ipv4.download.thinkbroadband.com/5MB.zip'
TEN = 'http://ipv4.download.thinkbroadband.com/10MB.zip'
@request_queue = []
@start_time = Time.now
@logger = Logger.new($stdout)

def runtime
  Time.now - @start_time
end

def new_http_request(request_hash, &block)
  @request_queue << request_hash
  url = request_hash[:url]
  # Submit HTTP Request
  http = EventMachine::HttpRequest.new(url).get
  @logger.debug("[#{__method__}] [url=#{url}] [SUBMITTED] [runtime=#{runtime}]")
  request_hash[:http] = http
  # Bind Callback/Errback to this anonymous function
  finish = lambda do |this_http|
    request_hash[:response] = this_http.response
    request_hash[:response_code] = this_http.response_header.status
    @logger.debug("[#{__method__}] [url=#{url}] [CALLBACK/ERRBACK #{request_hash[:response_code]}] [runtime=#{runtime}]")
    # yield to the block, which will start the next job
    yield if block_given?
    # check to see if we can EM.stop
    stop_when_all_finished
  end
  http.callback(&finish)
  http.errback(&finish)
end

##
# Check states for all requests in +@request_queue+ and issue EM.stop if true
def stop_when_all_finished
  @logger.debug("[#{__method__}] [states=#{@request_queue.map {|r| r[:http].state}}]")
  EM.stop if @request_queue.all? {|r| r[:http].state.eql?(:finished)}
end

## --- MAIN
## These are the requests we will submit - six in total
my_requests = [
  {url: FIVE},
  {url: TEN},
  {url: FIVE},
  {url: TEN},
  {url: FIVE},
  {url: TEN},
]
concurrency_limit = 2

EventMachine.run {
  # use EM::Iterator to submit EM::HttpRequests
  EM::Iterator.new(my_requests, concurrency_limit).each do |request_hash,iter|
    new_http_request(request_hash) { iter.next }
  end
}

binding.pry

The code itself doesn't look very different. But you can watch the log and see that two jobs are submitted initially, and then subsequent jobs are submitted only as a running job completes.

$ ruby bin/em-http/em-http-020-async-with-em-iterator.rb 
D, [2016-07-24T16:55:37.620212 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=0.174998]
D, [2016-07-24T16:55:37.624177 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [SUBMITTED] [runtime=0.178991]
D, [2016-07-24T16:55:42.003892 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK/ERRBACK 200] [runtime=4.5587]
D, [2016-07-24T16:55:42.003984 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :body]]
D, [2016-07-24T16:55:42.006724 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=4.561536]
D, [2016-07-24T16:55:45.318885 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [CALLBACK/ERRBACK 200] [runtime=7.873685]
D, [2016-07-24T16:55:45.319011 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :body]]
D, [2016-07-24T16:55:45.321175 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [SUBMITTED] [runtime=7.875988]
D, [2016-07-24T16:55:46.646499 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK/ERRBACK 200] [runtime=9.201303]
D, [2016-07-24T16:55:46.646617 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :finished, :body]]
D, [2016-07-24T16:55:46.651357 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=9.206166]
D, [2016-07-24T16:55:50.993760 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK/ERRBACK 200] [runtime=13.548572]
D, [2016-07-24T16:55:50.993836 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :finished, :body, :finished]]
D, [2016-07-24T16:55:50.995672 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [SUBMITTED] [runtime=13.550481]
D, [2016-07-24T16:55:52.958938 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [CALLBACK/ERRBACK 200] [runtime=15.51375]
D, [2016-07-24T16:55:52.959023 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :finished, :finished, :finished, :body]]
D, [2016-07-24T16:55:59.708914 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [CALLBACK/ERRBACK 200] [runtime=22.263717]
D, [2016-07-24T16:55:59.709023 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :finished, :finished, :finished, :finished]]
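
To make the slot handoff concrete, here is a minimal pure-Ruby sketch of the each/next contract. TinyIterator is a made-up stand-in for illustration, not EM::Iterator's actual implementation: it starts up to `limit` items and starts another only when a running job calls its done handle. There is no event loop here, so completions are simulated by hand.

```ruby
# Hypothetical stand-in that only mimics the "call next when done" contract.
class TinyIterator
  attr_reader :active, :peak

  def initialize(items, limit)
    @items = items.dup
    @limit = limit
    @active = 0 # jobs currently running
    @peak = 0   # highest number of jobs ever running at once
  end

  # Start up to +limit+ items; the block receives the item and a "done" handle.
  def each(&work)
    @work = work
    @limit.times { start_next }
  end

  private

  def start_next
    return if @items.empty?

    item = @items.shift
    @active += 1
    @peak = [@peak, @active].max
    done = lambda do
      @active -= 1 # free the slot...
      start_next   # ...and hand it to the next waiting item
    end
    @work.call(item, done)
  end
end

started = []
pending = [] # "done" handles of jobs that are still running

iterator = TinyIterator.new(%w[a b c d e f], 2)
iterator.each do |item, done|
  started << item
  pending << done
end

# Finish jobs one at a time, the way callbacks would fire later.
pending.shift.call until pending.empty?

puts "started=#{started.inspect} peak=#{iterator.peak}"
```

All six items get started, but never more than two are active at once, which matches the pattern in the log above.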

Conclusion

Thus ends my tutorial on using em-http-request and EM::Iterator to handle a number of long-running downloads with concurrency. A lot of the lines above are devoted to logging and comments. The code itself is actually quite concise, I think.

If this was helpful to you, share it along.