source: trunk/p2p/worker.tcl @ 1273

Last change on this file since 1273 was 1273, checked in by mmc, 15 years ago

Major reorganization of p2p code, and support for solicit/proffer
messages.

File size: 25.2 KB
RevLine 
[1251]1# ----------------------------------------------------------------------
2#  P2P: worker node in P2P mesh bidding on and executing jobs
3#
4#  This server is a typical worker node in the P2P mesh for HUBzero
5#  job execution.  It talks to other workers to distribute the load
6#  of job requests.  It bids on jobs based on its current resources
7#  and executes jobs to earn points from the central authority.
8# ----------------------------------------------------------------------
9#  Michael McLennan (mmclennan@purdue.edu)
10# ======================================================================
11#  Copyright (c) 2008  Purdue Research Foundation
[1257]12#
13#  See the file "license.terms" for information on usage and
14#  redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES.
[1251]15# ======================================================================
[1257]16package require Rappture
[1251]17
# Make the companion p2p library files that live in the same directory
# as this script visible to the autoloader.
set dir [file dirname [info script]]
lappend auto_path $dir

# Turn on every log channel this worker writes to.
log channel error on
log channel system on
log channel debug on

# Errors raised by background (event-loop) scripts end up here; report
# them on the "error" channel together with the full stack trace.
proc ::bgerror {err} {
    log error "ERROR: $err $::errorInfo"
}
[1251]28
[1273]29
# address/port for this worker; the "?:?" placeholder is replaced once
# the authority reports our IP address (see the "options" message)
set myaddress "?:?"

# Built-in defaults for the tunable options (the authority may push
# new values at run time):
#   authority_hosts ................ host:port of the authority servers
#   time_between_authority_checks .. ms between registrations with the
#                                    central authority
#   time_between_network_rebuilds .. ms between rebuilds of the
#                                    peer-to-peer network
#   max_peer_connections ........... number of peers this worker tries
#                                    to stay connected to
#   peer_time_to_live .............. messages are propagated between
#                                    workers until this reaches 0
foreach {optname optvalue} {
    authority_hosts                 127.0.0.1:9001
    time_between_authority_checks   60000
    time_between_network_rebuilds   600000
    max_peer_connections            4
    peer_time_to_live               4
} {
    p2p::options register $optname $optvalue
}
unset optname optvalue
[1257]46
# ======================================================================
#  PROTOCOL: hubzero:worker<-authority/1
#
#  The worker opens the connection to the authority; these are the
#  messages the authority sends back over that connection.
# ======================================================================
p2p::protocol::register hubzero:worker<-authority/1 {

    # ------------------------------------------------------------------
    #  INCOMING: options <key1> <value1> <key2> <value2> ...
    #  Option values sent by the authority replace the defaults built
    #  into this worker; the authority probably knows about more
    #  authorities and has better policies for the network.  Any error
    #  from p2p::options set (e.g. an unknown key) is ignored.  An "ip"
    #  key also fixes this worker's address as <ip>:<server port>.
    # ------------------------------------------------------------------
    define options {args} {
        global server myaddress

        array set options $args
        foreach {key value} [array get options] {
            # best effort: skip settings this worker can't apply
            catch {p2p::options set $key $value}
        }

        if {![info exists options(ip)]} {
            return ""
        }
        set myaddress "$options(ip):[$server port]"
        log debug "my address $myaddress"
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: peers <listOfAddresses>
    #  Reply to the "peers" request this worker sent.  <listOfAddresses>
    #  holds the host:port addresses this worker may contact to join
    #  the p2p network.  Storing it kicks off a rebuild of the peer
    #  network, and the authority connection is no longer needed.
    # ------------------------------------------------------------------
    define peers {plist} {
        global peers

        set peers(all) $plist
        after idle {peer-network goto measure}

        # we have what we came for -- drop the authority connection
        authority-connection goto idle
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Debugging aid: records <name> as the name of this connection
    #  (via $handler connectionName).
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler

        $handler connectionName $cid $name
        return ""
    }
}
[1251]106
# ======================================================================
#  PROTOCOL: hubzero:worker<-foreman/1
#
#  The foreman initiates communication with a worker, and sends the
#  various messages supported below.
# ======================================================================
p2p::protocol::register hubzero:worker<-foreman/1 {

    # ------------------------------------------------------------------
    #  INCOMING: solicit ?-option value ...?
    #  Foremen send this message to solicit bids for a simulation job.
    #  The option/value pairs (e.g. -job, -path, -avoid, -token) are
    #  passed to a new Solicitation object along with this connection.
    #  That object later writes a "proffer" reply back to the connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from foreman: $args"

        # build the fixed words as lists so eval cannot re-split them
        eval [list Solicitation ::#auto] $args [list -connection $cid]
        return ""
    }
}
127
# ======================================================================
#  PROTOCOL: hubzero:workers<-workerc/1
#
#  Workers initiate connections with other workers as peers.  The
#  following messages are sent by worker clients to the worker server.
# ======================================================================
p2p::protocol::register hubzero:workers<-workerc/1 {
    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Used for debugging, so that each client can identify itself by
    #  name to this worker.  <name> is recorded as the connection's name
    #  (via $handler connectionName).
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: ping
    #  If another worker sends "ping", they are trying to measure the
    #  speed of the connection.  Send back a "pong" message.  If this
    #  worker is close by, the other worker will stay connected to it.
    # ------------------------------------------------------------------
    define ping {} {
        return "pong"
    }

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job info -path hosts -avoid hosts -token xxx
    #  Workers send this message on to their peers to solicit bids
    #  for a simulation job.  A Solicitation object handles the request
    #  and writes its "proffer" reply back to this connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from peer: $args"

        # build the fixed words as lists so eval cannot re-split them
        eval [list Solicitation ::#auto] $args [list -connection $cid]
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: proffer <token> <details>
    #  Workers send this message back after a "solicit" request with
    #  details about what they can offer in terms of CPU power.  The
    #  details go to the Solicitation identified by <token>.  When that
    #  solicitation has all replies from its peers, it sends its own
    #  proffer back to whoever started it.
    # ------------------------------------------------------------------
    define proffer {token details} {
        log debug "PROFFER from peer: $token $details"
        Solicitation::proffer $token $details
        return ""
    }
}
184
# ======================================================================
#  PROTOCOL: hubzero:workerc<-workers/1
#
#  The following messages are received by a worker client in response
#  to the requests that they send to a worker server.
# ======================================================================
p2p::protocol::register hubzero:workerc<-workers/1 {
    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Used for debugging, so that the worker server at the other end
    #  can identify itself by name.  <name> is recorded as the
    #  connection's name (via $handler connectionName).
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: pong
    #  When forming the peer-to-peer network, workers send ping/pong
    #  messages to one another to measure the latency.  The round-trip
    #  time is stored in peers(ping-<handler>-latency).  Once every
    #  tested peer has answered, the measurement is finalized at once.
    # ------------------------------------------------------------------
    define pong {} {
        global peers
        variable handler

        # A pong can arrive after the measurement was already finalized
        # (e.g. by the timeout).  By then the ping-* entries are gone
        # and peers(testing) is empty.  Ignore it instead of raising an
        # error and scheduling a spurious "finalize".
        if {![info exists peers(ping-$handler-start)]} {
            return ""
        }

        set now [clock clicks -milliseconds]
        set delay [expr {$now - $peers(ping-$handler-start)}]
        set peers(ping-$handler-latency) $delay
        if {[incr peers(responses)] >= [llength $peers(testing)]} {
            # all answers are in; don't wait for the timeout
            after cancel {peer-network goto finalize}
            after idle {peer-network goto finalize}
        }
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job info -path hosts -avoid hosts -token xxx
    #  Workers send this message on to their peers to solicit bids
    #  for a simulation job.  A Solicitation object handles the request
    #  and writes its "proffer" reply back to this connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from peer: $args"

        # build the fixed words as lists so eval cannot re-split them
        eval [list Solicitation ::#auto] $args [list -connection $cid]
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: proffer <token> <details>
    #  Workers send this message back after a "solicit" request with
    #  details about what they can offer in terms of CPU power.  The
    #  details go to the Solicitation identified by <token>.  When that
    #  solicitation has all replies from its peers, it sends its own
    #  proffer back to whoever started it.
    # ------------------------------------------------------------------
    define proffer {token details} {
        log debug "PROFFER from peer: $token $details"
        Solicitation::proffer $token $details
        return ""
    }
}
249
# ----------------------------------------------------------------------
#  COMMAND:  broadcast_to_peers <message> ?<avoidList>?
#
#  Sends <message> to every peer of this worker.  That includes the
#  peers this worker connected out to (peers(current-*)) and the workers
#  that connected in to this worker's server.  <message> should be one
#  of the commands defined in the worker protocol.  Any peer whose
#  address appears in <avoidList> is skipped.  Each occurrence of
#  "@RECIPIENTS" in <message> is replaced with the list of addresses the
#  message is being sent to.
#
#  Returns the number of messages actually sent, so this peer knows how
#  many replies to wait for.  With no peers, nothing is sent and the
#  result is 0.  Send failures are logged on the "error" channel; they
#  are not raised.
# ----------------------------------------------------------------------
proc broadcast_to_peers {message {avoidList ""}} {
    global server peers

    #
    # Build one list of targets, each {address kind handle}, so the
    # recipients and the sends below always agree.
    #
    set targets ""
    foreach key [array names peers current-*] {
        set addr [$peers($key) address]
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend targets [list $addr outbound $peers($key)]
        }
    }
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend targets [list $addr inbound $cid]
        }
    }

    set recipients ""
    foreach target $targets {
        lappend recipients [lindex $target 0]
    }

    #
    # Replace any @RECIPIENTS fields with the list of recipients.
    # string map inserts the text literally; regsub would treat "&" and
    # "\" in the replacement specially.
    #
    set message [string map [list @RECIPIENTS $recipients] $message]

    #
    # Send the message out.  Outbound peers are client objects and get
    # "send"; inbound connections are channels and get "puts".
    #
    set nmesgs 0
    foreach target $targets {
        set addr   [lindex $target 0]
        set kind   [lindex $target 1]
        set handle [lindex $target 2]

        if {$kind eq "outbound"} {
            set status [catch {$handle send $message} err]
        } else {
            set status [catch {puts $handle $message} err]
        }

        if {$status == 0} {
            incr nmesgs
        } else {
            log error "ERROR: broadcast failed to $addr ($handle): $err\n  (message was \"$message\")"
        }
    }

    # did we send the right number of messages?
    if {[llength $recipients] != $nmesgs} {
        log error "ERROR: sent only $nmesgs messages to peers {$recipients}"
    }

    return $nmesgs
}
325
# ----------------------------------------------------------------------
#  COMMAND:  worker_got_protocol <cid> <protocol>
#
#  Called (as the server's -onprotocol handler) when a client announces
#  the protocol it speaks.  The matching reply protocol is written back
#  on <cid>:
#    *<-worker*   ->  protocol hubzero:workerc<-workers/1
#    *<-foreman*  ->  protocol hubzero:foreman<-worker/1
#    DEF*         ->  nothing is sent
#  Any other protocol name raises an error.
# ----------------------------------------------------------------------
proc worker_got_protocol {cid protocol} {
    if {[string match *<-worker* $protocol]} {
        set reply hubzero:workerc<-workers/1
    } elseif {[string match *<-foreman* $protocol]} {
        set reply hubzero:foreman<-worker/1
    } elseif {[string match DEF* $protocol]} {
        return
    } else {
        error "don't recognize protocol \"$protocol\""
    }
    puts $cid [list protocol $reply]
}
[1257]349
# ----------------------------------------------------------------------
#  COMMAND:  worker_got_dropped <address>
#
#  Called (as the server's -ondisconnect handler) when a connection
#  drops.  If <address> is one of this worker's current peers
#  (peers(current-<address>)), it is forgotten, and a rebuild of the peer
#  network is scheduled 5 seconds from now.  Any pending
#  "peer-network goto measure" is cancelled first.  Other addresses
#  are ignored.
# ----------------------------------------------------------------------
proc worker_got_dropped {addr} {
    upvar #0 peers peerinfo

    if {![info exists peerinfo(current-$addr)]} {
        return
    }
    unset peerinfo(current-$addr)

    # restart the countdown, so several drops close together lead to a
    # single rebuild of the network
    after cancel {peer-network goto measure}
    after 5000 {peer-network goto measure}
    log debug "peer dropped: $addr"
}
366
# ======================================================================
#  Connect to one of the authorities to get a list of peers.  Then,
#  sit in the event loop and process events.
# ======================================================================
p2p::wonks::init

# Start this worker's server.  It accepts the worker<-foreman and
# workers<-workerc protocols.  worker_got_protocol answers each client's
# protocol announcement, and worker_got_dropped handles disconnects.
# NOTE(review): the trailing "?" in "-port 9101?" looks like it lets
# p2p::server fall back to another port if 9101 is taken -- confirm.
set server [p2p::server \
    -port 9101? \
    -protocols {hubzero:workers<-workerc hubzero:worker<-foreman} \
    -servername worker \
    -onprotocol worker_got_protocol \
    -ondisconnect worker_got_dropped \
]
378
# ----------------------------------------------------------------------
#  AUTHORITY CONNECTION
#
#  State machine for the connection to the authority server.  It starts
#  in the "idle" state.  Each move to "connected" opens a connection to
#  an authority and requests updated information on workers.  If no
#  authority can be reached, the transition raises an error.  The design
#  expects the machine to fall back to "idle" and try again later.
# ----------------------------------------------------------------------
StateMachine authority-connection
authority-connection statedata cnx ""

# "idle": no connection to the authority.  Close any connection left
# from the previous exchange, then schedule the next attempt after
# time_between_authority_checks milliseconds.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection state idle -onenter {
    set oldcnx [statedata cnx]
    if {$oldcnx ne ""} {
        catch {itcl::delete object $oldcnx}
        statedata cnx ""
    }
    after [p2p::options get time_between_authority_checks] \
        {authority-connection goto connected}
} -onleave {
    after cancel {authority-connection goto connected}
}
405
# "connected": a connection to the authority is open and we are waiting
# for its response.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection state connected

# idle->connected: try the authorities in random order and keep the first
# that accepts a connection.  Then tell it our listening port and ask
# for the list of peers.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection transition idle->connected -onchange {
    global server

    statedata cnx ""
    foreach addr [randomize [p2p::options get authority_hosts]] {
        set failed [catch {
            p2p::client -address $addr \
                -sendprotocol hubzero:authority<-worker/1 \
                -receiveprotocol hubzero:worker<-authority/1
        } result]
        if {!$failed} {
            statedata cnx $result
            break
        }
    }

    set link [statedata cnx]
    if {$link eq ""} {
        error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]"
    }
    $link send "listening [$server port]"
    $link send "peers"
}
433
# ----------------------------------------------------------------------
#  PEER-TO-PEER CONNECTIONS
#
#  This is the state machine representing the network of worker peers.
#  We start in the "idle" state.  From time to time we move to the
#  "measure" state and attempt to establish connections with a set of
#  peers, then wait for their ping/pong responses.  When all responses
#  are in (or a timeout expires), we move to "finalize", settle on the
#  final set of connections, and then move back to "idle".
# ----------------------------------------------------------------------
StateMachine peer-network
peer-network statedata cnx ""

# "idle": the peer network is okay.  Schedule the next rebuild after
# time_between_network_rebuilds milliseconds.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state idle -onenter {
    after [p2p::options get time_between_network_rebuilds] \
        {peer-network goto measure}
} -onleave {
    after cancel {peer-network goto measure}
}
456
# "measure": we are rebuilding the peer-to-peer network and waiting for
# the ping/pong replies.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state measure

# idle->measure: connect to a bunch of candidate peers and ping them all,
# so that their latency can be measured.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network transition idle->measure -onchange {
    global server myaddress peers

    # Workers already connected to this one through an inbound
    # connection to the server, keyed by the address in the
    # connection's name.
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set inbound([lindex [$server connectionName $cid] 0]) $cid
    }

    # Re-test the existing peers, to see whether their connection is
    # still favorable ...
    set peers(responses) 0
    set peers(testing) ""
    foreach key [array names peers current-*] {
        lappend peers(testing) $peers($key)
    }

    # ... then add others picked at random.  We draw 10*max candidates
    # and stop at 2*max open connections: many more than we keep, but
    # without trying every known peer.
    set maxpeers [p2p::options get max_peer_connections]
    foreach addr [randomize $peers(all) [expr {10*$maxpeers}]] {
        # skip ourself and anyone we're already connected to, either
        # inbound (to our server) or outbound (as a client)
        if {$addr == $myaddress} continue
        if {[info exists peers(current-$addr)]} continue
        if {[info exists inbound($addr)]} continue

        if {[catch {p2p::client -address $addr \
                -sendprotocol hubzero:workers<-workerc/1 \
                -receiveprotocol hubzero:workerc<-workers/1} cnx]} {
            continue
        }
        $cnx send "identity $myaddress"
        lappend peers(testing) $cnx

        if {[llength $peers(testing)] >= 2*$maxpeers} break
    }

    # Time-stamp and ping every candidate; the "pong" handler records
    # each round-trip time.
    foreach cnx $peers(testing) {
        set peers(ping-$cnx-start) [clock clicks -milliseconds]
        $cnx send "ping"
    }

    # don't wait more than 10 seconds for the replies
    after 10000 {peer-network goto finalize}
}
531
# "finalize": the measurement is over.  Cancel the measurement timeout
# and head straight back to "idle".
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state finalize -onenter {
    after cancel {peer-network goto finalize}

    # everything is finalized now, so go to idle
    after idle {peer-network goto idle}
}

# measure->finalize: keep the max_peer_connections peers with the lowest
# measured latency, close every other connection that was tested or
# previously current, and reset the measurement bookkeeping.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network transition measure->finalize -onchange {
    global peers

    set maxpeers [p2p::options get max_peer_connections]

    # build a list:  {peer latency} {peer latency} ...
    # (peers that never answered the ping are left out)
    set plist ""
    foreach obj $peers(testing) {
        if {[info exists peers(ping-$obj-latency)]} {
            lappend plist [list $obj $peers(ping-$obj-latency)]
        }
    }

    # Sort by latency and keep the fastest peers.  Latencies are integer
    # milliseconds, so compare them numerically.  The default ASCII sort
    # would rank 100 ahead of 20.
    set plist [lsort -integer -increasing -index 1 $plist]
    set plist [lrange $plist 0 [expr {$maxpeers-1}]]

    set currentpeers [array names peers current-*]

    # transfer the top peers to the "current" list
    set all ""
    foreach rec $plist {
        set peer [lindex $rec 0]
        set addr [$peer address]
        set peers(current-$addr) $peer
        lappend all $addr

        # if it is already on the "current" list, then it's not a leftover
        set i [lsearch -exact $currentpeers current-$addr]
        if {$i >= 0} {
            set currentpeers [lreplace $currentpeers $i $i]
        }
        # keep this connection open: don't clean it up below
        set i [lsearch -exact $peers(testing) $peer]
        if {$i >= 0} {
            set peers(testing) [lreplace $peers(testing) $i $i]
        }
    }
    log system "connected to peers: $all"

    # Get rid of old peers that we no longer want to talk to.  A failed
    # delete is logged and must not abort the cleanup that follows.
    foreach leftover $currentpeers {
        if {[catch {itcl::delete object $peers($leftover)} err]} {
            log error "ERROR: can't remove old peer $leftover: $err"
        }
        unset peers($leftover)
    }

    # Clean up after this test.  Old peers removed above may also be on
    # this list, so deletion errors are expected and ignored here.
    foreach obj $peers(testing) {
        catch {itcl::delete object $obj}
    }
    set peers(testing) ""

    foreach key [array names peers ping-*] {
        unset peers($key)
    }
    set peers(responses) 0
}
599
# ----------------------------------------------------------------------
#  JOB SOLICITATION
#
#  Each worker can receive a "solicit" request, asking for information
#  about performance and price of peers available to work.  Each worker
#  sends the request on to its peers, gathers their "proffer" replies,
#  and sends the combined information back to the client or peer that
#  asked for it.  Several requests can be in flight at once, each with
#  its own job and replies, so every request gets its own Solicitation
#  object.  The object lives until its reply has been sent.
# ----------------------------------------------------------------------
itcl::class Solicitation {
    public variable connection ""  ;# channel the final proffer is written to
    public variable job ""         ;# job description, forwarded unchanged
    public variable path ""        ;# addresses this request has visited
    public variable avoid ""       ;# addresses that should not be asked
    public variable token ""       ;# caller's token, echoed in the proffer

    private variable _serial ""    ;# our own token for replies from peers
    private variable _response ""  ;# proffer details collected so far
    private variable _waitfor 0    ;# number of peer replies outstanding
    private variable _timeout ""   ;# id of the pending "after" finalize

    # ------------------------------------------------------------------
    #  Configures the options, adds this worker to -path and -avoid, and
    #  registers the object under a fresh serial number.  If the request
    #  is still within peer_time_to_live hops, it is forwarded to the
    #  peers under that serial.  A finalize is then scheduled: at idle if
    #  no peer was asked, otherwise after a delay based on the hops left.
    # ------------------------------------------------------------------
    constructor {args} {
        eval configure $args

        global myaddress
        lappend path $myaddress
        lappend avoid $myaddress
        set _serial [incr counter]
        set all($_serial) $this

        set delay "idle"  ;# finalize after waiting for responses
        set ttl [p2p::options get peer_time_to_live]
        if {[llength $path] < $ttl} {
            # broadcast_to_peers replaces @RECIPIENTS with the addresses
            # it sends to, so each recipient also avoids the others
            set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial]
            set _waitfor [broadcast_to_peers $mesg $avoid]
            log debug "WAIT FOR: $_waitfor"

            if {$_waitfor > 0} {
                # add a delay proportional to ttl + time for wonks measurement
                set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}]
            }
        }
        log debug "TIMEOUT: $delay"
        set _timeout [after $delay [itcl::code $this finalize]]
    }

    destructor {
        after cancel $_timeout
        catch {unset all($_serial)}
    }

    # ------------------------------------------------------------------
    #  response <details>
    #  Adds the info from one peer's proffer to this solicitation.  When
    #  the last expected reply arrives, finalizes right away instead of
    #  waiting for the timeout.
    # ------------------------------------------------------------------
    method response {details} {
        append _response $details "\n"
        if {[incr _waitfor -1] <= 0} {
            log debug "ALL RESPONSES"
            finalize
        }
    }

    # ------------------------------------------------------------------
    #  finalize
    #  Called when all peers have responded or the timeout expires.
    #  Builds one line per worker address (the first one reported wins),
    #  appends this worker's own line, and writes
    #  "proffer <token> <block>" to the connection.  Then destroys this
    #  object.
    # ------------------------------------------------------------------
    method finalize {} {
        global myaddress

        # Filter out duplicate info from clients.  Lines come from other
        # workers; skip any line that isn't a well-formed Tcl list rather
        # than let the error abort the reply.
        set block ""
        foreach line [split $_response \n] {
            if {[catch {lindex $line 0} addr]} {
                continue
            }
            if {"" != $addr && ![info exists seen($addr)]} {
                append block $line "\n"
                set seen($addr) 1
            }
        }
        # add details about this worker to the message
        append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]"
        log debug "FINALIZE {$block}"

        # send the composite results back to the caller
        set cmd [list proffer $token $block]
        if {[catch {puts $connection $cmd} err]} {
            log error "ERROR while sending back proffer: $err"
        }
        itcl::delete object $this
    }

    # ------------------------------------------------------------------
    #  Solicitation::proffer <serial> <details>
    #  Routes a peer's proffer to the solicitation with that serial
    #  number.  Replies for solicitations that have already finished are
    #  dropped.
    # ------------------------------------------------------------------
    proc proffer {serial details} {
        if {[info exists all($serial)]} {
            $all($serial) response $details
        }
    }

    common counter 0 ;# generates serial nums for objects
    common all       ;# maps serial num => solicitation object
}
697
# ----------------------------------------------------------------------
log system "starting..."

# Once the event loop is running, make the first attempt to reach an
# authority; from then on, the worker reacts to events.
after idle {authority-connection goto connected}

# Serve events.  Nothing in this file sets main-loop, so this waits
# indefinitely.
vwait main-loop
Note: See TracBrowser for help on using the repository browser.