source: trunk/p2p/worker.tcl @ 5348

Last change on this file since 5348 was 3177, checked in by mmc, 7 years ago

Updated all of the copyright notices to reference the transfer to
the new HUBzero Foundation, LLC.

File size: 24.8 KB
Line 
1# ----------------------------------------------------------------------
2#  P2P: worker node in P2P mesh bidding on and executing jobs
3#
4#  This server is a typical worker node in the P2P mesh for HUBzero
5#  job execution.  It talks to other workers to distribute the load
6#  of job requests.  It bids on jobs based on its current resources
7#  and executes jobs to earn points from the central authority.
8# ----------------------------------------------------------------------
9#  Michael McLennan (mmclennan@purdue.edu)
10# ======================================================================
11#  Copyright (c) 2004-2012  HUBzero Foundation, LLC
12#
13#  See the file "license.terms" for information on usage and
14#  redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES.
15# ======================================================================
16package require Rappture
17
18# recognize other library files in this same directory
19set dir [file dirname [info script]]
20lappend auto_path $dir
21
# Turn on the error, system and debug log channels for this worker.
foreach ch {error system debug} {
    log channel $ch on
}
unset ch

# Errors thrown by scripts running from the event loop end up here.
# Record them on the error channel together with the stack trace that
# Tcl leaves in ::errorInfo.
proc ::bgerror {err} {
    log error "ERROR: $err $::errorInfo"
}
28
29
# address/port for this worker; stays "?:?" until the authority's
# "options" message supplies an "ip" value
set myaddress "?:?"

# Built-in defaults for this worker's tunable options.  Any of them may
# be overridden later by the authority's "options" message.
#   authority_hosts                authority servers to register with
#   time_between_authority_checks  ms between authority registrations
#   time_between_network_rebuilds  ms between peer-network rebuilds
#   max_peer_connections           how many peers to stay connected to
#   peer_time_to_live              messages propagate until this many
#                                  workers are on their path
foreach {optname optvalue} {
    authority_hosts                 127.0.0.1:9001
    time_between_authority_checks   60000
    time_between_network_rebuilds   600000
    max_peer_connections            4
    peer_time_to_live               4
} {
    p2p::options register $optname $optvalue
}
unset optname optvalue
46
# ======================================================================
#  PROTOCOL: hubzero:worker<-authority/1
#
#  The worker initiates communication with the authority, and the
#  authority responds by sending these messages.
# ======================================================================
p2p::protocol::register hubzero:worker<-authority/1 {

    # ------------------------------------------------------------------
    #  INCOMING: options <key1> <value1> <key2> <value2> ...
    #  These option settings coming from the authority override the
    #  option settings built into the client.  The authority probably
    #  has a more up-to-date list of other authorities and better
    #  policies for living within the network.
    #
    #  Every key is passed to "p2p::options set"; any key that command
    #  rejects is silently skipped.  If an "ip" key is present, this
    #  worker's address becomes <ip>:<server port>.
    # ------------------------------------------------------------------
    define options {args} {
        global server myaddress

        array set options $args
        foreach key [array names options] {
            # best effort: ignore options that p2p::options rejects
            catch {p2p::options set $key $options($key)}
        }

        # pair the reported IP with our listening port to form the
        # address we announce to peers
        if {[info exists options(ip)]} {
            set myaddress $options(ip):[$server port]
            log debug "my address $myaddress"
        }
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: peers <listOfAddresses>
    #  This message comes in after this worker has sent the "peers"
    #  message to request the current list of peers.  The
    #  <listOfAddresses> is a list of host:port addresses that this
    #  worker should contact to enter the p2p network.
    # ------------------------------------------------------------------
    define peers {plist} {
        global peers
        # candidate list used by the next peer-network rebuild
        set peers(all) $plist
        # start rebuilding the peer network once the event loop is idle
        after idle {peer-network goto measure}

        # now that we've gotten the peers, we're done with the authority
        authority-connection goto idle
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: identity
    #  Used for debugging, so that each client can identify itself by
    #  name to this worker.  The name is recorded as the display name
    #  of this connection through the handler's connectionName method.
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler
        $handler connectionName $cid $name
        return ""
    }
}
106
# ======================================================================
#  PROTOCOL: hubzero:worker<-foreman/1
#
#  The foreman initiates communication with a worker, and sends the
#  various messages supported below.
# ======================================================================
p2p::protocol::register hubzero:worker<-foreman/1 {

    # ------------------------------------------------------------------
    #  INCOMING: solicit
    #  Foremen send this message to solicit bids for a simulation job.
    #  The arguments become configuration options of a new Solicitation
    #  object, which eventually writes its "proffer" reply to this
    #  connection (cid).
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from foreman: $args"
        # -connection comes last, so presumably it overrides any
        # -connection value the sender put in $args
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }
}
126
# ======================================================================
#  PROTOCOL: hubzero:workers<-workerc/1
#
#  Workers initiate connections with other workers as peers.  The
#  following messages are sent by worker clients to the worker server.
# ======================================================================
p2p::protocol::register hubzero:workers<-workerc/1 {
    # ------------------------------------------------------------------
    #  INCOMING: identity
    #  Used for debugging, so that each client can identify itself by
    #  name to this worker.  Peers send their own host:port address as
    #  <name>, and broadcast_to_peers later reads the first word of the
    #  connection name back as that peer's address.
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: ping
    #  If another worker sends "ping", they are trying to measure the
    #  speed of the connection.  Send back a "pong" message.  If this
    #  worker is close by, the other worker will stay connected to it.
    #  (The returned string is presumably sent back as the reply.)
    # ------------------------------------------------------------------
    define ping {} {
        return "pong"
    }

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job info -path hosts -token xxx
    #  Workers send this message on to their peers to solicit bids
    #  for a simulation job.  A Solicitation object handles it and
    #  replies over this connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from peer: $args"
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: proffer <token> <details>
    #  Workers send this message back after a "solicit" request with
    #  details about what they can offer in terms of CPU power.  When
    #  a worker has received all replies from its peers, it sends back
    #  its own proffer message to the client that started the
    #  solicitation.  Replies whose <token> matches no pending
    #  Solicitation are dropped.
    # ------------------------------------------------------------------
    define proffer {token details} {
        Solicitation::proffer $token $details
        return ""
    }
}
181
# ======================================================================
#  PROTOCOL: hubzero:workerc<-workers/1
#
#  The following messages are received by a worker client in response
#  to the requests that they send to a worker server.
# ======================================================================
p2p::protocol::register hubzero:workerc<-workers/1 {
    # ------------------------------------------------------------------
    #  INCOMING: identity
    #  Used for debugging, so that each client can identify itself by
    #  name to this worker.  The name is recorded as this connection's
    #  display name through the handler's connectionName method.
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: pong
    #  When forming the peer-to-peer network, workers send ping/pong
    #  messages to one another to measure the latency.  The round trip
    #  in milliseconds is stored in peers(ping-<handler>-latency).  When
    #  every peer under test has answered, finalize right away instead
    #  of waiting for the measurement timeout.
    #
    #  A pong with no ping outstanding on this connection is ignored.
    #  That covers unsolicited pongs, duplicates, and pongs that arrive
    #  after the measurement was finalized.  Without this guard, reading
    #  the missing start time raises an error, and duplicates inflate
    #  the response count.
    # ------------------------------------------------------------------
    define pong {} {
        global peers
        variable handler

        if {![info exists peers(ping-$handler-start)]
              || [info exists peers(ping-$handler-latency)]} {
            return ""
        }

        set now [clock clicks -milliseconds]
        set delay [expr {$now - $peers(ping-$handler-start)}]
        set peers(ping-$handler-latency) $delay
        if {[incr peers(responses)] >= [llength $peers(testing)]} {
            # replace the pending timeout with an immediate finalize
            after cancel {peer-network goto finalize}
            after idle {peer-network goto finalize}
        }
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job info -path hosts -token xxx
    #  Workers send this message on to their peers to solicit bids
    #  for a simulation job.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from peer: $args"
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: proffer <token> <details>
    #  Workers send this message back after a "solicit" request with
    #  details about what they can offer in terms of CPU power.  When
    #  a worker has received all replies from its peers, it sends back
    #  its own proffer message to the client that started the
    #  solicitation.
    # ------------------------------------------------------------------
    define proffer {token details} {
        Solicitation::proffer $token $details
        return ""
    }
}
244
# ----------------------------------------------------------------------
#  COMMAND:  broadcast_to_peers <message> ?<avoidList>?
#
#  Used to broadcast a message out to all peers.  The <message> must
#  be one of the commands defined in the worker protocol.  If there
#  are no peer connections yet, this command does nothing.  If any
#  peer appears on the <avoidList>, it is skipped.  Returns the
#  number of messages sent out, so this peer knows how many replies
#  to wait for.
#
#  Peers are the outbound client objects in peers(current-*) plus the
#  inbound hubzero:workers<-workerc/1 connections on the server.
#  Every literal "@RECIPIENTS" in <message> is replaced by the
#  space-separated addresses of the peers the message goes to.
#  <avoidList> entries are compared as exact addresses, not patterns.
#  Send failures are logged on the error channel and are not counted.
# ----------------------------------------------------------------------
proc broadcast_to_peers {message {avoidList ""}} {
    global server peers

    #
    # Build a list of all peer addresses, so we know who we're
    # going to send this message out to.  Matching is exact because the
    # avoid list comes from other workers: a stray glob character in
    # it must not knock out unrelated peers.
    #
    set recipients ""
    foreach key [array names peers current-*] {
        set addr [$peers($key) address]
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend recipients $addr
        }
    }
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend recipients $addr
        }
    }

    #
    # If the message has any @RECIPIENTS fields, replace them with the
    # list of recipients.  string map inserts the text literally;
    # regsub would interpret ampersands and backslashes in it.
    #
    set message [string map [list @RECIPIENTS $recipients] $message]

    #
    # Send the message out to all peers.  Keep a count and double-check
    # it against the list of recipients generated above.
    #
    set nmesgs 0

    # send off to other workers that this one has connected to
    foreach key [array names peers current-*] {
        set peer $peers($key)
        set addr [$peer address]
        if {[lsearch -exact $avoidList $addr] >= 0} {
            continue
        }
        if {[catch {$peer send $message} err] == 0} {
            incr nmesgs
        } else {
            log error "ERROR: broadcast failed to $addr: $err\n  (message was \"$message\")"
        }
    }

    # send off to other workers that connected to this one
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
        if {[lsearch -exact $avoidList $addr] >= 0} {
            continue
        }
        if {[catch {puts $cid $message} err] == 0} {
            incr nmesgs
        } else {
            log error "ERROR: broadcast failed for $cid: $err"
            log error "  (message was $message)"
        }
    }

    # did we send the right number of messages?
    if {[llength $recipients] != $nmesgs} {
        log error "ERROR: sent only $nmesgs messages to peers {$recipients}"
    }

    return $nmesgs
}
320
# ----------------------------------------------------------------------
#  COMMAND:  worker_got_protocol <cid> <protocol>
#
#  Server callback (-onprotocol) run when a connecting client announces
#  its <protocol>.  Answers on channel <cid> with the protocol used for
#  the reverse direction:
#    *<-worker*   =>  protocol hubzero:workerc<-workers/1
#    *<-foreman*  =>  protocol hubzero:foreman<-worker/1
#    DEF*         =>  no reply
#  Any other protocol name raises an error.
# ----------------------------------------------------------------------
proc worker_got_protocol {cid protocol} {
    if {[string match *<-worker* $protocol]} {
        set reply hubzero:workerc<-workers/1
    } elseif {[string match *<-foreman* $protocol]} {
        set reply hubzero:foreman<-worker/1
    } elseif {[string match DEF* $protocol]} {
        # nothing to answer
        return
    } else {
        error "don't recognize protocol \"$protocol\""
    }
    puts $cid [list protocol $reply]
}
344
# ----------------------------------------------------------------------
#  COMMAND:  worker_got_dropped <address>
#
#  Server callback (-ondisconnect) for a dropped inbound connection.
#  If <address> is one of our current peers, forget it and schedule
#  a peer-network rebuild 5 seconds from now.  That replaces any
#  rebuild already queued with the same script.  Addresses that are
#  not current peers are ignored.
# ----------------------------------------------------------------------
proc worker_got_dropped {addr} {
    global peers

    set slot current-$addr
    if {![info exists peers($slot)]} {
        return
    }

    unset peers($slot)
    after cancel {peer-network goto measure}
    after 5000 {peer-network goto measure}
    log debug "peer dropped: $addr"
}
361
# ======================================================================
#  Connect to one of the authorities to get a list of peers.  Then,
#  sit in the event loop and process events.
# ======================================================================
# presumably prepares the "wonks" figure that Solicitation reports via
# p2p::wonks::current -- confirm in the p2p::wonks package
p2p::wonks::init

# Start this worker's own server.  Incoming workers and foremen connect
# here: worker_got_protocol handles their protocol announcement, and
# worker_got_dropped is told when one of them disconnects.
# NOTE(review): the trailing "?" in "-port 9101?" is passed unchanged to
# p2p::server; presumably it means "9101 or the next free port" --
# confirm before changing it.
set server [p2p::server -port 9101? \
        -protocols {hubzero:workers<-workerc hubzero:worker<-foreman} \
        -servername worker \
        -onprotocol worker_got_protocol \
        -ondisconnect worker_got_dropped]
373
# ----------------------------------------------------------------------
#  AUTHORITY CONNECTION
#
#  This is the state machine representing the connection to the
#  authority server.  We start in the "idle" state.  Whenever we
#  try to move to "connected", we'll open a connection to an authority
#  and request updated information on workers.  If that connection
#  fails, we'll get kicked back to "idle" and we'll try again later.
# ----------------------------------------------------------------------
StateMachine authority-connection
# client object for the current authority connection, or "" if none
authority-connection statedata cnx ""

# sit here whenever we don't have a connection
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection state idle -onenter {
    # have an open connection? then close it
    if {"" != [statedata cnx]} {
        catch {itcl::delete object [statedata cnx]}
        statedata cnx ""
    }
    # try to connect again after time_between_authority_checks ms
    set delay [p2p::options get time_between_authority_checks]
    after $delay {authority-connection goto connected}
} -onleave {
    # drop the pending retry; "after cancel" finds it only because this
    # script text is identical to the one scheduled in -onenter
    after cancel {authority-connection goto connected}
}
400
# sit here after we're connected and waiting for a response
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection state connected

# when moving to the connected state, make a connection
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection transition idle->connected -onchange {
    global server

    # connect to the authority and request a list of peers; try the
    # authorities in the order given by randomize (presumably shuffled)
    # and keep the first one that accepts a connection
    statedata cnx ""
    foreach addr [randomize [p2p::options get authority_hosts]] {
        if {[catch {p2p::client -address $addr \
            -sendprotocol hubzero:authority<-worker/1 \
            -receiveprotocol hubzero:worker<-authority/1} result] == 0} {
            statedata cnx $result
            break
        }
    }

    # announce our listening port, then ask for peers; the "peers" reply
    # handler is what moves this machine back to idle
    if {"" != [statedata cnx]} {
        [statedata cnx] send "listening [$server port]"
        [statedata cnx] send "peers"
    } else {
        # NOTE(review): presumably StateMachine treats this error as a
        # failed transition and stays in idle (whose retry timer is still
        # pending) -- confirm, or no further attempt will be scheduled
        error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]"
    }
    # NOTE(review): "connected" has no timeout in the code visible here;
    # if the authority never answers "peers", nothing returns us to idle
}
428
# ----------------------------------------------------------------------
#  PEER-TO-PEER CONNECTIONS
#
#  This is the state machine representing the network of worker peers.
#  We start in the "idle" state.  From time to time we move to the
#  "measure" state, connect to a set of candidate peers, and ping them.
#  When every peer has answered (or a 10-second timeout expires), we
#  move to "finalize", keep the fastest peers, and then move back to
#  "idle".
# ----------------------------------------------------------------------
StateMachine peer-network
# NOTE(review): no visible code reads or writes this statedata
peer-network statedata cnx ""

# sit here when the peer network is okay
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state idle -onenter {
    # rebuild the network again after time_between_network_rebuilds ms
    set delay [p2p::options get time_between_network_rebuilds]
    after $delay {peer-network goto measure}
} -onleave {
    # drop the pending rebuild (matched by identical script text)
    after cancel {peer-network goto measure}
}
451
# sit here when we need to rebuild the peer-to-peer network
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state measure

# when moving to the measure state, make connections to a bunch of
# peers and ping every peer under test
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network transition idle->measure -onchange {
    global server myaddress peers

    #
    # Get a list of workers already connected to this one
    # with an inbound connection to the server.
    #
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set addr [lindex [$server connectionName $cid] 0]
        set inbound($addr) $cid
    }

    #
    # Pick a random group of peers and try to connect to them.
    # Start with the existing peers to see if their connection
    # is still favorable, and then add a random bunch of others.
    #
    set peers(testing) ""
    set peers(responses) 0
    foreach key [array names peers current-*] {
        set peer $peers($key)
        lappend peers(testing) $peer
    }

    #
    # Pick other peers at random.  We don't have to try all
    # peers--just some number that is much larger than the
    # final number we want to talk to.  peers(all) only exists once
    # the authority has sent its list, so a rebuild that happens
    # earlier must not fail on it.
    #
    set candidates ""
    if {[info exists peers(all)]} {
        set candidates $peers(all)
    }
    set maxpeers [p2p::options get max_peer_connections]
    foreach addr [randomize $candidates [expr {10*$maxpeers}]] {
        #
        # Avoid connecting to ourself, or to peers that we're
        # already connected to (either as inbound connections
        # to the server or as outbound connections to others).
        #
        if {$addr == $myaddress
              || [info exists peers(current-$addr)]
              || [info exists inbound($addr)]} {
            continue
        }

        if {[catch {p2p::client -address $addr \
            -sendprotocol hubzero:workers<-workerc/1 \
            -receiveprotocol hubzero:workerc<-workers/1} cnx]} {
            continue
        }

        # A send can fail; broadcast_to_peers guards its sends the same
        # way.  Discard this new connection instead of aborting the
        # whole rebuild.
        if {[catch {$cnx send "identity $myaddress"} err]} {
            log error "ERROR: can't identify to peer $addr: $err"
            catch {itcl::delete object $cnx}
            continue
        }
        lappend peers(testing) $cnx

        # have enough connections to test?
        if {[llength $peers(testing)] >= 2*$maxpeers} {
            break
        }
    }

    #
    # Now, loop through all peers and send a "ping" message.  A peer
    # whose ping cannot be sent stays on the testing list without a
    # latency, so finalize drops it and deletes the object.
    #
    foreach cnx $peers(testing) {
        # mark the time and send a ping to this peer
        set peers(ping-$cnx-start) [clock clicks -milliseconds]
        if {[catch {$cnx send "ping"} err]} {
            log error "ERROR: ping failed to $cnx: $err"
        }
    }

    # if this test takes too long, just give up
    after 10000 {peer-network goto finalize}
}
526
# sit here only momentarily, once the new set of peers has been chosen
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state finalize -onenter {
    # the measurement is over: drop the 10-second fallback timer if it
    # has not fired yet
    after cancel {peer-network goto finalize}

    # everything is finalized now, so go to idle
    after idle {peer-network goto idle}
}
535
# when moving to the finalize state, decide on the network of peers:
# keep the max_peer_connections peers with the lowest ping latency and
# delete every other connection that was under test
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network transition measure->finalize -onchange {
    global peers

    set maxpeers [p2p::options get max_peer_connections]

    # build a list:  {peer latency} {peer latency} ...
    # only peers that answered the ping have a latency
    set plist ""
    foreach obj $peers(testing) {
        if {[info exists peers(ping-$obj-latency)]} {
            lappend plist [list $obj $peers(ping-$obj-latency)]
        }
    }

    # sort by latency and extract the top peers; -integer is required,
    # since the default ASCII sort would rank 100 ms ahead of 20 ms
    set plist [lsort -integer -increasing -index 1 $plist]
    set plist [lrange $plist 0 [expr {$maxpeers-1}]]

    set currentpeers [array names peers current-*]

    # transfer the top peers to the "current" list
    set all ""
    foreach rec $plist {
        set peer [lindex $rec 0]
        set addr [$peer address]
        set peers(current-$addr) $peer
        lappend all $addr

        # if it is already on the "current" list, then keep it
        set i [lsearch -exact $currentpeers current-$addr]
        if {$i >= 0} {
            set currentpeers [lreplace $currentpeers $i $i]
        }
        # it is now a kept peer, so the test cleanup below must skip it
        set i [lsearch -exact $peers(testing) $peer]
        if {$i >= 0} {
            set peers(testing) [lreplace $peers(testing) $i $i]
        }
    }
    log system "connected to peers: $all"

    # get rid of old peers that we no longer want to talk to; a failed
    # delete (object already gone) must not abort the remaining cleanup
    foreach leftover $currentpeers {
        catch {itcl::delete object $peers($leftover)}
        unset peers($leftover)
    }

    # clean up after this test
    foreach obj $peers(testing) {
        catch {itcl::delete object $obj}
    }
    set peers(testing) ""

    foreach key [array names peers ping-*] {
        unset peers($key)
    }
    set peers(responses) 0
}
594
# ----------------------------------------------------------------------
#  JOB SOLICITATION
#
#  Each worker can receive a "solicit" request, asking for information
#  about performance and price of peers available to work.  Each worker
#  sends the request on to its peers, then gathers the information
#  and sends it back to the client or to the peer requesting the
#  information.  There can be multiple requests going on at once,
#  and each may have different job types and return different info,
#  so the class below helps to watch over each request until it has
#  completed.
#
#  Lifecycle: the constructor forwards the request to peers (unless
#  the path already has peer_time_to_live entries) and arms a timer.
#  Each peer's "proffer" reply is routed to the object by token through
#  Solicitation::proffer.  When every peer has replied, or the timer
#  fires, finalize writes one combined "proffer" to -connection and the
#  object deletes itself.
# ----------------------------------------------------------------------
itcl::class Solicitation {
    public variable connection ""  ;# channel the final proffer is written to
    public variable job ""         ;# job description, forwarded unchanged
    public variable path ""        ;# addresses this request has passed through
    public variable avoid ""       ;# addresses that must not be asked again
    public variable token ""       ;# requester's token, echoed in the reply

    private variable _serial ""    ;# our own token, used by our peers' replies
    private variable _response ""  ;# proffer lines collected from peers
    private variable _waitfor 0    ;# number of peer replies still expected
    private variable _timeout ""   ;# id of the pending finalize timer

    constructor {args} {
        eval configure $args

        global myaddress
        lappend path $myaddress
        lappend avoid $myaddress
        set _serial [incr counter]
        set all($_serial) $this

        set delay "idle"  ;# finalize after waiting for responses
        set ttl [p2p::options get peer_time_to_live]
        if {[llength $path] < $ttl} {
            set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial]
            set _waitfor [broadcast_to_peers $mesg $avoid]

            if {$_waitfor > 0} {
                # add a delay proportional to ttl + time for wonks measurement
                set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}]
            }
        }
        set _timeout [after $delay [itcl::code $this finalize]]
    }
    destructor {
        after cancel $_timeout
        catch {unset all($_serial)}
    }

    # this adds the info from each proffer to this solicitation;
    # finalizes as soon as the last expected peer has replied
    method response {details} {
        append _response $details "\n"
        if {[incr _waitfor -1] <= 0} {
            finalize
        }
    }

    # called to finalize when all peers have responded back (or when the
    # timer fires): sends the combined proffer and deletes this object
    method finalize {} {
        global myaddress

        # filter out duplicate info from clients, keeping the first line
        # seen for each address; our own address is pre-marked so that
        # only the fresh line appended below describes this worker
        set block ""
        set seen($myaddress) 1
        foreach line [split $_response \n] {
            # lines come from other workers; skip one that is not a
            # well-formed list instead of aborting before the reply is
            # sent and this object is deleted
            if {[catch {lindex $line 0} addr]} {
                continue
            }
            if {"" != $addr && ![info exists seen($addr)]} {
                append block $line "\n"
                set seen($addr) 1
            }
        }
        # add details about this client to the message
        append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]"

        # send the composite results back to the caller
        set cmd [list proffer $token $block]
        if {[catch {puts $connection $cmd} err]} {
            log error "ERROR while sending back proffer: $err"
        }
        itcl::delete object $this
    }

    # routes a peer's proffer to the pending solicitation with this
    # serial number; replies for unknown or finished ones are dropped
    proc proffer {serial details} {
        if {[info exists all($serial)]} {
            $all($serial) response $details
        }
    }

    common counter 0 ;# generates serial nums for objects
    common all       ;# maps serial num => solicitation object
}
688
# ----------------------------------------------------------------------
log system "starting..."

# once the event loop is running, make the first attempt to reach an
# authority; its "peers" reply then kicks off the peer-network rebuild
after idle {
    authority-connection goto connected
}

# enter the event loop; this returns only when the global variable
# main-loop is written, which none of the code visible here does
vwait main-loop
Note: See TracBrowser for help on using the repository browser.