I recently needed to issue a number of long-running HTTP GET requests that took about 20 minutes to complete when performed serially, one after the other. In spite of the concurrency limits imposed by the Global Interpreter Lock, Ruby is well equipped to handle long-running I/O.
Using the methods documented below, I was able to squash that 20 minutes down to less than 5. I probably could have done better if the server had been able to handle the full load of all requests running concurrently.
Part of the reason I am writing this is the extra tire-kicking I needed to do to get this working, even with the documentation that comes with em-http-request. (Thank you for posting your lovely gem... Your documentation is close but not quite there for me.)
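To make the point about the Global Interpreter Lock concrete, here is a minimal sketch that uses plain threads rather than EventMachine, with sleep standing in for a slow request. The method names (fake_download, serial_seconds, threaded_seconds) and the delays are made up for illustration. It only shows that time spent waiting can overlap; it is not how the examples below work.

```ruby
require 'benchmark'

# Stand-in for a slow HTTP GET. sleep gives up the interpreter lock
# while waiting, much as a blocking socket read does.
def fake_download(seconds)
  sleep(seconds)
  seconds
end

# Run every download one after the other and return the elapsed seconds.
def serial_seconds(delays)
  Benchmark.realtime { delays.each { |d| fake_download(d) } }
end

# Run every download in its own thread and return the elapsed seconds.
def threaded_seconds(delays)
  Benchmark.realtime do
    delays.map { |d| Thread.new { fake_download(d) } }.each(&:join)
  end
end

delays = [0.2, 0.2, 0.2]
puts format('serial:   %.2fs', serial_seconds(delays))
puts format('threaded: %.2fs', threaded_seconds(delays))
```

The serial time should come out near the sum of the delays, while the threaded time should sit close to the longest single delay.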
Simple Example - Single Request
Because I prefer a number of examples with increasing complexity when I am tackling a new library, that's what you'll get here. This first example issues only a single HTTP request, which is enough to confirm that the basic setup works for one GET.
require 'eventmachine'
require 'em-http-request'
require 'logger'
require 'pry'

##
# Single request
@start_time = Time.now
@logger = Logger.new($stdout)
http = nil
status_code = nil

def runtime
  Time.now - @start_time
end

EventMachine.run {
  url = 'http://ipv4.download.thinkbroadband.com/5MB.zip'

  # Create the HTTP request and issue a GET; the returned object accepts callbacks
  http = EventMachine::HttpRequest.new(url).get
  @logger.debug("[#{__method__}] [url=#{url}] [SUBMITTED] [runtime=#{runtime}]")

  # set up callbacks and errbacks to deal with normal and errored completion
  http.callback do
    status_code = http.response_header.status
    @logger.debug("[#{__method__}] [url=#{url}] [CALLBACK #{status_code}] [runtime=#{runtime}]")
    p http.response_header
    EM.stop
  end

  http.errback do
    status_code = http.response_header.status
    @logger.debug("[#{__method__}] [url=#{url}] [ERRBACK #{status_code}] [runtime=#{runtime}]")
    p http.response_header
    EM.stop
  end
}

# launch Pry in case we want to do any REPL-y things
binding.pry
It produces output that looks like this:
$ ruby bin/em-http/em-http-000.rb
D, [2016-07-24T17:12:39.386698 #42654] DEBUG -- : [] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=0.197928]
D, [2016-07-24T17:12:42.238997 #42654] DEBUG -- : [] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK 200] [runtime=3.050257]
{"SERVER"=>"nginx", "DATE"=>"Sun, 24 Jul 2016 21:12:39 GMT", "CONTENT_TYPE"=>"application/zip", "CONTENT_LENGTH"=>"5242880", "LAST_MODIFIED"=>"Mon, 02 Jun 2008 15:30:42 GMT", "CONNECTION"=>"close", "ETAG"=>"\"48441222-500000\"", "ACCESS_CONTROL_ALLOW_ORIGIN"=>"*", "ACCEPT_RANGES"=>"bytes"}
So far, so good. The download succeeds with a status code of 200. The empty [] at the start of each log message is expected: __method__ returns nil when it is called outside of a method.
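If the callback and errback hooks are new to you, the pattern behind them is small enough to sketch in plain Ruby. TinyDeferrable below is a hypothetical, stripped-down stand-in that I made up for illustration. It is not the class em-http-request hands back, and it skips details such as firing handlers that are registered after the result is already known.

```ruby
# Hypothetical stand-in for an object that accepts callbacks and errbacks.
class TinyDeferrable
  attr_reader :state

  def initialize
    @callbacks = []
    @errbacks = []
    @state = :pending
  end

  # Register a handler for normal completion.
  def callback(&block)
    @callbacks << block
    self
  end

  # Register a handler for errored completion.
  def errback(&block)
    @errbacks << block
    self
  end

  def succeed(value)
    settle(:succeeded, @callbacks, value)
  end

  def fail(value)
    settle(:failed, @errbacks, value)
  end

  private

  # Run one set of handlers, and only the first time we are settled.
  def settle(state, handlers, value)
    return unless @state == :pending
    @state = state
    handlers.each { |handler| handler.call(value) }
  end
end

def demo_deferrable
  seen = []
  ok = TinyDeferrable.new
  ok.callback { |v| seen << [:callback, v] }
  ok.errback  { |v| seen << [:errback, v] }
  ok.succeed(200)

  bad = TinyDeferrable.new
  bad.callback { |v| seen << [:callback, v] }
  bad.errback  { |v| seen << [:errback, v] }
  bad.fail(0)
  seen
end

p demo_deferrable # prints [[:callback, 200], [:errback, 0]]
```

Exactly one of the two handlers runs for each request, which is why the example above calls EM.stop in both.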
Next Example: Crude Async Concurrency
The next example ensures that we have a solid enough understanding to deal with two concurrent requests. One thing we have to handle is ensuring that EM.stop is only called after all of the jobs have completed. So in this example, we add a request queue collection and a method that calls EM.stop once all jobs are done.
require 'eventmachine'
require 'em-http-request'
require 'logger'
require 'pry'

##
# concurrent requests without concurrency limits
FIVE = 'http://ipv4.download.thinkbroadband.com/5MB.zip'
TEN = 'http://ipv4.download.thinkbroadband.com/10MB.zip'

@request_queue = []
@start_time = Time.now
@logger = Logger.new($stdout)

def runtime
  Time.now - @start_time
end

def new_http_request(url)
  # every job gets added to the +@request_queue+
  request_hash = {}
  @request_queue << request_hash

  http = EventMachine::HttpRequest.new(url).get
  @logger.debug("[#{__method__}] [url=#{url}] [SUBMITTED] [runtime=#{runtime}]")
  request_hash[:http] = http

  # create an anonymous function and bind it to both callback and errback
  finish = lambda do |http|
    request_hash[:response] = http.response
    status_code = http.response_header.status
    @logger.debug("[#{__method__}] [url=#{url}] [CALLBACK/ERRBACK #{status_code}] [runtime=#{runtime}]")
    # instead of issuing an EM.stop after each job,
    # ...we need to wait for all jobs in +@request_queue+ to finish before we stop
    stop_when_all_finished
  end

  http.callback(&finish)
  http.errback(&finish)
end

##
# Check states for all requests in +@request_queue+ and issue EM.stop if true
def stop_when_all_finished
  @logger.debug("[#{__method__}] [states=#{@request_queue.map {|r| r[:http].state}}]")
  EM.stop if @request_queue.all? {|r| r[:http].state.eql?(:finished)}
end

EventMachine.run {
  new_http_request(FIVE)
  new_http_request(TEN)
}

binding.pry
It's getting a bit more complex but is still quite grokkable. We can see that both requests are submitted at the same time, but the larger file takes longer to download.
$ ruby bin/em-http/em-http-010-async.rb
D, [2016-07-24T16:47:21.628277 #42542] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=0.18803]
D, [2016-07-24T16:47:21.631735 #42542] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [SUBMITTED] [runtime=0.191508]
D, [2016-07-24T16:47:26.295605 #42542] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK/ERRBACK 200] [runtime=4.855387]
D, [2016-07-24T16:47:26.295678 #42542] DEBUG -- : [stop_when_all_finished] [states=[:finished, :body]]
D, [2016-07-24T16:47:30.669914 #42542] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [CALLBACK/ERRBACK 200] [runtime=9.229693]
D, [2016-07-24T16:47:30.669999 #42542] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished]]
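Scanning the state of every request works fine for a handful of downloads. Another way to decide when to stop is to count outstanding jobs. The sketch below is plain Ruby with no EventMachine in it, and PendingCounter is a hypothetical helper name of my own, not something either gem provides.

```ruby
# Hypothetical helper: runs a block once every started job has finished.
class PendingCounter
  attr_reader :pending

  def initialize(&on_drained)
    @pending = 0
    @on_drained = on_drained
  end

  # Call when a request is submitted.
  def start
    @pending += 1
  end

  # Call from both the callback and the errback.
  def finish
    @pending -= 1
    @on_drained.call if @pending.zero?
  end
end

def demo_pending_counter
  events = []
  counter = PendingCounter.new { events << :stop }
  2.times { counter.start }   # two requests submitted
  counter.finish              # first one completes
  events << :one_left
  counter.finish              # second one completes, block fires
  events
end

p demo_pending_counter # prints [:one_left, :stop]
```

In the example above, start would go next to the SUBMITTED log line, finish would go inside the lambda bound to callback and errback, and the block would call EM.stop.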
Final Example: Async Concurrency with Concurrency Limits
If we have about 20 requests, we probably don't want them all hitting a server at once and blowing it up. EventMachine provides an elegant mechanism, EM::Iterator, that applies a concurrency constraint so that we can limit the number of active requests. In this final example, I issue six requests and apply a concurrency limit of 2.
require 'eventmachine'
require 'em-http-request'
require 'logger'
require 'pry'

##
# concurrent requests with EM::Iterator concurrency limit of 2
FIVE = 'http://ipv4.download.thinkbroadband.com/5MB.zip'
TEN = 'http://ipv4.download.thinkbroadband.com/10MB.zip'

@request_queue = []
@start_time = Time.now
@logger = Logger.new($stdout)

def runtime
  Time.now - @start_time
end

def new_http_request(request_hash, &block)
  @request_queue << request_hash
  url = request_hash[:url]

  # Submit HTTP Request
  http = EventMachine::HttpRequest.new(url).get
  @logger.debug("[#{__method__}] [url=#{url}] [SUBMITTED] [runtime=#{runtime}]")
  request_hash[:http] = http

  # Bind Callback/Errback to this anonymous function
  finish = lambda do |this_http|
    request_hash[:response] = this_http.response
    request_hash[:response_code] = this_http.response_header.status
    @logger.debug("[#{__method__}] [url=#{url}] [CALLBACK/ERRBACK #{request_hash[:response_code]}] [runtime=#{runtime}]")
    # yield to the block, which will start the next job
    yield if block_given?
    # check to see if we can EM.stop
    stop_when_all_finished
  end

  http.callback(&finish)
  http.errback(&finish)
end

##
# Issue EM.stop once every request has been submitted and has finished.
# The size check matters because the next request is only added to
# +@request_queue+ after this check runs (see the log), so with a limit
# of 1 the queue would otherwise look finished after the first download.
def stop_when_all_finished
  @logger.debug("[#{__method__}] [states=#{@request_queue.map {|r| r[:http].state}}]")
  all_submitted = @request_queue.size == @total_requests
  EM.stop if all_submitted && @request_queue.all? {|r| r[:http].state.eql?(:finished)}
end

## --- MAIN
## These are the requests we will submit - six in total
my_requests = [
  {url: FIVE},
  {url: TEN},
  {url: FIVE},
  {url: TEN},
  {url: FIVE},
  {url: TEN},
]
@total_requests = my_requests.size
concurrency_limit = 2

EventMachine.run {
  # use EM::Iterator to submit EM::HTTPRequests
  EM::Iterator.new(my_requests, concurrency_limit).each do |request_hash,iter|
    new_http_request(request_hash) { iter.next }
  end
}

binding.pry
The code itself doesn't look very different, but you can watch the log and see that two jobs are submitted initially and that subsequent jobs are added only as a job completes.
$ ruby bin/em-http/em-http-020-async-with-em-iterator.rb
D, [2016-07-24T16:55:37.620212 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=0.174998]
D, [2016-07-24T16:55:37.624177 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [SUBMITTED] [runtime=0.178991]
D, [2016-07-24T16:55:42.003892 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK/ERRBACK 200] [runtime=4.5587]
D, [2016-07-24T16:55:42.003984 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :body]]
D, [2016-07-24T16:55:42.006724 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=4.561536]
D, [2016-07-24T16:55:45.318885 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [CALLBACK/ERRBACK 200] [runtime=7.873685]
D, [2016-07-24T16:55:45.319011 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :body]]
D, [2016-07-24T16:55:45.321175 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [SUBMITTED] [runtime=7.875988]
D, [2016-07-24T16:55:46.646499 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK/ERRBACK 200] [runtime=9.201303]
D, [2016-07-24T16:55:46.646617 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :finished, :body]]
D, [2016-07-24T16:55:46.651357 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [SUBMITTED] [runtime=9.206166]
D, [2016-07-24T16:55:50.993760 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/5MB.zip] [CALLBACK/ERRBACK 200] [runtime=13.548572]
D, [2016-07-24T16:55:50.993836 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :finished, :body, :finished]]
D, [2016-07-24T16:55:50.995672 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [SUBMITTED] [runtime=13.550481]
D, [2016-07-24T16:55:52.958938 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [CALLBACK/ERRBACK 200] [runtime=15.51375]
D, [2016-07-24T16:55:52.959023 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :finished, :finished, :finished, :body]]
D, [2016-07-24T16:55:59.708914 #42569] DEBUG -- : [new_http_request] [url=http://ipv4.download.thinkbroadband.com/10MB.zip] [CALLBACK/ERRBACK 200] [runtime=22.263717]
D, [2016-07-24T16:55:59.709023 #42569] DEBUG -- : [stop_when_all_finished] [states=[:finished, :finished, :finished, :finished, :finished, :finished]]
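The pattern in that log (fill up to the limit, then start one new job each time one finishes) can be modelled in a few lines of plain Ruby. LimitedRunner below is a hypothetical, synchronous toy that I wrote to show the idea. It is not how EM::Iterator is implemented, and the job names are placeholders.

```ruby
# Hypothetical model of a concurrency limit: never more than +limit+
# jobs are in flight, and finishing one job starts the next.
class LimitedRunner
  attr_reader :max_seen

  def initialize(items, limit)
    @queue = items.dup
    @limit = limit
    @active = 0
    @max_seen = 0
  end

  # The block receives an item and a +done+ proc that must be called
  # when that item completes.
  def run(&worker)
    @worker = worker
    fill
  end

  private

  # Start queued items until the limit is reached or the queue is empty.
  def fill
    while @active < @limit && !@queue.empty?
      item = @queue.shift
      @active += 1
      @max_seen = [@max_seen, @active].max
      @worker.call(item, proc { release })
    end
  end

  # Free a slot and try to start the next item.
  def release
    @active -= 1
    fill
  end
end

def demo_limited_runner
  log = []
  in_flight = []
  runner = LimitedRunner.new(%w[a b c d], 2)
  runner.run do |item, done|
    log << "start #{item}"
    in_flight << [item, done]
  end
  # Complete jobs oldest first, as if the responses arrived in order.
  until in_flight.empty?
    item, done = in_flight.shift
    log << "finish #{item}"
    done.call
  end
  [log, runner.max_seen]
end

log, max_seen = demo_limited_runner
puts log
puts "max in flight: #{max_seen}"
```

The printed order is start a, start b, finish a, start c, finish b, start d, finish c, finish d, with never more than two jobs in flight, which is the same shape as the SUBMITTED and CALLBACK/ERRBACK lines in the log.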
Conclusion
Thus ends my tutorial on using em-http-request and EM::Iterator to handle a number of long-running downloads concurrently. Many of the lines above are devoted to logging and comments; the code itself is actually quite concise, I think.
If this was helpful to you, please pass it along.