source: trunk/p2p/worker.tcl @ 2080

Last change on this file since 2080 was 2080, checked in by mmc, 14 years ago

Part 1 of a major reorganization of content. Moving "instant" to "builder"
and setting up "builder" more like the "gui" part as a package. Moving the
Rappture::object stuff from the builder into the main installation, so it
can be shared by the tester as well. Moving "driver" into gui/scripts
where it belongs. Creating a new "launcher.tcl" script that decides
which of the three parts to launch based on command line options. Still
need to sort out the Makefiles to get this all right...

File size: 24.8 KB
Line 
1# ----------------------------------------------------------------------
2#  P2P: worker node in P2P mesh bidding on and executing jobs
3#
4#  This server is a typical worker node in the P2P mesh for HUBzero
5#  job execution.  It talks to other workers to distribute the load
6#  of job requests.  It bids on jobs based on its current resources
7#  and executes jobs to earn points from the central authority.
8# ----------------------------------------------------------------------
9#  Michael McLennan (mmclennan@purdue.edu)
10# ======================================================================
11#  Copyright (c) 2008  Purdue Research Foundation
12#
13#  See the file "license.terms" for information on usage and
14#  redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES.
15# ======================================================================
16package require Rappture
17
# Add the directory holding this script to the auto-load search path,
# so that other library files sitting next to it are recognized.
lappend auto_path [set dir [file dirname [info script]]]

# Switch on each of the log channels used by this worker.
foreach logChannel {error system debug} {
    log channel $logChannel on
}
unset logChannel
26
# Background-error hook: writes the error message together with the
# global errorInfo stack trace to the "error" log channel.
proc ::bgerror {err} {
    log error "ERROR: $err $::errorInfo"
}
28
29
# address/port for this worker; stays "?:?" until an "options" message
# from the authority supplies the ip
set myaddress "?:?"

# Built-in defaults for the worker options:
#   authority_hosts ................ set of connections for authority servers
#   time_between_authority_checks .. delay (handed to "after") between
#                                    registrations with the central authority
#   time_between_network_rebuilds .. delay (handed to "after") between
#                                    rebuilds of the peer-to-peer network
#   max_peer_connections ........... this worker should try to connect with
#                                    this many other peers
#   peer_time_to_live .............. workers propagate messages until
#                                    time-to-live reaches 0
foreach {optName optDefault} {
    authority_hosts                127.0.0.1:9001
    time_between_authority_checks  60000
    time_between_network_rebuilds  600000
    max_peer_connections           4
    peer_time_to_live              4
} {
    p2p::options register $optName $optDefault
}
unset optName optDefault
46
# ======================================================================
#  PROTOCOL: hubzero:worker<-authority/1
#
#  Messages that the authority sends back to this worker.  The worker
#  is the side that opens the connection; the authority only replies.
# ======================================================================
p2p::protocol::register hubzero:worker<-authority/1 {

    # ------------------------------------------------------------------
    #  INCOMING: options <key1> <value1> <key2> <value2> ...
    #  Option settings pushed by the authority, which override the
    #  settings built into this worker.  Each pair is handed to
    #  "p2p::options set"; a setting that is rejected there is skipped
    #  silently.  If an "ip" setting is present, it is joined with the
    #  server port to form the address of this worker.
    # ------------------------------------------------------------------
    define options {args} {
        global server myaddress

        array set incoming $args
        foreach {name value} [array get incoming] {
            catch {p2p::options set $name $value}
        }

        if {[info exists incoming(ip)]} {
            set myaddress "$incoming(ip):[$server port]"
            log debug "my address $myaddress"
        }
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: peers <listOfAddresses>
    #  Reply to the "peers" request sent by this worker.  The
    #  <listOfAddresses> is a list of host:port addresses that this
    #  worker should contact to enter the p2p network.  The list is
    #  stored, a network measurement is scheduled, and the connection
    #  to the authority is released.
    # ------------------------------------------------------------------
    define peers {plist} {
        global peers

        set peers(all) $plist
        after idle {peer-network goto measure}

        # the peer list is all we wanted from the authority
        authority-connection goto idle
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Used for debugging, so that the other side can identify itself
    #  by name.  The name is attached to the current connection.
    # ------------------------------------------------------------------
    define identity {name} {
        variable handler
        variable cid
        $handler connectionName $cid $name
        return ""
    }
}
106
# ======================================================================
#  PROTOCOL: hubzero:worker<-foreman/1
#
#  Messages that a foreman sends to this worker.  The foreman is the
#  side that opens the connection.
# ======================================================================
p2p::protocol::register hubzero:worker<-foreman/1 {

    # ------------------------------------------------------------------
    #  INCOMING: solicit <option> <value> ...
    #  Sent by a foreman to solicit bids for a simulation job.  A new
    #  Solicitation object is created from the options in the message,
    #  with -connection set to the connection the request arrived on.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid

        log debug "solicitation request from foreman: $args"
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }
}
126
# ======================================================================
#  PROTOCOL: hubzero:workers<-workerc/1
#
#  Workers connect to one another as peers.  These are the messages
#  that a worker acting as client sends to this worker's server side.
# ======================================================================
p2p::protocol::register hubzero:workers<-workerc/1 {

    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Used for debugging, so that each client can identify itself by
    #  name to this worker.  The name is attached to the connection.
    # ------------------------------------------------------------------
    define identity {name} {
        variable handler
        variable cid
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: ping
    #  Another worker is measuring the speed of the connection.  Reply
    #  with "pong".  If this worker turns out to be close by, the other
    #  worker will stay connected to it.
    # ------------------------------------------------------------------
    define ping {} {
        return "pong"
    }

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job info -path hosts -token xxx
    #  A peer is passing along a request for bids on a simulation job.
    #  A new Solicitation object is created from the options in the
    #  message, with -connection set to the inbound connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid

        log debug "solicitation request from peer: $args"
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: proffer <token> <details>
    #  A peer's answer to an earlier "solicit", with details about what
    #  it can offer in terms of CPU power.  The answer is routed to the
    #  Solicitation identified by <token>.  Once a worker has received
    #  all replies from its peers, it sends its own proffer message back
    #  to the client that started the solicitation.
    # ------------------------------------------------------------------
    define proffer {token details} {
        Solicitation::proffer $token $details
        return ""
    }
}
181
# ======================================================================
#  PROTOCOL: hubzero:workerc<-workers/1
#
#  Messages that this worker, acting as a client, receives back from
#  the worker server it has connected to.
# ======================================================================
p2p::protocol::register hubzero:workerc<-workers/1 {

    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Used for debugging, so that the other side can identify itself
    #  by name.  The name is attached to the connection.
    # ------------------------------------------------------------------
    define identity {name} {
        variable handler
        variable cid
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: pong
    #  Reply to a "ping" sent while forming the peer-to-peer network.
    #  The elapsed time since the ping is recorded as the latency for
    #  this connection.  When as many replies have arrived as there
    #  are peers being tested, the network moves on to "finalize".
    # ------------------------------------------------------------------
    define pong {} {
        global peers
        variable handler

        set arrived [clock clicks -milliseconds]
        set peers(ping-$handler-latency) \
            [expr {$arrived - $peers(ping-$handler-start)}]

        set received [incr peers(responses)]
        if {$received >= [llength $peers(testing)]} {
            # replace the pending timeout with an immediate finalize
            after cancel {peer-network goto finalize}
            after idle {peer-network goto finalize}
        }
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job info -path hosts -token xxx
    #  A peer is passing along a request for bids on a simulation job.
    #  A new Solicitation object is created from the options in the
    #  message, with -connection set to this connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid

        log debug "solicitation request from peer: $args"
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: proffer <token> <details>
    #  A peer's answer to an earlier "solicit", with details about what
    #  it can offer in terms of CPU power.  The answer is routed to the
    #  Solicitation identified by <token>.  Once a worker has received
    #  all replies from its peers, it sends its own proffer message back
    #  to the client that started the solicitation.
    # ------------------------------------------------------------------
    define proffer {token details} {
        Solicitation::proffer $token $details
        return ""
    }
}
244
# ----------------------------------------------------------------------
#  COMMAND:  broadcast_to_peers <message> ?<avoidList>?
#
#  Used to broadcast a message out to all peers: both the workers this
#  one has connected to (the peers(current-*) entries) and the workers
#  that have connected to this one's server.  The <message> must be
#  one of the commands defined in the worker protocol.  If there are
#  no peer connections yet, this command does nothing.  Any peer whose
#  address appears on the <avoidList> is skipped.  Every occurrence of
#  @RECIPIENTS within <message> is replaced by the list of addresses
#  being sent to.
#
#  A failed send is logged and does not stop the broadcast.  Returns
#  the number of messages sent out, so this peer knows how many
#  replies to wait for.
# ----------------------------------------------------------------------
proc broadcast_to_peers {message {avoidList ""}} {
    global server peers

    #
    # Take a single snapshot of who we're going to send to, so the
    # recipient list and the actual sends cannot disagree.  Addresses
    # are compared exactly; the default glob matching of lsearch would
    # misjudge addresses containing characters such as [ ] ? or *.
    #
    set recipients ""
    set outbound ""   ;# flat list: peerObject address peerObject address ...
    set inbound ""    ;# connection ids on our own server

    foreach key [array names peers current-*] {
        set peer $peers($key)
        set addr [$peer address]
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend outbound $peer $addr
            lappend recipients $addr
        }
    }
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend inbound $cid
            lappend recipients $addr
        }
    }

    #
    # If the message has any @RECIPIENTS fields, replace them with the
    # list of recipients.  A literal substitution is used so that no
    # character in an address is treated specially.
    #
    set message [string map [list @RECIPIENTS $recipients] $message]

    #
    # Send the message out to all peers.  Keep a count and double-check
    # it against the list of recipients generated above.
    #
    set nmesgs 0

    # send off to other workers that this one has connected to
    foreach {peer addr} $outbound {
        if {[catch {$peer send $message} err] == 0} {
            incr nmesgs
        } else {
            log error "ERROR: broadcast failed to $addr: $err\n  (message was \"$message\")"
        }
    }

    # send off to other workers that connected to this one
    foreach cid $inbound {
        if {[catch {puts $cid $message} err] == 0} {
            incr nmesgs
        } else {
            log error "ERROR: broadcast failed for $cid: $err"
            log error "  (message was $message)"
        }
    }

    # did we send the right number of messages?
    if {[llength $recipients] != $nmesgs} {
        log error "ERROR: sent only $nmesgs messages to peers {$recipients}"
    }

    return $nmesgs
}
320
# ----------------------------------------------------------------------
#  COMMAND:  worker_got_protocol <cid> <protocol>
#
#  Invoked whenever the other end of connection <cid> sends its
#  protocol message to this worker.  Writes back the name of the
#  protocol this worker will speak in return:
#    - a name matching *<-worker*   gets hubzero:workerc<-workers/1
#    - a name matching *<-foreman*  gets hubzero:foreman<-worker/1
#    - a name starting with DEF     gets no reply
#  Any other protocol name raises an error.
# ----------------------------------------------------------------------
proc worker_got_protocol {cid protocol} {
    if {[string match {*<-worker*} $protocol]} {
        puts $cid [list protocol hubzero:workerc<-workers/1]
    } elseif {[string match {*<-foreman*} $protocol]} {
        puts $cid [list protocol hubzero:foreman<-worker/1]
    } elseif {![string match DEF* $protocol]} {
        error "don't recognize protocol \"$protocol\""
    }
}
344
# ----------------------------------------------------------------------
#  COMMAND:  worker_got_dropped <address>
#
#  Registered as the -ondisconnect callback of the worker server.  If
#  <address> is one of the current peers, it is removed from that list
#  and a new measurement of the peer network is scheduled 5 seconds
#  from now (replacing any measurement already pending), so that the
#  missing connection can be replaced.  Other addresses are ignored.
# ----------------------------------------------------------------------
proc worker_got_dropped {addr} {
    global peers

    set slot current-$addr
    if {![info exists peers($slot)]} {
        return
    }

    unset peers($slot)
    after cancel {peer-network goto measure}
    after 5000 {peer-network goto measure}
    log debug "peer dropped: $addr"
}
361
# ======================================================================
#  Start the wonks support and the server that other workers and
#  foremen connect to.  Further down, this worker connects to one of
#  the authorities to get a list of peers and then sits in the event
#  loop processing events.
# ======================================================================
p2p::wonks::init

# NOTE(review): the trailing "?" on the port is interpreted by
# p2p::server -- presumably "start looking at 9101"; confirm there.
set server [p2p::server \
    -port 9101? \
    -protocols {hubzero:workers<-workerc hubzero:worker<-foreman} \
    -servername worker \
    -onprotocol worker_got_protocol \
    -ondisconnect worker_got_dropped]
373
# ----------------------------------------------------------------------
#  AUTHORITY CONNECTION
#
#  State machine representing the connection to the authority server.
#  Whenever it moves from "idle" to "connected", a connection to an
#  authority is opened and updated information on workers is
#  requested.  Entering "idle" closes any open connection and
#  schedules the next attempt.
# ----------------------------------------------------------------------
StateMachine authority-connection
authority-connection statedata cnx ""

# sit here whenever we don't have a connection
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection state idle -onenter {
    # drop whatever connection is still open
    set stale [statedata cnx]
    if {$stale ne ""} {
        catch {itcl::delete object $stale}
        statedata cnx ""
    }

    # schedule the next attempt to reach an authority
    after [p2p::options get time_between_authority_checks] \
        {authority-connection goto connected}
} -onleave {
    # leaving idle: the scheduled attempt is no longer needed
    after cancel {authority-connection goto connected}
}
400
# sit here after we're connected and waiting for a response
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection state connected

# when moving to the connected state, make a connection: the known
# authorities are tried in random order until one accepts, and that
# one is told our listening port and asked for the list of peers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection transition idle->connected -onchange {
    global server

    statedata cnx ""
    foreach addr [randomize [p2p::options get authority_hosts]] {
        set failed [catch {p2p::client -address $addr \
            -sendprotocol hubzero:authority<-worker/1 \
            -receiveprotocol hubzero:worker<-authority/1} result]
        if {!$failed} {
            statedata cnx $result
            break
        }
    }

    # none of the authorities could be reached
    set link [statedata cnx]
    if {$link eq ""} {
        error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]"
    }

    $link send "listening [$server port]"
    $link send "peers"
}
428
# ----------------------------------------------------------------------
#  PEER-TO-PEER CONNECTIONS
#
#  State machine representing the network of worker peers.  From time
#  to time it moves from "idle" to "measure", where it attempts to
#  establish connections with a set of peers and pings each of them.
#  When all ping/pong responses are in, or the measurement times out,
#  it moves to "finalize", settles on the connections to keep, and
#  then moves back to "idle".
# ----------------------------------------------------------------------
StateMachine peer-network
peer-network statedata cnx ""

# sit here when the peer network is okay
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state idle -onenter {
    # schedule the next rebuild of the network
    after [p2p::options get time_between_network_rebuilds] \
        {peer-network goto measure}
} -onleave {
    # leaving idle: the scheduled rebuild is no longer needed
    after cancel {peer-network goto measure}
}
451
# sit here while the peer-to-peer network is being rebuilt and the
# ping/pong measurements are outstanding
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state measure

# when moving to the measure state, make connections to a bunch of
# peers and send each of them a "ping"
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network transition idle->measure -onchange {
    global server myaddress peers

    #
    # Index the workers that already hold an inbound connection to
    # our server, keyed by the first word of the connection name.
    #
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set inbound([lindex [$server connectionName $cid] 0]) $cid
    }

    #
    # Start a fresh round of measurements.  The existing peers go
    # first, to see if their connection is still favorable.
    #
    set peers(responses) 0
    set peers(testing) ""
    foreach key [array names peers current-*] {
        lappend peers(testing) $peers($key)
    }

    #
    # Then add other peers picked at random.  We don't have to try
    # all peers--just some number that is much larger than the final
    # number we want to talk to.
    # NOTE(review): the second argument of "randomize" presumably
    # limits how many addresses come back -- confirm in its definition.
    #
    set maxpeers [p2p::options get max_peer_connections]
    set enough [expr {2*$maxpeers}]
    foreach addr [randomize $peers(all) [expr {10*$maxpeers}]] {
        # never connect to ourself
        if {$addr == $myaddress} {
            continue
        }
        # skip peers we already talk to, whether through an outbound
        # connection or through an inbound connection to the server
        if {[info exists peers(current-$addr)] || [info exists inbound($addr)]} {
            continue
        }

        # peers that can't be reached are simply passed over
        set failed [catch {p2p::client -address $addr \
            -sendprotocol hubzero:workers<-workerc/1 \
            -receiveprotocol hubzero:workerc<-workers/1} cnx]
        if {$failed} {
            continue
        }

        $cnx send "identity $myaddress"
        lappend peers(testing) $cnx

        # have enough connections to test?
        if {[llength $peers(testing)] >= $enough} {
            break
        }
    }

    #
    # Mark the time for each peer under test and send it a "ping".
    #
    foreach cnx $peers(testing) {
        set peers(ping-$cnx-start) [clock clicks -milliseconds]
        $cnx send "ping"
    }

    # if this test takes too long, just give up
    after 10000 {peer-network goto finalize}
}
526
# pass through here once the measurements are in (or have timed out);
# the pending timeout is cancelled and the machine returns to idle
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state finalize -onenter {
    after cancel {peer-network goto finalize}

    # everything is finalized now, so go to idle
    after idle {peer-network goto idle}
}

# when moving to the finalize state, decide on the network of peers:
# keep the max_peer_connections peers with the lowest measured latency
# and drop every other connection that was tested
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network transition measure->finalize -onchange {
    global peers

    set maxpeers [p2p::options get max_peer_connections]

    # build a list:  {peer latency} {peer latency} ...
    # (peers that never answered the ping have no latency and are left out)
    set plist ""
    foreach obj $peers(testing) {
        if {[info exists peers(ping-$obj-latency)]} {
            lappend plist [list $obj $peers(ping-$obj-latency)]
        }
    }

    # sort the list and extract the top peers; latencies are integer
    # millisecond counts, so they must be compared numerically (the
    # default text comparison would rank 100 ahead of 20)
    set plist [lsort -integer -increasing -index 1 $plist]
    set plist [lrange $plist 0 [expr {$maxpeers-1}]]

    set currentpeers [array names peers current-*]

    # transfer the top peers to the "current" list
    set all ""
    foreach rec $plist {
        set peer [lindex $rec 0]
        set addr [$peer address]
        set peers(current-$addr) $peer
        lappend all $addr

        # if it is already on the "current" list, then keep it
        set i [lsearch $currentpeers current-$addr]
        if {$i >= 0} {
            set currentpeers [lreplace $currentpeers $i $i]
        }
        # a peer we keep must not be deleted by the cleanup below
        set i [lsearch $peers(testing) $peer]
        if {$i >= 0} {
            set peers(testing) [lreplace $peers(testing) $i $i]
        }
    }
    log system "connected to peers: $all"

    # get rid of old peers that we no longer want to talk to; a failed
    # delete is ignored, as in the cleanup loop below, so that one dead
    # object cannot abort the transition half-way through
    foreach leftover $currentpeers {
        catch {itcl::delete object $peers($leftover)}
        unset peers($leftover)
    }

    # clean up after this test
    foreach obj $peers(testing) {
        catch {itcl::delete object $obj}
    }
    set peers(testing) ""

    foreach key [array names peers ping-*] {
        unset peers($key)
    }
    set peers(responses) 0
}
594
# ----------------------------------------------------------------------
#  JOB SOLICITATION
#
#  Each worker can receive a "solicit" request, asking for information
#  about performance and price of peers available to work.  Each worker
#  sends the request on to its peers, then gathers the information
#  and sends it back to the client or to the peer requesting the
#  information.  There can be multiple requests going on at once,
#  and each may have different job types and return different info,
#  so the class below helps to watch over each request until it has
#  completed.
#
#  An object of this class deletes itself once it has sent its reply.
# ----------------------------------------------------------------------
itcl::class Solicitation {
    public variable connection ""  ;# channel the final proffer is written to
    public variable job ""         ;# job info, passed on and echoed in the reply
    public variable path ""        ;# workers the request has passed through
    public variable avoid ""       ;# addresses that should not be contacted again
    public variable token ""       ;# token of the requester, echoed in the reply

    private variable _serial ""    ;# key of this object in the "all" map
    private variable _response ""  ;# replies gathered so far, one per line
    private variable _waitfor 0    ;# number of peer replies still expected
    private variable _timeout ""   ;# id of the "after" event that finalizes

    common counter 0 ;# generates serial nums for objects
    common all       ;# maps serial num => solicitation object

    # Registers this solicitation, passes the request on to the peers
    # while the path is still shorter than peer_time_to_live, and
    # schedules "finalize" in case not all replies arrive.
    constructor {args} {
        eval configure $args

        global myaddress
        lappend path $myaddress
        lappend avoid $myaddress
        set _serial [incr counter]
        set all($_serial) $this

        set delay "idle"  ;# finalize right away unless we wait for peers
        set ttl [p2p::options get peer_time_to_live]
        if {[llength $path] < $ttl} {
            # our own serial number is the token peers must answer with
            set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial]
            set _waitfor [broadcast_to_peers $mesg $avoid]

            if {$_waitfor > 0} {
                # add a delay proportional to ttl + time for wonks measurement
                set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}]
            }
        }
        set _timeout [after $delay [itcl::code $this finalize]]
    }

    # Cancels the pending finalize and removes this object from the map.
    destructor {
        after cancel $_timeout
        catch {unset all($_serial)}
    }

    # this adds the info from each proffer to this solicitation, and
    # finalizes as soon as the last expected reply has arrived
    method response {details} {
        append _response $details "\n"
        if {[incr _waitfor -1] <= 0} {
            finalize
        }
    }

    # called to finalize when all peers have responded back, or when
    # the timeout set up in the constructor fires
    method finalize {} {
        global myaddress

        # filter out duplicate info from clients; only the first line
        # seen for each address is kept
        set block ""
        foreach line [split $_response \n] {
            # a line that is not a well-formed list is skipped, so that
            # one bad reply cannot keep the whole proffer from going out
            if {[catch {lindex $line 0} addr]} {
                log error "ERROR: ignoring malformed proffer line: $line"
                continue
            }
            if {"" != $addr && ![info exists response($addr)]} {
                append block $line "\n"
                set response($addr) 1
            }
        }
        # add details about this client to the message
        append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]"

        # send the composite results back to the caller
        set cmd [list proffer $token $block]
        if {[catch {puts $connection $cmd} err]} {
            log error "ERROR while sending back proffer: $err"
        }
        itcl::delete object $this
    }

    # Routes an incoming proffer to the solicitation registered under
    # <serial>.  Proffers for unknown serial numbers are ignored.
    proc proffer {serial details} {
        if {[info exists all($serial)]} {
            $all($serial) response $details
        }
    }
}
688
# ----------------------------------------------------------------------
#  Start up: contact an authority as soon as the event loop is running,
#  then process events.  Nothing in this file sets the "main-loop"
#  variable, so the vwait is not expected to return.
# ----------------------------------------------------------------------
log system "starting..."

after idle {
    authority-connection goto connected
}

vwait main-loop
Note: See TracBrowser for help on using the repository browser.