Adding Concurrency to Our Erlang Program
In our last thrilling installment, we used Erlang to fetch a book's title and sales rank from Amazon. Now let's extend this to fetch the data for multiple books, first one-at-a-time, and then in parallel.
A word from our lawyers: read your Amazon Terms of Service before trying this code. You may have limitations on the number of requests you can send per second or somesuch. This code is just to illustrate some Erlang constructs.
Let's start with the function that fetches sales ranks in series. We'll pass it a list of ISBNs, and it will return a corresponding list of {title, rank} tuples. For convenience, let's define a function that returns a list of ISBNs to check. (Later, we can change this function to read from a database or a file.) We're editing our file ranks.erl.
isbns() ->
[ "020161622X", "0974514004", "0974514012", "0974514020", "0974514039",
"0974514055", "0974514063", "0974514071", "097669400X", "0974514047",
"0976694018", "0976694026", "0976694042", "0976694085", "097451408X",
"0976694077", "0977616606", "0976694093", "0977616665", "0976694069",
"0976694050", "0977616649", "0977616657"
].
Users of our code call the fetch_in_series function. It uses the built-in lists:map function to convert our ISBN list into the result list.
fetch_in_series() ->
lists:map(fun fetch_title_and_rank/1, isbns()).
The first parameter to lists:map is the function to be applied to each element in the ISBN list. Here we're telling Erlang to call the fetch_title_and_rank function that we defined in the first article. The /1 says to use the version of the function that takes a single argument.
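If the fun Name/Arity syntax is new to you, here is a minimal sketch of the same idea with no Amazon involved. The module map_demo and the helper double/1 are made up purely for illustration; they are not part of ranks.erl.

```erlang
-module(map_demo).
-export([double/1, double_all/1]).

%% Hypothetical helper, used only for this illustration.
double(N) -> 2 * N.

%% fun double/1 refers to the one-argument function double in this module.
%% lists:map applies it to every element and collects the results in order.
double_all(Numbers) ->
    lists:map(fun double/1, Numbers).
```

Calling map_demo:double_all([1, 2, 3]) returns [2,4,6]. Back in ranks.erl, fetch_in_series uses lists:map in exactly the same way.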
We need to export this function from the module before we can call it.
-export([fetch_title_and_rank/1, fetch_in_series/0]).
Let's fire up the Erlang shell and try it.
1> c(ranks).
{ok,ranks}
2> ranks:fetch_in_series().
[{"The Pragmatic Programmer: From Journeyman to Master","4864"},
{"Pragmatic Version Control Using CVS","288118"},
{"Pragmatic Unit Testing in Java with JUnit","116011"},
. . .
“But wait!” I hear you cry. “Isn't Erlang supposed to be good at parallel programming? What's with all this fetch-the-results-in-series business?”
OK, let's create a parallel version. We have lots of options here, but for now let's do it the hard way. We'll spawn a separate (Erlang) process to handle each request, and we'll write all our own process management code to coordinate harvesting the results as these processes complete.
Erlang processes are lightweight abstractions; they are not the same as the processes your operating system provides. Unlike your operating system, Erlang is happy dealing with thousands of concurrent processes. What's more, you can distribute these processes across servers, processors, and cores within a processor. All this is achieved with a handful of basic operations. To send a tuple to a process whose process ID is in the variable Fred, you can use:
Fred ! {status, ok}
To receive a message from another process, use a receive stanza:
receive
{ status, StatusCode } -> StatusCode
end
There's something cool about the receive code. See the line
{ status, StatusCode } -> StatusCode
The stuff on the left-hand side of the arrow is a pattern match. It means that this receive code will only accept messages that are two-element tuples whose first element is the atom status. This particular receive stanza then strips out just the actual code part of this tuple and returns it. In general, a receive stanza can contain multiple patterns, each with its own specialized processing. This is remarkably powerful.
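To make the multiple-patterns point concrete, here is a small sketch. The module receive_demo and the message shapes beyond {status, ...} are assumptions chosen for the example. The process sends three messages to itself and then handles each one with whichever clause matches.

```erlang
-module(receive_demo).
-export([describe_next/0, test/0]).

%% Take the next message that matches one of the patterns and
%% process it according to which pattern matched.
describe_next() ->
    receive
        {status, ok}         -> all_good;
        {status, StatusCode} -> {problem, StatusCode};
        {answer, Val}        -> {answer_was, Val}
    end.

%% Queue up three messages for ourselves, then handle them one by one.
test() ->
    self() ! {status, ok},
    self() ! {answer, 42},
    self() ! {status, 503},
    [describe_next(), describe_next(), describe_next()].
```

Running receive_demo:test() returns [all_good,{answer_was,42},{problem,503}]. Clauses are tried top to bottom, which is why {status, ok} is caught by the first clause rather than the more general second one.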
There's one last primitive we need. The function spawn takes a function and a set of parameters and invokes that function in a new Erlang process. It returns the process ID for that subprocess.
Let's write a simple, and stupid, example. This module exports test/1. This function spawns a new process that simply doubles a value. Here's the code—we'll dig into it in a second.
-module(parallel).
-export([test/1]).
test(Number) ->
MyPID = self(),
spawn(fun() -> double(MyPID, Number) end),
receive
{ answer, Val } -> { "Child process said", Val }
end.
double(Parent, Number) ->
Result = Number + Number,
Parent ! { answer, Result }.
Because we're handling all the details ourselves, we have to tell the process running the double function where to send its result. To do that, we pass it two parameters: the first is the process ID of the process to receive the result, and the second is the value to double. The second line of the double function uses the ! operator to send the result back to the original process.
The original process has to pass its own process ID to the double function. It uses the built-in self() function to get this PID. Then, on the second line of the function, it invokes spawn, passing it an anonymous function which in turn invokes double. There's a wee catch here: it's tempting to write:
test(Number) ->
spawn(fun() -> double(self(), Number) end),
...
This won't work: because the anonymous function only gets executed once the new process is started, the value returned by self() will be that process's ID, and not that of the parent.
Anyway, we can invoke this code using the Erlang shell:
1> c(parallel).
{ok,parallel}
2> parallel:test(22).
{"Child process said",44}
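If you'd like to convince yourself about the self() catch, here is a rough sketch (the module name self_demo is made up for the occasion). The parent captures its own PID first, and the child reports what self() returns from inside the spawned fun.

```erlang
-module(self_demo).
-export([compare/0]).

compare() ->
    %% Evaluated here, in the parent, before the child exists.
    Parent = self(),
    %% Inside the fun, self() is evaluated by the new process.
    spawn(fun() -> Parent ! {child_sees, self()} end),
    receive
        {child_sees, ChildPid} ->
            {parent, Parent, child, ChildPid, same, Parent =:= ChildPid}
    end.
```

The last element of the returned tuple is false: the two PIDs differ. Had the child sent its answer to self() instead of Parent, it would have mailed the result to itself, and the parent would sit in receive forever.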
Back to Amazon. We want to fetch all the sales ranks in parallel. We'll start with the top-level function. It starts all the fetcher processes running in parallel, then gathers up their results.
fetch_in_parallel() ->
inets:start(),
set_queries_running(isbns()),
gather_results(isbns()).
The first line of this function is a slight optimization. The HTTP client code that we use to fetch the results actually runs behind the scenes in its own process. By calling the inets:start function, we precreate this server process.
Here's the code that creates the processes to fetch the sales data:
set_queries_running(ISBNS) ->
lists:foreach(fun background_fetch/1, ISBNS).
background_fetch(ISBN) ->
ParentPID = self(),
spawn(fun() ->
ParentPID ! { ok, fetch_title_and_rank(ISBN) }
end).
The lists:foreach function invokes its first argument on each element of its second argument. In this case, it invokes the background_fetch function, which in turn calls spawn to start the subprocess. Perhaps surprising is the fact that the anonymous function acts as a closure: the values of ParentPID and ISBN in its calling scope are available inside the fun's body, even though it is running in a separate process. Cool stuff, eh?
There's an implicit decision in the design of this code: I decided that I don't care what order the results are listed in the returned list. I simply want a list that contains one title/rank tuple for each ISBN. I can make use of this fact in the function that gathers the results. It again uses lists:map, but bends its meaning somewhat. Normally the map function maps a value onto some other corresponding value. Here we're simply using it to populate a list with the same number of entries as the original ISBN list. Each entry contains a result from Amazon, but it won't necessarily be the result that corresponds to the ISBN in the same position in the ISBN list. For my purposes, this is fine.
gather_results(ISBNS) ->
lists:map(fun gather_a_result/1, ISBNS).
gather_a_result(_) ->
receive
{ ok, Anything } -> Anything
end.
Let's run this:
1> c(ranks).
{ok,ranks}
2> ranks:fetch_in_parallel().
[{"The Pragmatic Programmer: From Journeyman to Master","6359"},
{"Pragmatic Version Control Using CVS","299260"},
{"Pragmatic Unit Testing in Java with JUnit","131616"},
. . .
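One thing that may not be obvious: we never keep the child PIDs anywhere, and yet no results get lost. Each message a child sends simply waits in the parent's mailbox until the parent gets around to a matching receive. Here is a small sketch of that behavior, using a made-up module mailbox_demo and squaring numbers instead of calling Amazon.

```erlang
-module(mailbox_demo).
-export([run/0]).

run() ->
    Parent = self(),
    %% Start three children; nothing keeps track of their process IDs.
    lists:foreach(
        fun(N) -> spawn(fun() -> Parent ! {ok, N * N} end) end,
        [1, 2, 3]),
    %% Give the children time to finish before we look for results.
    timer:sleep(100),
    %% The messages have been waiting in our mailbox all along.
    Results = lists:map(fun(_) -> next_result() end, [1, 2, 3]),
    %% Arrival order is not guaranteed, so sort before returning.
    lists:sort(Results).

next_result() ->
    receive
        {ok, Val} -> Val
    end.
```

mailbox_demo:run() returns [1,4,9]. The list passed to lists:map is only there to say how many results to wait for, just as in gather_results.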
What kind of speedup does this give us? We can use the built-in timer:tc function to time our two functions.
timer:tc(ranks, fetch_in_parallel, []).
{1163694, . . .
timer:tc(ranks, fetch_in_series, []).
{3070261, . . .
The first element of the result is the wall-clock time taken to run the function (in microseconds). In this run, the parallel version is roughly 2.6 times faster than the serial function.
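Network timings bounce around a lot, so if you want to see the effect without hitting Amazon, something like the following sketch may help. Everything in it is an assumption made for illustration: the module timing_demo, the function slow_square/1, and the 200 ms sleep that stands in for a slow web request.

```erlang
-module(timing_demo).
-export([slow_square/1, in_series/0, in_parallel/0, compare/0]).

%% Stand-in for a slow network call: wait 200 ms, then answer.
slow_square(N) ->
    timer:sleep(200),
    N * N.

inputs() -> [1, 2, 3, 4, 5].

in_series() ->
    lists:map(fun slow_square/1, inputs()).

in_parallel() ->
    Parent = self(),
    lists:foreach(
        fun(N) -> spawn(fun() -> Parent ! {ok, slow_square(N)} end) end,
        inputs()),
    %% Collect one result per input, in whatever order they arrive.
    lists:map(fun(_) -> receive {ok, Val} -> Val end end, inputs()).

%% timer:tc returns {Microseconds, Result}; report milliseconds.
compare() ->
    {SeriesMicros, _} = timer:tc(timing_demo, in_series, []),
    {ParallelMicros, _} = timer:tc(timing_demo, in_parallel, []),
    {series_ms, SeriesMicros div 1000, parallel_ms, ParallelMicros div 1000}.
```

The series run has to wait for five sleeps back to back, so it takes at least a second, while the parallel run overlaps them and should finish in a little over 200 ms. With real requests, the numbers will vary from run to run along with the network.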
So, do you have to go to all this trouble to make your Erlang code run in parallel? Not really. I just wanted to show some of the nuts and bolts. In reality, you'd probably use a library function, such as the pmap function that Joe wrote for the Programming Erlang book. Armed with this, you could turn the serial fetch program into a parallel fetch program by changing
lists:map(fun fetch_title_and_rank/1, isbns()).
to
lib_misc:pmap(fun fetch_title_and_rank/1, isbns()).
Now that's an abstraction!
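For the curious, here is a rough sketch of how a function like that could be put together. To be clear, this is not Joe's code from lib_misc, just my guess at the general shape, in a made-up module called pmap_sketch. Unlike our gather_results, it tags each reply with a unique reference so the results come back in the same order as the inputs.

```erlang
-module(pmap_sketch).
-export([pmap/2]).

pmap(F, List) ->
    Parent = self(),
    %% One process per element; each reply carries a unique reference.
    Refs = lists:map(
        fun(Elem) ->
            Ref = make_ref(),
            spawn(fun() -> Parent ! {Ref, F(Elem)} end),
            Ref
        end, List),
    %% Ref is already bound here, so each receive waits for the reply
    %% belonging to that particular element, preserving input order.
    lists:map(fun(Ref) -> receive {Ref, Result} -> Result end end, Refs).
```

For example, pmap_sketch:pmap(fun(X) -> 2 * X end, [1, 2, 3]) returns [2,4,6]. This sketch has no error handling at all: if F crashes in one of the children, the parent waits forever for a reply that never comes, which is a good reason to reach for a well-tested library version in real code.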
Anyway, here's the current version of the ranks.erl program, containing both the serial and parallel fetch functions.
-module(ranks).
-export([fetch_title_and_rank/1, fetch_in_series/0, fetch_in_parallel/0]).
-include_lib("xmerl/include/xmerl.hrl").
-define(BASE_URL,
"http://webservices.amazon.com/onca/xml?Service=AWSECommerceService"
"&SubscriptionId=<your ID goes here>"
"&Operation=ItemLookup"
"&ResponseGroup=SalesRank,Small"
"&ItemId=").
isbns() ->
[ "020161622X", "0974514004", "0974514012", "0974514020", "0974514039",
"0974514055", "0974514063", "0974514071", "097669400X", "0974514047",
"0976694018", "0976694026", "0976694042", "0976694085", "097451408X",
"0976694077", "0977616606", "0976694093", "0977616665", "0976694069",
"0976694050", "0977616649", "0977616657"
].
fetch_title_and_rank(ISBN) ->
URL = amazon_url_for(ISBN),
%% Note: newer Erlang/OTP releases replace the http module with httpc (httpc:request/1).
{ ok, {_Status, _Headers, Body }} = http:request(URL),
{ Xml, _Rest } = xmerl_scan:string(Body),
[ #xmlText{value=Rank} ] = xmerl_xpath:string("//SalesRank/text()", Xml),
[ #xmlText{value=Title} ] = xmerl_xpath:string("//Title/text()", Xml),
{ Title, Rank }.
amazon_url_for(ISBN) ->
?BASE_URL ++ ISBN.
fetch_in_series() ->
lists:map(fun fetch_title_and_rank/1, isbns()).
fetch_in_parallel() ->
inets:start(),
set_queries_running(isbns()),
gather_results(isbns()).
set_queries_running(ISBNS) ->
lists:foreach(fun background_fetch/1, ISBNS).
background_fetch(ISBN) ->
ParentPID = self(),
spawn(fun() ->
ParentPID ! { ok, fetch_title_and_rank(ISBN) }
end).
gather_results(ISBNS) ->
lists:map(fun(_) ->
receive
{ ok, Anything } -> Anything
end
end, ISBNS).




I must confess that I don't understand the parallel code :(
Where are the processes stored? It seems to me that because you are using lists:foreach, you don't care about the return value of background_fetch/1, which as I understand it is the actual process object (as created by spawn/1). So where does gather_results/1 get its processes from? From a process ether? Also, why do you need to pass the result of isbns/0 to gather_results/1 since you don't use those values in lists:map?
Any clarification would be greatly appreciated!
Vincent.
Posted by: Vincent Foley | April 17, 2007 at 02:18 PM
Vincent, note that the messages are being sent to the ParentPID. Erlang queues these messages until the process identified by ParentPID enters a receive block and matches them.
- ferdy
Posted by: Fernando J. Pereda | April 17, 2007 at 03:00 PM
These last two articles have got me interested in getting familiar with Erlang. I would like to know how the performance compares to other languages such as Ruby, Python and Lisp. What is the underlying implementation (interpreter, JIT compiler or what)?
Posted by: Jo Stockley | April 17, 2007 at 07:21 PM
Lazy erlang programmers might also do something like:
fetch_in_parallel() -> rpc:pmap({ranks, fetch_title_and_rank}, [], isbns()).
Though it doesn't illustrate spawning and message sending nearly as well, it spawns the processes round-robin fashion over all the connected nodes() :)
Posted by: Geoff Cant | April 17, 2007 at 11:36 PM
Jo, in general Erlang is faster than Python and Ruby, but slower than modern Lisp systems. Erlang uses both a byte-code interpreter and optimizing native code compilation techniques; you can mix both modes in a single system.
On some tasks (such as parsing binary data) Erlang performance can be comparable with C. But it doesn't actually matter, because the performance of a real complex system depends on a large set of factors, and these factors are not related to raw performance directly. For example, the Erlang web server YAWS outperforms Apache. You can see the comparison here: http://www.sics.se/~joe/apachevsyaws.html
Posted by: Vlad Balin | April 18, 2007 at 03:39 AM
Yeah, and at the same time, Erlang is VERY inefficient at handling strings represented "in this way", because the internal representation of such strings is a list of integers. So, your string takes 8 times more memory compared to normal character strings in other languages.
What can we do about that? If you need performance, use binaries for manipulating strings, and everything will be ok. :)
Posted by: Vlad Balin | April 18, 2007 at 03:46 AM
Jo,
Erlang is typically _much_ faster than Ruby and Python, and faster than interpreted Lisp. Compiled Lisp might be able to compete with Erlang, though. Depending on the platform, Erlang either uses a highly-optimized bytecode interpreter or an even higher-performance JIT compiler. Most common platforms will use the JIT.
Additionally, Erlang has essentially the fastest concurrency primitives of any language in existence. While it's somewhat slower at executing sequential code than a language like C, it's _much_ faster at doing synchronization. As an example, there's a web server written in Erlang, called YAWS, which kicks Apache's butt. See here: http://www.sics.se/~joe/apachevsyaws.html
For more information on comparative programming language speeds, you can look here: http://shootout.alioth.debian.org/ Keep in mind, though, that most of these benchmarks are sort of toy programs, and the performance results may not translate to more complex applications.
Posted by: Aaron Tomb | April 18, 2007 at 07:31 PM
Eshell V5.5.4 (abort with ^G)
1> c(ranks)
1> .
{ok,ranks}
2> ranks:fetch_in_parallel().
=ERROR REPORT==== 18-Apr-2007::18:32:58 ===
Error in process with exit value: {{badmatch,[]},[{ranks,fetch_title_and_rank,1},{ranks,'-background_fetch/1-fun-0-',2}]}
Dave says: Did you add a valid Amazon key?
Posted by: anon | April 18, 2007 at 08:33 PM
Just for the fun of it, try the debugger:
(if you've got the right gui libs. installed)
1> c(ranks, [debug_info]).
2> im(). % a window should pop up, ignore it for now...
3> ii(ranks).
4> iaa([init]).
5> ranks:fetch_in_parallel().
After the last command, you should get yet another window.
From here, single step, set break-points, study variable contents, etc.
Note: the first window shows the processes running the actual code,
double-click on a line to attach yourself to that particular process
and you'll get the second window.
Cheers, Tobbe
Posted by: Tobbe | April 19, 2007 at 06:06 AM
Till now I never realized how easy and straightforward it is to write Erlang code. I followed the examples and had a parallel web fetcher done in a few minutes.
Since I have a similar app written in Perl (using POE), I added some parsing and tried it out. I'm running this on my laptop, and the Perl script is running on a quad-processor server box. It significantly outperforms the Perl version.
The only problem is, my boss now knows about this speedup and I can just see his brain ticking... :-)
Very nice post, thanks. I learned a lot.
Posted by: Damir | April 20, 2007 at 04:44 AM
If I run the timer:tc() repeatedly, the times change drastically. Sometimes the parallel version takes 3 times longer than the series function. Any ideas why?
> timer:tc(ranks, fetch_in_parallel, []).
run1: 3757482
run2: 1162909
run3: 6826861
> timer:tc(ranks, fetch_in_series, []).
run1: 3164756
run2: 5552845
run3: 2835604
Posted by: Mike | April 22, 2007 at 12:53 PM
Dave, you don't need to abstract the PID = self() prior to the spawn.
You're correct that if writing it as:
spawn(fun() -> double(self(), Number) end)
then you'd have to do this, but you can also do it by launching the function with a name:
spawn(ranks,double,[self(),Number])
Not only is that better, but it makes the code execute ranks:double() rather than double(), which means that if you wanted to, you could replace the definition of ranks:double() with a new version and it would pick up the new one. In your example, you're baking in the version of the code when the spawn argument is done.
Doesn't make much of a difference here, but if it was being called in a server loop then it would make a difference. But I can see the syntax being fiddly if you wanted to pass in lots of args and thought you had to bind them to names first.
Posted by: Alex Blewitt | May 05, 2007 at 07:09 PM
Since the rank code goes out to Amazon for the ranking values, the response time is based on round trip.
Posted by: Richard | May 07, 2007 at 04:28 PM
You can even let someone write the concurrency primitives like pmap for you!
http://code.google.com/p/plists/
is an erlang library of parallel list functions:
...plists is a drop-in replacement for the Erlang module lists, making most list operations parallel. It can operate on each element in parallel, for IO-bound operations, on sublists in parallel, for taking advantage of multi-core machines with CPU-bound operations, and across erlang nodes, for parallelizing inside a cluster. It handles errors and node failures. It can be configured, tuned, and tweaked to get optimal performance while minimizing overhead.
...
This module also includes a simple mapreduce implementation, and the function runmany. All the other functions are implemented with runmany, which is a generalization of parallel list operations.
Posted by: Mark Aufflick | December 20, 2007 at 06:09 PM
Say you want to make this a client/server setup. How do you go about it?
This is what I tried, but I couldn't get it running.
In ranks.erl I do away with background_fetch/1 and do this:
set_queries_running(ISBNS) ->
lists:foreach(fun(ISBN) -> parentPID ! {ok,fetch_title_and_rank(ISBN)} end, ISBNS).
I then create a separate client.erl which has this:
-module(client).
-(export[gather_results/1]).
gather_results() ->
register(parentPID,spawn(fun() -> loop() end)),
ranks_del:fetch_in_parallel().
loop() ->
receive
{ ok,Anything } -> Anything
end.
This doesn't work. What am I doing wrong?
Posted by: himadri | April 24, 2009 at 04:47 AM