source: trunk/p2p/worker.tcl @ 1251

Last change on this file since 1251 was 1251, checked in by mmc, 15 years ago

Added first cut of P2P network for job management. See README in this
directory for details.

File size: 13.7 KB
Line 
1# ----------------------------------------------------------------------
2#  P2P: worker node in P2P mesh bidding on and executing jobs
3#
4#  This server is a typical worker node in the P2P mesh for HUBzero
5#  job execution.  It talks to other workers to distribute the load
6#  of job requests.  It bids on jobs based on its current resources
7#  and executes jobs to earn points from the central authority.
8# ----------------------------------------------------------------------
9#  Michael McLennan (mmclennan@purdue.edu)
10# ======================================================================
11#  Copyright (c) 2008  Purdue Research Foundation
12# ======================================================================
13
# make the library files that sit next to this script loadable by
# adding the script's own directory to the auto_path search list
lappend auto_path [set dir [file dirname [info script]]]

# switch on both log channels used by this worker
log channel debug on
log channel system on

# create the worker's server on the first open port above 9101;
# worker_peers_protocol is registered as the -onprotocol callback
set server [Server ::#auto 9101? \
    -servername worker \
    -onprotocol worker_peers_protocol]
25
26# ======================================================================
27#  OPTIONS
28#  These options get the worker started, but new option settings
29#  come from the authority after this worker first connects.
30# ======================================================================
# client object for the current authority connection ("" = not connected)
set authority ""
# host:port of this worker; replaced once the authority reports our ip
set myaddress "?:?"

# Built-in option defaults:
#   authority_hosts         - addresses of the authority servers to try
#   time_between_registers  - delay (passed to "after") between two
#                             registrations with the central authority
#   max_peer_connections    - number of other peers this worker should
#                             try to stay connected with
array set options {
    authority_hosts         127.0.0.1:9001
    time_between_registers  10000
    max_peer_connections    4
}

# ======================================================================
#  PEERS
# ======================================================================
#   all      - eventually set to the list of all peers in the network
#   testing  - peers whose connections are currently being tested
array set peers {
    all      ""
    testing  ""
}
51
52# ======================================================================
53#  PROTOCOL: hubzero:peer/1
54#  This protocol gets used when another worker connects to this one.
55# ======================================================================
$server protocol hubzero:peer/1

# any failure while handling this protocol goes to the system log
$server define hubzero:peer/1 exception {message} {
    log system "ERROR: $message"
}

# ----------------------------------------------------------------------
#  DIRECTIVE: ping
#  Replies with "pong".
# ----------------------------------------------------------------------
$server define hubzero:peer/1 ping {} { return "pong" }

# ----------------------------------------------------------------------
#  DIRECTIVE: identity <name>
#  Debugging aid: a connecting client announces its name, and that
#  name is attached to the connection via the handler's connectionName.
# ----------------------------------------------------------------------
$server define hubzero:peer/1 identity {name} {
    variable cid
    variable handler

    $handler connectionName $cid $name
    return ""
}
80
# ======================================================================
#  PROTOCOL: hubzero:worker/1
#  authority_connect drops any existing authority connection, opens a
#  new one to the first host in options(authority_hosts) that accepts
#  it (hosts are tried in the order produced by "randomize"), installs
#  the handlers for the incoming (authority->worker) directives, and
#  announces the protocol to the authority.
#
#  Returns the new client object.  Raises an error when none of the
#  known authorities can be reached.
# ======================================================================
proc authority_connect {} {
    global authority options
    log system "connecting to authorities..."

    # start from a clean slate; this leaves $authority set to ""
    authority_disconnect

    # keep the first authority that accepts a connection
    foreach hostport [randomize $options(authority_hosts)] {
        if {[catch {Client ::#auto $hostport} conn]} {
            continue
        }
        set authority $conn
        break
    }

    if {$authority eq ""} {
        error "can't connect to any known authorities"
    }

    # install the handlers for commands coming from the authority
    $authority protocol hubzero:worker/1

    $authority define hubzero:worker/1 exception {args} {
        log system "error from authority: $args"
    }

    # ------------------------------------------------------------------
    #  DIRECTIVE: options <key1> <value1> <key2> <value2> ...
    #  Settings sent by the authority are merged over the defaults
    #  built into the top of this script.  When an "ip" option is
    #  known, this worker's address is rebuilt from that ip and the
    #  port the local server is using.
    # ------------------------------------------------------------------
    $authority define hubzero:worker/1 options {args} {
        global options server myaddress

        array set options $args

        if {![info exists options(ip)]} {
            return ""
        }
        set myaddress $options(ip):[$server port]
        log debug "my address $myaddress"
        return ""
    }

    # ------------------------------------------------------------------
    #  DIRECTIVE: peers <listOfAddresses>
    #  Reply to the "peers" request sent by this worker.  The
    #  <listOfAddresses> holds the host:port addresses to contact in
    #  order to enter the p2p network.  The list is stored, a peer
    #  update is scheduled for when the event loop goes idle, and the
    #  authority connection is closed because nothing more is needed
    #  from it.
    # ------------------------------------------------------------------
    $authority define hubzero:worker/1 peers {plist} {
        global peers
        set peers(all) $plist
        after idle worker_peers_update

        authority_disconnect
    }

    # ------------------------------------------------------------------
    #  DIRECTIVE: identity <name>
    #  Debugging aid: attaches the announced name to the connection.
    # ------------------------------------------------------------------
    $authority define hubzero:worker/1 identity {name} {
        variable cid
        variable handler

        $handler connectionName $cid $name
        return ""
    }

    # tell the authority which protocol we are speaking
    $authority send "protocol hubzero:worker/1"
    return $authority
}
162
# ----------------------------------------------------------------------
#  COMMAND:  authority_disconnect
#
#  Destroys the client object for the current authority connection,
#  if there is one, and resets the global "authority" to "".  Errors
#  raised while deleting the object are ignored.
# ----------------------------------------------------------------------
proc authority_disconnect {} {
    global authority

    # nothing to do when there is no connection
    if {$authority eq ""} {
        return
    }

    catch {itcl::delete object $authority}
    set authority ""
}
170
171# ======================================================================
172#  USEFUL ROUTINES
173# ======================================================================
# ----------------------------------------------------------------------
#  COMMAND:  worker_register
#
#  Runs when this worker starts up and then reschedules itself every
#  options(time_between_registers).  Each run connects to an authority,
#  reports the port this worker's server is listening on, and asks for
#  the list of peers.  A failure is written to the system log and does
#  not stop the next run from being scheduled.
# ----------------------------------------------------------------------
proc worker_register {} {
    global options server

    # connect to the authority and request a list of peers
    set failed [catch {
        set conn [authority_connect]
        $conn send "listening [$server port]"
        $conn send "peers"
    } problem]

    if {$failed} {
        log system "ERROR: $problem"
    }

    # keep registering at regular intervals in case the authority
    # gets restarted in between
    after $options(time_between_registers) worker_register
}
197
# ----------------------------------------------------------------------
#  COMMAND:  worker_peers_update
#
#  Starts one round of peer selection.  The peers this worker already
#  talks to, plus a random sample of other addresses from peers(all),
#  are put on the peers(testing) list and each one is sent a "ping".
#  The "pong" handler records the round-trip time and calls
#  worker_peers_finalize, which keeps the best few connections once
#  every peer has answered.  A timer forces finalization if the round
#  takes too long.
# ----------------------------------------------------------------------
proc worker_peers_update {} {
    global options peers myaddress

    # discard whatever is left over from a previous round
    worker_peers_cleanup

    #
    # The peers we are already connected to compete again.
    #
    foreach slot [array names peers current-*] {
        lappend peers(testing) $peers($slot)
    }

    #
    # Add other peers picked at random.  There is no need to try
    # them all--just a number much larger than the final number of
    # connections we want to keep.
    #
    set sampleSize [expr {10 * $options(max_peer_connections)}]
    foreach addr [randomize $peers(all) $sampleSize] {
        # never connect to ourselves
        if {$addr == $myaddress} {
            continue
        }
        # already connected; picked up from the current-* entries above
        if {[info exists peers(current-$addr)]} {
            continue
        }
        # addresses that cannot be reached are skipped
        if {[catch {Client ::#auto $addr} candidate] != 0} {
            continue
        }

        # a new connection is open to this address
        lappend peers(testing) $candidate
        $candidate protocol hubzero:peer/1

        $candidate define hubzero:peer/1 exception {message} {
            log system "ERROR: $message"
        }

        $candidate define hubzero:peer/1 identity {name} {
            variable cid
            variable handler
            $handler connectionName $cid $name
            return ""
        }

        # a "pong" reply: store the latency and count the response
        $candidate define hubzero:peer/1 pong {} {
            global peers
            variable handler
            puts "  pong from $handler"
            set now [clock clicks -milliseconds]
            set delay [expr {$now - $peers(ping-$handler-start)}]
            set peers(ping-$handler-latency) $delay
            incr peers(responses)
            worker_peers_finalize
            return ""
        }

        # start the ping/pong session with the peer
        $candidate send "protocol hubzero:peer/1"

        # tell this peer our name (for debugging)
        if {[log channel debug]} {
            $candidate send "identity $myaddress"
        }
    }

    #
    # Mark the time and send a "ping" to every peer under test.
    #
    foreach target $peers(testing) {
        puts "pinging $target = [$target address]..."
        set peers(ping-$target-start) [clock clicks -milliseconds]
        $target send "ping"
    }

    # if this test takes too long, just give up
    after 10000 worker_peers_finalize -force
}
286
# ----------------------------------------------------------------------
#  COMMAND:  worker_peers_finalize ?-force?
#
#  Ends a round started by worker_peers_update.  Called with no
#  argument after each "pong"; in that mode nothing happens until the
#  number of responses equals the number of peers under test.  Called
#  with -force (from the timeout) it finalizes with whatever responses
#  have arrived so far.
#
#  Finalizing ranks the peers that answered by ping latency, keeps the
#  options(max_peer_connections) fastest as peers(current-<address>),
#  deletes previously current peers that did not make the cut, and
#  hands the remaining test connections to worker_peers_cleanup.
# ----------------------------------------------------------------------
proc worker_peers_finalize {{option -check}} {
    global peers options

    # in -check mode, wait until every peer under test has answered
    if {$option != "-force"
          && $peers(responses) != [llength $peers(testing)]} {
        return
    }

    # build a list:  {peer latency} {peer latency} ...
    set plist ""
    foreach obj $peers(testing) {
        if {[info exists peers(ping-$obj-latency)]} {
            lappend plist [list $obj $peers(ping-$obj-latency)]
        }
    }
    puts "-------------------\nFINALIZING $::myaddress"
    puts "PINGS: $plist"

    # Sort the list and extract the top peers.  Latencies are integer
    # millisecond counts, so they must be compared as numbers; the
    # default string ordering would rank 100 ahead of 20.
    set plist [lsort -integer -increasing -index 1 $plist]
    set plist [lrange $plist 0 [expr {$options(max_peer_connections)-1}]]
    puts "TOP: $plist"

    set current [array names peers current-*]
    puts "  current: $current"

    # transfer the top peers to the "current" list
    set all ""
    foreach rec $plist {
        set peer [lindex $rec 0]
        set addr [$peer address]
        set peers(current-$addr) $peer
        lappend all $addr

        # If it is already on the "current" list, then keep it.
        # Search with -exact so the address is never treated as a
        # glob pattern.
        set i [lsearch -exact $current current-$addr]
        if {$i >= 0} {
            set current [lreplace $current $i $i]
        }
        # kept peers must not be destroyed by the cleanup below
        set i [lsearch -exact $peers(testing) $peer]
        if {$i >= 0} {
            set peers(testing) [lreplace $peers(testing) $i $i]
        }
    }
    log system "connected to peers: $all"
    puts "  new current: [array names peers current-*]"
    puts "  final: $all"
    puts "  leftover: $current $peers(testing)"

    # Get rid of old peers that we no longer want to talk to.  The
    # delete is best-effort, as in worker_peers_cleanup: if the object
    # is already gone, the stale entry must still be removed.
    foreach leftover $current {
        catch {itcl::delete object $peers($leftover)}
        unset peers($leftover)
        puts "  cleaned up $leftover (was on current list)"
    }

    # clean up after this test
    worker_peers_cleanup
}
348
# ----------------------------------------------------------------------
#  COMMAND:  worker_peers_cleanup
#
#  Resets the state of a peer-testing round: deletes every connection
#  object still on the peers(testing) list (ignoring errors), empties
#  that list, drops all ping-* timing entries, zeroes the response
#  counter, and cancels the pending forced-finalize timer.
# ----------------------------------------------------------------------
proc worker_peers_cleanup {} {
    global peers

    # destroy the connections that were opened only for testing
    foreach conn $peers(testing) {
        catch {itcl::delete object $conn}
        puts "  cleaned up $conn (was on testing list)"
    }
    set peers(testing) ""

    # forget all ping start times and latencies
    array unset peers ping-*
    set peers(responses) 0

    after cancel worker_peers_finalize -force
}
370
# ----------------------------------------------------------------------
#  COMMAND:  worker_peers_protocol <cid> <protocol>
#
#  Callback run whenever a peer sends its protocol message to this
#  worker.  Writes the same protocol name back on the connection <cid>
#  so the other side knows which protocol we are speaking.  Nothing is
#  written for the protocol named "DEFAULT".
# ----------------------------------------------------------------------
proc worker_peers_protocol {cid protocol} {
    if {$protocol eq "DEFAULT"} {
        return
    }
    puts $cid [list protocol $protocol]
}
383
384# ======================================================================
385#  Connect to one of the authorities to get a list of peers.  Then,
386#  sit in the event loop and process events.
387# ======================================================================
# schedule the first registration for when the event loop goes idle
after idle [list worker_register]
log system "starting..."

# enter the event loop; it runs until the variable "main-loop" is set
vwait ::main-loop
Note: See TracBrowser for help on using the repository browser.