#!/usr/local/bin/newlisp
; mapreduce - requires newLISP v.10.x - L.M. March 2007
;
; v.1.22 tested w/o changes on 10.3.3 (updated doc and output)
; v.1.23 tested on 10.7.4 (updated newlisp executable path)
;
; MapReduce - example for distributed computing using a map-reduce style
; algorithm implemented in newLISP using the function 'bayes-train'
; for word counting and 'net-eval' for mapping the word counting task
; onto different nodes in a cluster. 'bayes-train' is normally used in
; conjunction with 'bayes-query' for calculating Bayesian probabilities
; of token sequences to occur in multiple data sets, but it can also be
; used as a fast token counting function for individual data sets.
;
; See also from Google Labs:
; http://labs.google.com/papers/mapreduce.html
; http://216.239.37.132/papers/mapreduce-osdi04.pdf
;
; In map-reduce terminology the documents analyzed in the program below
; correspond to the 'keys' and the dictionary extracted for each document
; to the 'value'. The reduce step consists of reducing the multiple word
; dictionaries to one dictionary with consolidated word counts.
;
; The result of the calculations will be in a file Totals.lsp in the
; working directory where this program was started. A directory called:
; "mrdemo" should be present in the working directory of the server
; node(s). This directory will contain node word counts for each
; node. The reducing master node will load the dictionaries from here
; via HTTP.
;
; For a demo all node processes and the master/reducer process can
; be run on the same CPU. This still cuts the total time in half
; compared to sequential processing, because of better CPU usage in
; a multi processing operating system.
;
; See also http://newlisp.org/CodePatterns.html#distributed for more
; details on configuring server nodes.
; The documents to acquire
; Compatibility shim: interpreters older than v.10.1.11 do not provide
; 'term', so alias it globally to the older 'name' function there.
(if (< (sys-info -2) 10111)
    (constant (global 'term) name))
; Source selection for the documents to be counted.
; 'remote' true  -> nodes fetch the texts from http://www.gutenberg.org/
; 'remote' nil   -> nodes read local copies from the 'mrdemo' subdirectory
;
; Note: newLISP has no 'false' constant; only nil (and the empty list) are
; false. The symbol 'false' merely evaluates to nil while it is unbound,
; so nil is used explicitly here.
;(define remote true) ; retrieve docs from http://www.gutenberg.org/
(define remote nil) ; retrieve docs from a local harddisk

; 'docs' holds one document per node, in node order.
(set 'docs
  (if remote
    '(
      ; A Comedy of Masks - Ernest Dowson and Arthur Moore, 547KB
      "http://www.gutenberg.org/files/16703/16703.txt"
      ; The Adventures of Sherlock Holmes - Conan Doyle, 576KB
      "http://www.gutenberg.org/cache/epub/1661/pg1661.txt"
      ; The tale of Beowulf - anonymous, 219KB
      "http://www.gutenberg.org/files/20431/20431-8.txt"
    )
    ; when running local copies of the text files, then place them
    ; in a subdirectory 'mrdemo' of the current working directory
    '(
      "mrdemo/Comedy.txt"
      "mrdemo/Sherlock.txt"
      "mrdemo/Beowulf.txt"
    )))
; The three nodes may run on different CPUs or all on the same CPU,
; either in 'newlisp -c -w <workdir>' mode running as an inetd or xinetd
; service or in 'newlisp -c -d <port> -w <workdir>' mode, where <port>
; is a port number and <workdir> is the directory containing 'mrdemo'.
; When running inetd or xinetd nodes on the same CPU, the port numbers
; can be the same. For each request a new process will be started by
; (x)inetd. When running on different computers each node would have
; a different IP address or hostname.
;
; the following configuration is for 3 xinetd/inetd nodes on the same
; CPU as the master reducer node.
;(set 'nodes '(
; ("localhost" 4711)
; ("localhost" 4712)
; ("localhost" 4713)
;))
; This configuration would be 'newlisp -c -d <port> -w <workdir>' type
; processes started either on the same or different CPUs. Here shown
; for the same CPU. For remote machines specify different host names
; or IP numbers; the port numbers can then be identical. A directory
; 'mrdemo' should be created in <workdir>. The '-w <workdir>' spec can
; be omitted when placing 'mrdemo' in the directory where the nodes
; are started, i.e.:
; newlisp -c -d 10001 -w /Users/newlisp
;
; Launch three newLISP server nodes on this machine, one per port, all
; using the same working directory (which must contain 'mrdemo').
(setq pid1 (process "/usr/local/bin/newlisp -c -d 10001 -w /Users/lutz/Sites")
      pid2 (process "/usr/local/bin/newlisp -c -d 10002 -w /Users/lutz/Sites")
      pid3 (process "/usr/local/bin/newlisp -c -d 10003 -w /Users/lutz/Sites"))
; give the servers a moment to come up before talking to them
(sleep 1000)
(println "started servers -> " (join (map string (list pid1 pid2 pid3)) " "))
; host/port pairs of the worker nodes, in the same order as the pids above
(setq nodes '(("localhost" 10001)
              ("localhost" 10002)
              ("localhost" 10003)))
; The worker node program could be loaded by a node during startup, but
; here the program is sent with the request. In most cases the newLISP
; startup and program load time is very small compared to the node
; processing time.
;
; 'task' is a source-text template, not code evaluated by the master:
; it is filled in with 'format' (%s = document path or URL, %d = node id)
; and the resulting text is shipped to a node via 'net-eval'.
; On the node the program reads the document, lower-cases it, splits it
; on runs of non a-z characters, counts the tokens with 'bayes-train'
; into the context WORDS and saves that context as
; mrdemo/words-<id>.lsp. It returns an association list with the node
; id, the document name, the value of WORDS:total and the elapsed
; milliseconds.
; NOTE: everything between [text] and [/text] is a string literal that is
; transmitted as-is, including the comment inside it.
(set 'task [text]
(local (url id start text)
(setq WORDS:total '(0))
(set 'url "%s")
(set 'id %d)
(set 'start (time-of-day))
(set 'text (lower-case (read-file url)))
(set 'text (parse text "[^a-z]+" 0))
(bayes-train text 'WORDS)
(save (string "mrdemo/words-" id ".lsp") 'WORDS)
; return a list of node id, doc, words proccessed and time spent
(list (list 'node id)
(list 'doc (last (parse url "/")))
(list 'words (WORDS:total 0))
(list 'ms (- (time-of-day) start))
)
)
[/text])
; Handler passed to 'net-eval'. It receives one argument per call and
; prints it on its own line when it is not nil; a nil argument is
; silently ignored.
(define (idle-proc param)
  (when param
    (println param)))
; Map step: send the counting task to every configured node using
; 'net-eval' and wait a maximum of 30 seconds for all nodes to finish.
;
; One job (host port program-text) is generated per entry in 'nodes';
; the node at position i receives the document at position i in 'docs'
; and uses i as its node id. The job list is derived from 'nodes' rather
; than hard-coded, so it always matches the reduce loop, which also
; iterates over (length nodes). 'docs' must supply at least as many
; entries as 'nodes'.
; 'idle-proc' is called by 'net-eval' and prints each node's result.
(net-eval
  (map (fn (node doc id)
         (list (node 0) (node 1) (format task doc id)))
       nodes
       docs
       (sequence 0 (- (length nodes) 1)))
  30000 idle-proc)
(println)
; Merge the count lists of one name space into an accumulating one.
;
; space  - context whose symbols each hold a list with a count in slot 0
; totals - quoted context symbol naming the accumulator (e.g. 'Totals)
;
; For every symbol in 'space' the same-named symbol is looked up (and
; created if necessary) in 'totals'. A symbol not yet present there
; receives a copy of the source list; otherwise the first element of
; the source list is added to the first element of the stored list.
(define (reduce space totals)
  (dotree (src space)
    (let (dst (sym (term src) totals))
      (if (eval dst)
          ; already known: add this node's count to the running total
          (setf ((eval dst) 0) (+ $it (first (eval src))))
          ; first occurrence: take over the node's list as it is
          (set dst (eval src))))))
; Reduce step: for each node, load the WORDS name space it saved (served
; by the node itself over HTTP) and fold it into the Totals name space.
(dotimes (n (length nodes))
  (let (node (nodes n))
    (setq url (string "http://" (node 0) ":" (node 1) "/mrdemo/words-" n ".lsp")))
  (println "reduce from " url)
  (load url)
  (reduce WORDS 'Totals)
  ; reset every count so the next node's dictionary starts from zero
  (dotree (w WORDS)
    (set w '(0))))

; write the consolidated dictionary to the current working directory
(println "Saving Totals")
(save "Totals.lsp" 'Totals)

; shut down the worker nodes started by this script and quit
(println "destroying processes ...")
(dolist (pid (list pid1 pid2 pid3))
  (destroy pid))
(exit)
; On a MacOS X 10.6 MacMini Intel 1.834 Ghz Core 2 Duo the following
; output is generated when using local doc files (slightly different
; from the remote versions).
;
; ~> ./mapreduce
; started servers -> 28402 28403 28404
; ("localhost" 10003 ((node 2) (doc "Beowulf.txt") (words 38764) (ms 174)))
; ("localhost" 10001 ((node 0) (doc "Comedy.txt") (words 98107) (ms 321)))
; ("localhost" 10002 ((node 1) (doc "Sherlock.txt") (words 108215) (ms 326)))
;
; reduce from http://localhost:10001/mrdemo/words-0.lsp
; reduce from http://localhost:10002/mrdemo/words-1.lsp
; reduce from http://localhost:10003/mrdemo/words-2.lsp
; Saving Totals
; destroying processes ...
; ~>
; When nodes retrieve the documents over the Internet from
; www.gutenberg.org the whole process takes a longer time:
;
; ~> ./mapreduce
; started servers -> 28379 28380 28381
; ("localhost" 10001 ((node 0) (doc "16703.txt") (words 98107) (ms 6829)))
; ("localhost" 10003 ((node 2) (doc "20431-8.txt") (words 38754) (ms 8358)))
; ("localhost" 10002 ((node 1) (doc "pg1661.txt") (words 109002) (ms 11084)))
;
; reduce from http://localhost:10001/mrdemo/words-0.lsp
; reduce from http://localhost:10002/mrdemo/words-1.lsp
; reduce from http://localhost:10003/mrdemo/words-2.lsp
; Saving Totals
; destroying processes ...
; ~>
; eof ;
; syntax highlighting with newLISP and syntax.cgi