Changeset 1273
- Timestamp:
- Feb 5, 2009, 6:17:23 AM (16 years ago)
- Location:
- trunk/p2p
- Files:
-
- 5 added
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/p2p/Makefile
r1257 r1273 4 4 all: 5 5 $(CC) $(CFLAGS) perftest.c -o perftest 6 ./mkindex handler.tcl server.tcl client.tcl log.tcl random.tcl 6 ./mkindex handler.tcl server.tcl client.tcl log.tcl random.tcl \ 7 options.tcl protocols.tcl statemachine.tcl wonks.tcl \ 8 foreman.tcl -
trunk/p2p/authority.tcl
r1257 r1273 31 31 32 32 # register with the central authority at this frequency 33 set worker_options(time_between_ registers) 6000033 set worker_options(time_between_authority_checks) 60000 34 34 35 35 # workers should try to connect with this many other peers … … 37 37 38 38 # ====================================================================== 39 # PROTOCOL: hubzero:authority<-worker/1 40 # 41 # The worker initiates communication with the authority by sending 42 # these messages to the authority to request information about 43 # other peers. 44 # ====================================================================== 45 p2p::protocol::register hubzero:authority<-worker/1 { 46 47 # ------------------------------------------------------------------ 48 # INCOMING: listening <port> 49 # Workers use this to tell the authority what port they're 50 # listening on. Other workers should connect to this port. 51 # ------------------------------------------------------------------ 52 define listening {port} { 53 global client2address workers 54 variable handler 55 variable cid 56 57 # register this address on the list of workers 58 set addr "$client2address($cid):$port" 59 set workers($addr) "--" 60 61 # name this connection for easier debugging 62 $handler connectionName $cid $addr 63 64 return "" 65 } 66 67 # ------------------------------------------------------------------ 68 # INCOMING: peers 69 # ------------------------------------------------------------------ 70 define peers {} { 71 global workers 72 return [list peers [array names workers]] 73 } 74 } 75 76 # ====================================================================== 77 # PROTOCOL: hubzero:authority<-foreman/1 78 # 79 # The foreman initiates communication with the authority by sending 80 # these messages to request information about workers, to estimate 81 # requirements for a job, to put funds in escrow, and to finalize 82 # the transaction. 83 # ====================================================================== 84 p2p::protocol::register hubzero:authority<-foreman/1 { 85 # ------------------------------------------------------------------ 86 # INCOMING: workers 87 # Foremen use this to request the list of known workers, and then 88 # pick one worker to act as their connection to the P2P network. 89 # ------------------------------------------------------------------ 90 define workers {} { 91 global workers 92 return [list workers [array names workers]] 93 } 94 } 95 96 # ====================================================================== 39 97 # set up a server at the first open port above 9001 40 set server [Server ::#auto 9001? \ 98 set server [p2p::server \ 99 -port 9001? \ 100 -protocols {hubzero:authority<-worker hubzero:authority<-foreman} \ 41 101 -servername authority \ 42 102 -onconnect authority_client_address \ … … 66 126 proc authority_client_protocol {cid protocol} { 67 127 global worker_options client2address 68 if {"DEFAULT" != $protocol} { 69 puts $cid [list protocol $protocol] 70 puts $cid "options [array get worker_options] ip $client2address($cid)" 128 switch -glob -- $protocol { 129 *<-worker* { 130 puts $cid [list protocol hubzero:worker<-authority/1] 131 puts $cid "options [array get worker_options] ip $client2address($cid)" 132 } 133 *<-foreman* { 134 puts $cid [list protocol hubzero:foreman<-authority/1] 135 } 136 DEF* { 137 # do nothing 138 } 139 default { 140 error "don't recognize protocol \"$protocol\"" 141 } 71 142 } 72 143 } 73 144 74 # ======================================================================75 # PROTOCOL: hub:worker/176 # ======================================================================77 $server protocol hubzero:worker/178 79 # ----------------------------------------------------------------------80 # DIRECTIVE: exception <message>81 # ----------------------------------------------------------------------82 $server define hubzero:worker/1 exception {args} {83 log system "error from worker: $args"84 }85 86 # ----------------------------------------------------------------------87 # DIRECTIVE: listening <port>88 # Workers use this to tell the authority what port they're listening89 # on. Other workers should connect to this port.90 # ----------------------------------------------------------------------91 $server define hubzero:worker/1 listening {port} {92 global client2address workers93 variable handler94 variable cid95 96 # register this address on the list of workers97 set addr "$client2address($cid):$port"98 set workers($addr) "--"99 100 # name this connection for easier debugging101 $handler connectionName $cid $addr102 103 return ""104 }105 106 # ----------------------------------------------------------------------107 # DIRECTIVE: peers108 # ----------------------------------------------------------------------109 $server define hubzero:worker/1 peers {} {110 global workers111 return [list peers [array names workers]]112 }113 114 145 vwait main-loop -
trunk/p2p/client.tcl
r1257 r1273 11 11 # ====================================================================== 12 12 package require Itcl 13 14 namespace eval p2p { # forward declaration } 15 16 # ====================================================================== 17 # USAGE: p2p::client ?-option value -option value ...? 18 # 19 # Used to create a client connection to a peer-to-peer server. 20 # Recognizes the following options: 21 # -address ........... connect to server at this host:port 22 # -sendprotocol ...... tell server we're speaking this protocol 23 # -receiveprotocol ... server replies back with these commands 24 # ====================================================================== 25 proc p2p::client {args} { 26 array set options { 27 -address ? 28 -sendprotocol "" 29 -receiveprotocol "" 30 } 31 foreach {key val} $args { 32 if {![info exists options($key)]} { 33 error "bad option \"$key\": should be [join [lsort [array names options]] {, }]" 34 } 35 set options($key) $val 36 } 37 38 # create the client 39 set client [eval Client ::#auto $options(-address)] 40 41 # install the protocol for incoming commands 42 p2p::protocol::init $client $options(-receiveprotocol) 43 44 # tell the server what protocol we'll be speaking 45 $client send [list protocol $options(-sendprotocol)] 46 47 return $client 48 } 49 50 # ====================================================================== 51 # CLASS: Client 52 # ====================================================================== 13 53 14 54 itcl::class Client { -
trunk/p2p/handler.tcl
r1257 r1273 34 34 return "" 35 35 } 36 define DEFAULT exception {message} { 37 log error "ERROR: $message" 38 } 36 39 37 40 eval configure $args … … 49 52 public method protocol {name} 50 53 public method define {protocol name arglist body} 54 public method connections {{protocol *}} 51 55 public method connectionName {cid {name ""}} 52 56 public method connectionSpeaks {cid protocol} … … 54 58 protected method handle {cid} 55 59 protected method finalize {protocol} 60 protected method dropped {cid} 56 61 protected method handlerType {} 57 62 } … … 101 106 102 107 # ---------------------------------------------------------------------- 108 # USAGE: connections ?<protocol>? 109 # 110 # Returns a list of file handles for current connections that match 111 # the glob-style <protocol> pattern. If no pattern is given, then 112 # it returns all connections. Each handle can be passed to 113 # connectionName and connectionSpeaks to get more information. 114 # ---------------------------------------------------------------------- 115 itcl::body Handler::connections {{pattern *}} { 116 set rlist "" 117 foreach cid [array names _protocol] { 118 if {[string match $pattern $_protocol($cid)]} { 119 lappend rlist $cid 120 } 121 } 122 return $rlist 123 } 124 125 # ---------------------------------------------------------------------- 103 126 # USAGE: connectionName <sockId> ?<name>? 104 127 # … … 144 167 itcl::body Handler::handle {cid} { 145 168 if {[gets $cid request] < 0} { 146 log system "dropped: [connectionName $cid]" 147 # connection has connection -- forget this entity 148 catch {close $cid} 149 catch {unset _buffer($cid)} 150 catch {unset _protocol($cid)} 151 catch {unset _cname($cid)} 169 dropped $cid 152 170 } elseif {[info exists _protocol($cid)]} { 153 171 # complete command? then process it below... … … 171 189 if {[catch {$_parser($protocol) eval $request} result] == 0} { 172 190 if {[string length $result] > 0} { 173 puts $cid $result191 catch {puts $cid $result} 174 192 append mesg "ok: $result" 175 193 } 176 194 } else { 177 puts $cid [list exception $result]195 catch {puts $cid [list exception $result]} 178 196 append mesg "exception: $result" 179 197 } … … 204 222 205 223 # ---------------------------------------------------------------------- 224 # USAGE: dropped <cid> 225 # 226 # Invoked automatically whenever a client connection drops, to 227 # log the event and remove all trace of the client. Derived classes 228 # can override this method to perform other functions too. 229 # ---------------------------------------------------------------------- 230 itcl::body Handler::dropped {cid} { 231 log system "dropped: [connectionName $cid]" 232 233 # connection has connection -- forget this entity 234 catch {close $cid} 235 catch {unset _buffer($cid)} 236 catch {unset _protocol($cid)} 237 catch {unset _cname($cid)} 238 } 239 240 # ---------------------------------------------------------------------- 206 241 # USAGE: handlerType 207 242 # -
trunk/p2p/perftest.c
r1257 r1273 91 91 + tend.tv_usec - tstart.tv_usec; 92 92 93 printf(" %ld\n", tval);93 printf("rappture-wonks/v1 %ld\n", tval); 94 94 exit(0); 95 95 } -
trunk/p2p/random.tcl
r1257 r1273 38 38 return $rlist 39 39 } 40 41 # ---------------------------------------------------------------------- 42 # USAGE: random <list> 43 # 44 # Picks one element at random from the given <list>. 45 # ---------------------------------------------------------------------- 46 proc random {entries} { 47 set nrand [expr {int(rand()*[llength $entries])}] 48 return [lindex $entries $nrand] 49 } -
trunk/p2p/server.tcl
r1257 r1273 11 11 package require Itcl 12 12 13 namespace eval p2p { # forward declaration } 14 15 # ====================================================================== 16 # USAGE: p2p::server ?-option value -option value ...? 17 # 18 # Used to create a new peer-to-peer server object for this program. 19 # Recognizes the following options: 20 # -port ........ port number that the server listens on 21 # -protocols ... list of protocol root names that the server handles 22 # ... any other option supported by Server class 23 # ====================================================================== 24 proc p2p::server {args} { 25 set port "?" 26 set protocols "" 27 set options "" 28 foreach {key val} $args { 29 switch -- $key { 30 -port { set port $val } 31 -protocols { set protocols $val } 32 default { lappend options $key $val } 33 } 34 } 35 36 if {[llength $protocols] == 0} { 37 error "server needs at least one value for -protocols" 38 } 39 40 # create the server 41 set server [eval Server ::#auto $port $options] 42 43 # install the protocols that this server recognizes 44 foreach name $protocols { 45 p2p::protocol::init $server $name 46 } 47 return $server 48 } 49 50 # ====================================================================== 51 # CLASS: Server 52 # ====================================================================== 13 53 itcl::class Server { 14 54 inherit Handler … … 24 64 # this code fragment gets invoked when client declares the protocol 25 65 public variable onprotocol "" 66 67 # this code fragment gets invoked when client drops 68 public variable ondisconnect "" 26 69 27 70 constructor {port args} { … … 59 102 60 103 public method port {} 104 public method broadcast {args} 61 105 public method connectionSpeaks {cid protocol} 62 106 107 protected method dropped {cid} 63 108 protected method handlerType {} 64 109 … … 80 125 81 126 # ---------------------------------------------------------------------- 127 # USAGE: broadcast ?-protocol <name>? ?-avoid <avoidList>? <message> 128 # 129 # Sends a <message> to all clients connected to this server. If a 130 # client address appears on the -avoid list, then that client is 131 # avoided. If the -protocol is specified, then the message is sent 132 # only to clients who match the glob-style pattern for the protocol 133 # name. 134 # ---------------------------------------------------------------------- 135 itcl::body Server::broadcast {args} { 136 set pattern "*" 137 set avoidList "" 138 set i 0 139 while {$i < [llength $args]} { 140 set option [lindex $args $i] 141 if {[string index $option 0] == "-"} { 142 switch -- $option { 143 -protocol { 144 set pattern [lindex $args [expr {$i+1}]] 145 incr i 2 146 } 147 -avoid { 148 set avoidList [lindex $args [expr {$i+1}]] 149 incr i 2 150 } 151 -- { 152 incr i 153 break 154 } 155 default { 156 error "bad option \"$option\": should be -avoid, -protocol, or --" 157 } 158 } 159 } else { 160 break 161 } 162 } 163 if {$i != [llength $args]-1} { 164 error "wrong # args: should be \"broadcast ?-protocol pattern? ?-avoid clients? message\"" 165 } 166 set message [lindex $args end] 167 168 set nmesgs 0 169 foreach cid [connections $pattern] { 170 set addr [lindex [connectionName $cid] 0] ;# x.x.x.x (sockN) 171 if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} { 172 puts " inbound => [connectionName $cid]" 173 if {[catch {puts $cid $message} result] == 0} { 174 incr nmesgs 175 } else { 176 log error "ERROR: broadcast failed for $cid: $result" 177 log error " (message was $message)" 178 } 179 } 180 } 181 return $nmesgs 182 } 183 184 # ---------------------------------------------------------------------- 82 185 # USAGE: accept <cid> <addr> <port> 83 186 # … … 116 219 117 220 # ---------------------------------------------------------------------- 221 # USAGE: dropped <cid> 222 # 223 # Invoked automatically whenever a client connection drops, to 224 # log the event and remove all trace of the client. Invokes any 225 # command hook for this server to note the fact that the client 226 # has dropped. 227 # ---------------------------------------------------------------------- 228 itcl::body Server::dropped {cid} { 229 # if there's a callback to handle the drop, execute it here 230 if {[string length $ondisconnect] > 0} { 231 uplevel #0 [list $ondisconnect [connectionName $cid]] 232 } 233 234 # call the base class method to clean up after the client 235 chain $cid 236 } 237 238 # ---------------------------------------------------------------------- 118 239 # USAGE: handlerType 119 240 # -
trunk/p2p/tclIndex
r1257 r1273 10 10 set auto_index(::Handler::protocol) [list source [file join $dir handler.tcl]] 11 11 set auto_index(::Handler::define) [list source [file join $dir handler.tcl]] 12 set auto_index(::Handler::connections) [list source [file join $dir handler.tcl]] 12 13 set auto_index(::Handler::connectionName) [list source [file join $dir handler.tcl]] 13 14 set auto_index(::Handler::connectionSpeaks) [list source [file join $dir handler.tcl]] 14 15 set auto_index(::Handler::handle) [list source [file join $dir handler.tcl]] 15 16 set auto_index(::Handler::finalize) [list source [file join $dir handler.tcl]] 17 set auto_index(::Handler::dropped) [list source [file join $dir handler.tcl]] 16 18 set auto_index(::Handler::handlerType) [list source [file join $dir handler.tcl]] 19 set auto_index(::p2p::server) [list source [file join $dir server.tcl]] 17 20 set auto_index(Server) [list source [file join $dir server.tcl]] 18 21 set auto_index(::Server::port) [list source [file join $dir server.tcl]] 22 set auto_index(::Server::broadcast) [list source [file join $dir server.tcl]] 19 23 set auto_index(::Server::accept) [list source [file join $dir server.tcl]] 20 24 set auto_index(::Server::connectionSpeaks) [list source [file join $dir server.tcl]] 25 set auto_index(::Server::dropped) [list source [file join $dir server.tcl]] 21 26 set auto_index(::Server::handlerType) [list source [file join $dir server.tcl]] 27 set auto_index(::p2p::client) [list source [file join $dir client.tcl]] 22 28 set auto_index(Client) [list source [file join $dir client.tcl]] 23 29 set auto_index(::Client::send) [list source [file join $dir client.tcl]] … … 26 32 set auto_index(log) [list source [file join $dir log.tcl]] 27 33 set auto_index(randomize) [list source [file join $dir random.tcl]] 34 set auto_index(random) [list source [file join $dir random.tcl]] 35 set auto_index(::p2p::options) [list source [file join $dir options.tcl]] 36 set auto_index(::p2p::protocol::register) [list source [file join $dir protocols.tcl]] 37 set auto_index(::p2p::protocol::init) [list source [file join $dir protocols.tcl]] 38 set auto_index(::p2p::protocol::define) [list source [file join $dir protocols.tcl]] 39 set auto_index(StateMachine) [list source [file join $dir statemachine.tcl]] 40 set auto_index(::StateMachine::state) [list source [file join $dir statemachine.tcl]] 41 set auto_index(::StateMachine::transition) [list source [file join $dir statemachine.tcl]] 42 set auto_index(::StateMachine::goto) [list source [file join $dir statemachine.tcl]] 43 set auto_index(::StateMachine::statedata) [list source [file join $dir statemachine.tcl]] 44 set auto_index(::p2p::wonks::init) [list source [file join $dir wonks.tcl]] 45 set auto_index(::p2p::wonks::current) [list source [file join $dir wonks.tcl]] 46 set auto_index(bgerror) [list source [file join $dir foreman.tcl]] 47 set auto_index(::Rappture::foreman::bids) [list source [file join $dir foreman.tcl]] -
trunk/p2p/test.tcl
r1257 r1273 23 23 set nodes(all) "" 24 24 set nodeRadius 15 25 26 option add *highlightBackground [. cget -background] 27 option add *client*background gray 28 option add *client*highlightBackground gray 29 option add *client*troughColor darkGray 25 30 26 31 # ====================================================================== … … 162 167 # Build the main interface 163 168 # ====================================================================== 169 frame .client -borderwidth 8 -relief flat 170 pack .client -side right -fill y 171 button .client.getbids -text "Get Bids:" -command test_bids 172 pack .client.getbids -side top -anchor w 173 frame .client.cntls 174 pack .client.cntls -side bottom -fill x 175 button .client.cntls.run -text "Spend" -command test_spend 176 pack .client.cntls.run -side left 177 entry .client.cntls.points -width 8 178 pack .client.cntls.points -side left 179 label .client.cntls.pointsl -text "points" 180 pack .client.cntls.pointsl -side left 181 182 frame .client.bids 183 pack .client.bids -side bottom -expand yes -fill both 184 scrollbar .client.bids.ysbar -orient vertical -command {.client.bids.info yview} 185 pack .client.bids.ysbar -side right -fill y 186 listbox .client.bids.info -yscrollcommand {.client.bids.ysbar set} 187 pack .client.bids.info -side left -expand yes -fill both 188 164 189 frame .cntls 165 190 pack .cntls -fill x … … 181 206 } 182 207 pack .cntls.layout -side left -padx 4 -pady 2 208 209 entry .cntls.workers -width 5 210 pack .cntls.workers -side right -padx {0 4} -pady 2 211 .cntls.workers insert end "3" 212 label .cntls.workersl -text "Workers:" 213 pack .cntls.workersl -side right -pady 2 183 214 184 215 frame .player … … 238 269 239 270 # launch a series of workers 240 for {set i 0} {$i < 20} {incr i} {271 for {set i 0} {$i < [.cntls.workers get]} {incr i} { 241 272 lappend processes [exec tclsh worker.tcl &] 242 273 after [expr {int(rand()*5000)}] … … 266 297 set info [read $fid] 267 298 close $fid 299 300 if {[regexp -- {foreman<-} $info]} { 301 # skip log file from foreman 302 continue 303 } 268 304 269 305 # get the address for this host … … 322 358 close $fid 323 359 puts "\nscanning $fname" 360 361 if {[regexp -- {foreman<-} $info]} { 362 # skip log file from foreman 363 continue 364 } 324 365 325 366 catch {unset started} … … 358 399 unset started(connect$cid-addr) 359 400 360 } elseif {[regexp { server message from ([a-zA-Z0-9\.]+:[0-9]+) \(([a-z0-9]+)\): +(.+) => (.*)} $mesg match addr cid cmd result]} {401 } elseif {[regexp {(server|client) message from ([a-zA-Z0-9\.]+:[0-9]+) \(([a-z0-9]+)\): +(.+) => (.*)} $mesg match which addr cid cmd result]} { 361 402 if {![string match identity* $cmd]} { 362 403 append actions($tval) $mesg \n … … 434 475 set from $nodes($fname) 435 476 set addr $started($key) 477 478 if {![info exists nodes($addr-x)]} { 479 unset started(connect$cid-time) 480 unset started(connect$cid-addr) 481 continue 482 } 436 483 set x0 $nodes($from-x) 437 484 set y0 $nodes($from-y) … … 558 605 pack .diagram.$view -expand yes -fill both 559 606 } 607 608 proc test_bids {} { 609 set info [Rappture::foreman::bids] 610 .client.bids.info delete 0 end 611 eval .client.bids.info insert end $info 612 } -
trunk/p2p/worker.tcl
r1257 r1273 21 21 22 22 # handle log file for this worker 23 log channel error on 24 log channel system on 23 25 log channel debug on 24 log channel system on 25 26 # set up a server at the first open port above 9101 27 set server [Server ::#auto 9101? -servername worker \ 28 -onprotocol worker_peers_protocol] 29 30 # ====================================================================== 31 # OPTIONS 32 # These options get the worker started, but new option settings 33 # some from the authority after this worker first connects. 34 # ====================================================================== 35 set authority "" ;# file handle for authority connection 26 27 proc ::bgerror {err} { log error "ERROR: $err $::errorInfo" } 28 29 36 30 set myaddress "?:?" ;# address/port for this worker 37 31 38 32 # set of connections for authority servers 39 set options(authority_hosts)127.0.0.1:900133 p2p::options register authority_hosts 127.0.0.1:9001 40 34 41 35 # register with the central authority at this frequency 42 set options(time_between_registers) 10000 36 p2p::options register time_between_authority_checks 60000 37 38 # rebuild the peer-to-peer network at this frequency 39 p2p::options register time_between_network_rebuilds 600000 43 40 44 41 # this worker should try to connect with this many other peers 45 set options(max_peer_connections) 4 46 47 # number of seconds between each check of system load 48 set options(time_between_load_checks) 60000 49 50 # ====================================================================== 51 # PEERS 52 # ====================================================================== 53 # eventually set to the list of all peers in the network 54 set peers(all) "" 55 56 # list of peers that we're testing connections with 57 set peers(testing) "" 58 59 # ====================================================================== 60 # SYSTEM LOAD 61 # ====================================================================== 62 # Check system load every so often. When the load changes 63 # significantly, execute a "perftest" run to compute the current 64 # number of wonks available to this worker. 65 66 set sysload(lastLoad) -1 67 set sysload(wonks) -1 68 set sysload(measured) -1 69 70 after idle worker_load_check 71 72 # ====================================================================== 73 # PROTOCOL: hubzero:peer/1 74 # This protocol gets used when another worker connects to this one. 75 # ====================================================================== 76 $server protocol hubzero:peer/1 77 78 $server define hubzero:peer/1 exception {message} { 79 log system "ERROR: $message" 80 } 81 82 # ---------------------------------------------------------------------- 83 # DIRECTIVE: identity 84 # Used for debugging, so that each client can identify itself by 85 # name to this worker. 86 # ---------------------------------------------------------------------- 87 $server define hubzero:peer/1 identity {name} { 88 variable cid 89 variable handler 90 $handler connectionName $cid $name 91 return "" 92 } 93 94 # ---------------------------------------------------------------------- 95 # DIRECTIVE: ping 96 # ---------------------------------------------------------------------- 97 $server define hubzero:peer/1 ping {} { 98 return "pong" 99 } 100 101 # ====================================================================== 102 # PROTOCOL: hubzero:worker/1 103 # The authority_connect procedure connects this worker to an authority 104 # and begins the authority/worker protocol. The directives below 105 # handle the incoming (authority->worker) traffic. 106 # ====================================================================== 107 proc authority_connect {} { 108 global authority options 109 log system "connecting to authorities..." 110 111 authority_disconnect 112 113 # scan through list of authorities and try to connect 114 foreach addr [randomize $options(authority_hosts)] { 115 if {[catch {Client ::#auto $addr} result] == 0} { 116 set authority $result 117 break 118 } 119 } 120 121 if {"" == $authority} { 122 error "can't connect to any known authorities" 123 } 124 125 # add handlers to process the incoming commands... 126 $authority protocol hubzero:worker/1 127 $authority define hubzero:worker/1 exception {args} { 128 log system "error from authority: $args" 129 } 130 131 # ------------------------------------------------------------------ 132 # DIRECTIVE: options <key1> <value1> <key2> <value2> ... 42 p2p::options register max_peer_connections 4 43 44 # workers propagate messages until time-to-live reaches 0 45 p2p::options register peer_time_to_live 4 46 47 # ====================================================================== 48 # PROTOCOL: hubzero:worker<-authority/1 49 # 50 # The worker initiates communication with the authority, and the 51 # authority responds by sending these messages. 52 # ====================================================================== 53 p2p::protocol::register hubzero:worker<-authority/1 { 54 55 # ------------------------------------------------------------------ 56 # INCOMING: options <key1> <value1> <key2> <value2> ... 133 57 # These option settings coming from the authority override the 134 # option settings built into the client at the top of this 135 # script. The authority probably has a more up-to-date list 136 # of other authorities and better policies for living within 137 # the network. 138 # ------------------------------------------------------------------ 139 $authority define hubzero:worker/1 options {args} { 140 global options server myaddress 58 # option settings built into the client. The authority probably 59 # has a more up-to-date list of other authorities and better 60 # policies for living within the network. 61 # ------------------------------------------------------------------ 62 define options {args} { 63 global server myaddress 141 64 142 65 array set options $args 66 foreach key [array names options] { 67 catch {p2p::options set $key $options($key)} 68 } 143 69 144 70 if {[info exists options(ip)]} { … … 150 76 151 77 # ------------------------------------------------------------------ 152 # DIRECTIVE: peers <listOfAddresses>78 # INCOMING: peers <listOfAddresses> 153 79 # This message comes in after this worker has sent the "peers" 154 80 # message to request the current list of peers. The … … 156 82 # worker should contact to enter the p2p network. 157 83 # ------------------------------------------------------------------ 158 $authority define hubzero:worker/1peers {plist} {84 define peers {plist} { 159 85 global peers 160 86 set peers(all) $plist 161 after idle worker_peers_update87 after idle {peer-network goto measure} 162 88 163 89 # now that we've gotten the peers, we're done with the authority 164 authority_disconnect 165 } 166 167 # ------------------------------------------------------------------ 168 # DIRECTIVE: identity 90 authority-connection goto idle 91 return "" 92 } 93 94 # ------------------------------------------------------------------ 95 # INCOMING: identity 169 96 # Used for debugging, so that each client can identify itself by 170 97 # name to this worker. 171 98 # ------------------------------------------------------------------ 172 $authority define hubzero:worker/1identity {name} {99 define identity {name} { 173 100 variable cid 174 101 variable handler … … 176 103 return "" 177 104 } 178 179 $authority send "protocol hubzero:worker/1" 180 return $authority 181 } 182 183 proc authority_disconnect {} { 184 global authority 185 if {"" != $authority} { 186 catch {itcl::delete object $authority} 187 set authority "" 188 } 189 } 190 191 # ====================================================================== 192 # USEFUL ROUTINES 193 # ====================================================================== 194 # ---------------------------------------------------------------------- 195 # COMMAND: worker_register 196 # 197 # Invoked when this worker first starts up and periodically thereafter 198 # to register the worker with the central authority, and to request 199 # a list of peers that this peer can talk to. 200 # ---------------------------------------------------------------------- 201 proc worker_register {} { 202 global options server 105 } 106 107 # ====================================================================== 108 # PROTOCOL: hubzero:worker<-foreman/1 109 # 110 # The foreman initiates communication with a worker, and sends the 111 # various messages supported below. 112 # ====================================================================== 113 p2p::protocol::register hubzero:worker<-foreman/1 { 114 115 # ------------------------------------------------------------------ 116 # INCOMING: solicit 117 # Foremen send this message to solicit bids for a simulation job. 118 # ------------------------------------------------------------------ 119 define solicit {args} { 120 log debug "SOLICIT from foreman: $args" 121 variable cid 122 log debug "solicitation request from foreman: $args" 123 eval Solicitation ::#auto $args -connection $cid 124 return "" 125 } 126 } 127 128 # ====================================================================== 129 # PROTOCOL: hubzero:workers<-workerc/1 130 # 131 # Workers initiate connections with other workers as peers. The 132 # following messages are sent by worker clients to the worker server. 133 # ====================================================================== 134 p2p::protocol::register hubzero:workers<-workerc/1 { 135 # ------------------------------------------------------------------ 136 # INCOMING: identity 137 # Used for debugging, so that each client can identify itself by 138 # name to this worker. 139 # ------------------------------------------------------------------ 140 define identity {name} { 141 variable cid 142 variable handler 143 $handler connectionName $cid $name 144 return "" 145 } 146 147 # ------------------------------------------------------------------ 148 # INCOMING: ping 149 # If another worker sends "ping", they are trying to measure the 150 # speed of the connection. Send back a "pong" message. If this 151 # worker is close by, the other worker will stay connected to it. 152 # ------------------------------------------------------------------ 153 define ping {} { 154 return "pong" 155 } 156 157 # ------------------------------------------------------------------ 158 # INCOMING: solicit -job info -path hosts -token xxx 159 # Workers send this message on to their peers to solicit bids 160 # for a simulation job. 161 # ------------------------------------------------------------------ 162 define solicit {args} { 163 log debug "SOLICIT from peer: $args" 164 variable cid 165 log debug "solicitation request from peer: $args" 166 eval Solicitation ::#auto $args -connection $cid 167 return "" 168 } 169 170 # ------------------------------------------------------------------ 171 # INCOMING: proffer <token> <details> 172 # Workers send this message back after a "solicit" request with 173 # details about what they can offer in terms of CPU power. When 174 # a worker has received all replies from its peers, it sends back 175 # its own proffer message to the client that started the 176 # solicitation. 177 # ------------------------------------------------------------------ 178 define proffer {token details} { 179 log debug "PROFFER from peer: $token $details" 180 Solicitation::proffer $token $details 181 return "" 182 } 183 } 184 185 # ====================================================================== 186 # PROTOCOL: hubzero:workerc<-workers/1 187 # 188 # The following messages are received by a worker client in response 189 # to the requests that they send to a worker server. 190 # ====================================================================== 191 p2p::protocol::register hubzero:workerc<-workers/1 { 192 # ------------------------------------------------------------------ 193 # INCOMING: identity 194 # Used for debugging, so that each client can identify itself by 195 # name to this worker. 196 # ------------------------------------------------------------------ 197 define identity {name} { 198 variable cid 199 variable handler 200 $handler connectionName $cid $name 201 return "" 202 } 203 204 # ------------------------------------------------------------------ 205 # INCOMING: pong 206 # When forming the peer-to-peer network, workers send ping/pong 207 # messages to one another to measure the latency. 208 # ------------------------------------------------------------------ 209 define pong {} { 210 global peers 211 variable handler 212 set now [clock clicks -milliseconds] 213 set delay [expr {$now - $peers(ping-$handler-start)}] 214 set peers(ping-$handler-latency) $delay 215 if {[incr peers(responses)] >= [llength $peers(testing)]} { 216 after cancel {peer-network goto finalize} 217 after idle {peer-network goto finalize} 218 } 219 return "" 220 } 221 222 # ------------------------------------------------------------------ 223 # INCOMING: solicit -job info -path hosts -token xxx 224 # Workers send this message on to their peers to solicit bids 225 # for a simulation job. 226 # ------------------------------------------------------------------ 227 define solicit {args} { 228 log debug "SOLICIT from peer: $args" 229 variable cid 230 log debug "solicitation request from peer: $args" 231 eval Solicitation ::#auto $args -connection $cid 232 return "" 233 } 234 235 # ------------------------------------------------------------------ 236 # INCOMING: proffer <token> <details> 237 # Workers send this message back after a "solicit" request with 238 # details about what they can offer in terms of CPU power. When 239 # a worker has received all replies from its peers, it sends back 240 # its own proffer message to the client that started the 241 # solicitation. 242 # ------------------------------------------------------------------ 243 define proffer {token details} { 244 log debug "PROFFER from peer: $token $details" 245 Solicitation::proffer $token $details 246 return "" 247 } 248 } 249 250 # ---------------------------------------------------------------------- 251 # COMMAND: broadcast_to_peers <message> ?<avoidList>? 252 # 253 # Used to broadcast a message out to all peers. The <message> must 254 # be one of the commands defined in the worker protocol. If there 255 # are no peer connections yet, this command does nothing. If any 256 # peer appears on the <avoidList>, it is skipped. Returns the 257 # number of messages sent out, so this peer knows how many replies 258 # to wait for. 259 # ---------------------------------------------------------------------- 260 proc broadcast_to_peers {message {avoidList ""}} { 261 global server peers 262 263 # 264 # Build a list of all peer addresses, so we know who we're 265 # going to send this message out to. 266 # 267 set recipients "" 268 foreach key [array names peers current-*] { 269 set addr [$peers($key) address] 270 if {[lsearch $avoidList $addr] < 0} { 271 lappend recipients $addr 272 } 273 } 274 foreach cid [$server connections hubzero:workers<-workerc/1] { 275 set addr [lindex [$server connectionName $cid] 0] ;# x.x.x.x (sockN) 276 if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} { 277 lappend recipients $addr 278 } 279 } 280 281 # 282 # If the message has any @RECIPIENTS fields, replace them 283 # with the list of recipients 284 # 285 regsub -all @RECIPIENTS $message $recipients message 286 287 # 288 # Send the message out to all peers. Keep a count and double-check 289 # it against the list of recipients generated above. 290 # 291 set nmesgs 0 292 293 # send off to other workers that this one has connected to 294 foreach key [array names peers current-*] { 295 set addr [$peers($key) address] 296 if {[lsearch $avoidList $addr] < 0} { 297 if {[catch {$peers($key) send $message} err] == 0} { 298 incr nmesgs 299 } else { 300 log error "ERROR: broadcast failed to [$peers($key) address]: $result\n (message was \"$message\")" 301 } 302 } 303 } 304 305 # send off to other workers that connected to this one 306 foreach cid [$server connections hubzero:workers<-workerc/1] { 307 set addr [lindex [$server connectionName $cid] 0] ;# x.x.x.x (sockN) 308 if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} { 309 if {[catch {puts $cid $message} result] == 0} { 310 incr nmesgs 311 } else { 312 log error "ERROR: broadcast failed for $cid: $result" 313 log error " (message was $message)" 314 } 315 } 316 } 317 318 # did we send the right number of messages? 319 if {[llength $recipients] != $nmesgs} { 320 log error "ERROR: sent only $nmesgs messages to peers {$recipients}" 321 } 322 323 return $nmesgs 324 } 325 326 # ---------------------------------------------------------------------- 327 # COMMAND: worker_got_protocol 328 # 329 # Invoked whenever a peer sends their protocol message to this 330 # worker. Sends the same protocol name back, so the other worker 331 # understands what protocol we're speaking. 332 # ---------------------------------------------------------------------- 333 proc worker_got_protocol {cid protocol} { 334 switch -glob -- $protocol { 335 *<-worker* { 336 puts $cid [list protocol hubzero:workerc<-workers/1] 337 } 338 *<-foreman* { 339 puts $cid [list protocol hubzero:foreman<-worker/1] 340 } 341 DEF* { 342 # do nothing 343 } 344 default { 345 error "don't recognize protocol \"$protocol\"" 346 } 347 } 348 } 349 350 # ---------------------------------------------------------------------- 351 # COMMAND: worker_got_dropped <address> 352 # 353 # Invoked whenever an inbound connection to this worker drops. 354 # At some point we should try updating our connections to other 355 # peers, to replace this missing connection. 356 # ---------------------------------------------------------------------- 357 proc worker_got_dropped {addr} { 358 global peers 359 if {[info exists peers(current-$addr)]} { 360 unset peers(current-$addr) 361 after cancel {peer-network goto measure} 362 after 5000 {peer-network goto measure} 363 log debug "peer dropped: $addr" 364 } 365 } 366 367 # ====================================================================== 368 # Connect to one of the authorities to get a list of peers. Then, 369 # sit in the event loop and process events. 370 # ====================================================================== 371 p2p::wonks::init 372 373 set server [p2p::server -port 9101? \ 374 -protocols {hubzero:workers<-workerc hubzero:worker<-foreman} \ 375 -servername worker \ 376 -onprotocol worker_got_protocol \ 377 -ondisconnect worker_got_dropped] 378 379 # ---------------------------------------------------------------------- 380 # AUTHORITY CONNECTION 381 # 382 # This is the state machine representing the connection to the 383 # authority server. We start in the "idle" state. Whenever we 384 # try to move to "connected", we'll open a connection to an authority 385 # and request updated information on workers. If that connection 386 # fails, we'll get kicked back to "idle" and we'll try again later. 387 # ---------------------------------------------------------------------- 388 StateMachine authority-connection 389 authority-connection statedata cnx "" 390 391 # sit here whenever we don't have a connection 392 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 393 authority-connection state idle -onenter { 394 # have an open connection? then close it 395 if {"" != [statedata cnx]} { 396 catch {itcl::delete object [statedata cnx]} 397 statedata cnx "" 398 } 399 # try to connect again later 400 set delay [p2p::options get time_between_authority_checks] 401 after $delay {authority-connection goto connected} 402 } -onleave { 403 after cancel {authority-connection goto connected} 404 } 405 406 # sit here after we're connected and waiting for a response 407 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 408 authority-connection state connected 409 410 # when moving to the connected state, make a connection 411 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 412 authority-connection transition idle->connected -onchange { 413 global server 203 414 204 415 # connect to the authority and request a list of peers 205 if {[catch { 206 set client [authority_connect] 207 $client send "listening [$server port]" 208 $client send "peers" 209 } result]} { 210 log system "ERROR: $result" 211 } 212 213 # register again at regular intervals in case the authority 214 # gets restarted in between. 215 after $options(time_between_registers) worker_register 216 } 217 218 # ---------------------------------------------------------------------- 219 # COMMAND: worker_load_check 220 # 221 # Invoked when this worker first starts up and periodically thereafter 222 # to compute the system load available to the worker. If the system 223 # load has changed significantly, then the "perftest" program is 224 # executed to get a measure of performance available. This program 225 # returns a number of "wonks" which we report to peers as our available 226 # performance. 227 # ---------------------------------------------------------------------- 228 proc worker_load_check {} { 229 global options sysload dir 230 231 # see if the load has changed significantly 232 set changed 0 233 if {$sysload(lastLoad) < 0} { 234 set changed 1 416 statedata cnx "" 417 foreach addr [randomize [p2p::options get authority_hosts]] { 418 if {[catch {p2p::client -address $addr \ 419 -sendprotocol hubzero:authority<-worker/1 \ 420 -receiveprotocol hubzero:worker<-authority/1} result] == 0} { 421 statedata cnx $result 422 break 423 } 424 } 425 426 if {"" != [statedata cnx]} { 427 [statedata cnx] send "listening [$server port]" 428 [statedata cnx] send "peers" 235 429 } else { 236 set load [Rappture::sysinfo load5] 237 if {$load < 0.9*$sysload(lastLoad) || $load > 1.1*$sysload(lastLoad)} { 238 set changed 1 239 } 240 } 241 242 if {$changed} { 243 set sysload(lastLoad) [Rappture::sysinfo load5] 244 puts "LOAD CHANGED: $sysload(lastLoad) [Rappture::sysinfo freeram freeswap]" 245 set sysload(measured) [clock seconds] 246 247 # Run the program, but don't use exec, since it blocks. 248 # Instead, run it in the background and harvest its output later. 249 set sysload(test) [open "| [file join $dir perftest]" r] 250 fileevent $sysload(test) readable worker_load_results 251 } 252 253 # monitor the system load at regular intervals 254 after $options(time_between_load_checks) worker_load_check 255 } 256 257 # ---------------------------------------------------------------------- 258 # COMMAND: worker_load_results 259 # 260 # Invoked automatically when the "perftest" run finishes. Reads the 261 # number of wonks reported on standard output and reports them to 262 # other peers. 263 # ---------------------------------------------------------------------- 264 proc worker_load_results {} { 265 global sysload 266 267 if {[catch {read $sysload(test)} msg] == 0} { 268 if {[regexp {[0-9]+} $msg match]} { 269 puts "WONKS: $match" 270 set sysload(wonks) $match 271 } else { 272 log system "ERROR: performance test failed: $msg" 273 } 274 } 275 catch {close $sysload(test)} 276 set sysload(test) "" 277 } 278 279 # ---------------------------------------------------------------------- 280 # COMMAND: worker_peers_update 281 # 282 # Invoked when this worker has received a list of peers, and from 283 # time to time thereafter, to establish connections with other peers 284 # in the network. This worker picks a number of peers randomly from 285 # the list of all available peers, and then pings each of them. 286 # Timing results from the pings are stored away, and when all pings 287 # have completed, this worker picks the best few connections and 288 # keeps those. 289 # ---------------------------------------------------------------------- 290 proc worker_peers_update {} { 291 global options peers myaddress 292 worker_peers_cleanup 430 error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]" 431 } 432 } 433 434 # ---------------------------------------------------------------------- 435 # PEER-TO-PEER CONNECTIONS 436 # 437 # This is the state machine representing the network of worker peers. 438 # We start in the "idle" state. From time to time we move to the 439 # "measure" state and attempt to establish connections with a set 440 # peers. We then wait for ping/pong responses and move to "finalize". 441 # When we have all responses, we move to "finalize" and finalize all 442 # connections, and then move back to "idle". 443 # ---------------------------------------------------------------------- 444 StateMachine peer-network 445 peer-network statedata cnx "" 446 447 # sit here when the peer network is okay 448 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 449 peer-network state idle -onenter { 450 # try to connect again later 451 set delay [p2p::options get time_between_network_rebuilds] 452 after $delay {peer-network goto measure} 453 } -onleave { 454 after cancel {peer-network goto measure} 455 } 456 457 # sit here when we need to rebuild the peer-to-peer network 458 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 459 peer-network state measure 460 461 # when moving to the start state, make connections to a bunch of peers 462 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 463 peer-network transition idle->measure -onchange { 464 global server myaddress peers 465 466 # 467 # Get a list of workers already connected to this one 468 # with an inbound connection to the server. 469 # 470 foreach cid [$server connections hubzero:workers<-workerc/1] { 471 set addr [lindex [$server connectionName $cid] 0] 472 set inbound($addr) $cid 473 } 293 474 294 475 # 295 476 # Pick a random group of peers and try to connect to them. 296 # Start with the existing peers, and then add a random 297 # bunch of others. 298 # 477 # Start with the existing peers to see if their connection 478 # is still favorable, and then add a random bunch of others. 479 # 480 set peers(testing) "" 481 set peers(responses) 0 299 482 foreach key [array names peers current-*] { 300 483 set peer $peers($key) … … 307 490 # final number we want to talk to. 308 491 # 309 set ntest [expr {10 * $options(max_peer_connections)}] 310 foreach addr [randomize $peers(all) $ntest] { 311 if {$addr == $myaddress} { 492 set maxpeers [p2p::options get max_peer_connections] 493 foreach addr [randomize $peers(all) [expr {10*$maxpeers}]] { 494 # 495 # Avoid connecting to ourself, or to peers that we're 496 # already connected to (either as inbound connections 497 # to the server or as outbound connections to others). 498 # 499 if {$addr == $myaddress 500 || [info exists peers(current-$addr)] 501 || [info exists inbound($addr)]} { 312 502 continue 313 503 } 314 if {![info exists peers(current-$addr)] 315 && [catch {Client ::#auto $addr} peer] == 0} { 316 # open a new connection to this address 317 lappend peers(testing) $peer 318 $peer protocol hubzero:peer/1 319 320 $peer define hubzero:peer/1 exception {message} { 321 log system "ERROR: $message" 322 } 323 324 $peer define hubzero:peer/1 identity {name} { 325 variable cid 326 variable handler 327 $handler connectionName $cid $name 328 return "" 329 } 330 331 # when we get a "pong" back, store the latency 332 $peer define hubzero:peer/1 pong {} { 333 global peers 334 variable handler 335 puts " pong from $handler" 336 set now [clock clicks -milliseconds] 337 set delay [expr {$now - $peers(ping-$handler-start)}] 338 set peers(ping-$handler-latency) $delay 339 incr peers(responses) 340 worker_peers_finalize 341 return "" 342 } 343 344 # start the ping/pong session with the peer 345 $peer send "protocol hubzero:peer/1" 346 347 # send tell this peer our name (for debugging) 348 if {[log channel debug]} { 349 $peer send "identity $myaddress" 350 } 504 505 if {[catch {p2p::client -address $addr \ 506 -sendprotocol hubzero:workers<-workerc/1 \ 507 -receiveprotocol hubzero:workerc<-workers/1} cnx]} { 508 continue 509 } 510 $cnx send "identity $myaddress" 511 lappend peers(testing) $cnx 512 513 # have enough connections to test? 514 if {[llength $peers(testing)] >= 2*$maxpeers} { 515 break 351 516 } 352 517 } … … 355 520 # Now, loop through all peers and send a "ping" message. 356 521 # 357 foreach peer $peers(testing) { 358 puts "pinging $peer = [$peer address]..." 522 foreach cnx $peers(testing) { 359 523 # mark the time and send a ping to this peer 360 set peers(ping-$ peer-start) [clock clicks -milliseconds]361 $ peersend "ping"524 set peers(ping-$cnx-start) [clock clicks -milliseconds] 525 $cnx send "ping" 362 526 } 363 527 364 528 # if this test takes too long, just give up 365 after 10000 worker_peers_finalize -force 366 } 367 368 # ---------------------------------------------------------------------- 369 # COMMAND: worker_peers_finalize ?-force? 370 # 371 # Called after worker_peers_update has finished its business to 372 # clean up all of the connections that were open for testing. 373 # ---------------------------------------------------------------------- 374 proc worker_peers_finalize {{option -check}} { 375 global peers options 376 if {$option == "-force" || $peers(responses) == [llength $peers(testing)]} { 377 # build a list: {peer latency} {peer latency} ... 378 set plist "" 379 foreach obj $peers(testing) { 380 if {[info exists peers(ping-$obj-latency)]} { 381 lappend plist [list $obj $peers(ping-$obj-latency)] 382 } 383 } 384 puts "-------------------\nFINALIZING $::myaddress" 385 puts "PINGS: $plist" 386 387 # sort the list and extract the top peers 388 set plist [lsort -increasing -index 1 $plist] 389 set plist [lrange $plist 0 [expr {$options(max_peer_connections)-1}]] 390 puts "TOP: $plist" 391 392 set current [array names peers current-*] 393 puts " current: $current" 394 395 # transfer the top peers to the "current" list 396 set all "" 397 foreach rec $plist { 398 set peer [lindex $rec 0] 399 set addr [$peer address] 400 set peers(current-$addr) $peer 401 lappend all $addr 402 403 # if it is already on the "current" list, then keep it 404 set i [lsearch $current current-$addr] 405 if {$i >= 0} { 406 set current [lreplace $current $i $i] 407 } 408 set i [lsearch $peers(testing) $peer] 409 if {$i >= 0} { 410 set peers(testing) [lreplace $peers(testing) $i $i] 411 } 412 } 413 log system "connected to peers: $all" 414 puts " new current: [array names peers current-*]" 415 puts " final: $all" 416 puts " leftover: $current $peers(testing)" 417 418 # get rid of old peers that we no longer want to talk to 419 foreach leftover $current { 420 itcl::delete object $peers($leftover) 421 unset peers($leftover) 422 puts " cleaned up $leftover (was on current list)" 423 } 424 425 # clean up after this test 426 worker_peers_cleanup 427 } 428 } 429 430 # ---------------------------------------------------------------------- 431 # COMMAND: worker_peers_cleanup 432 # 433 # Called after worker_peers_update has finished its business to 434 # clean up all of the connections that were open for testing. 435 # ---------------------------------------------------------------------- 436 proc worker_peers_cleanup {} { 529 after 10000 {peer-network goto finalize} 530 } 531 532 # sit here when we need to rebuild the peer-to-peer network 533 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 534 peer-network state finalize -onenter { 535 after cancel {peer-network goto finalize} 536 537 # everything is finalized now, so go to idle 538 after idle {peer-network goto idle} 539 } 540 541 # when moving to the finalize state, decide on the network of peers 542 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 543 peer-network transition measure->finalize -onchange { 437 544 global peers 545 546 set maxpeers [p2p::options get max_peer_connections] 547 548 # build a list: {peer latency} {peer latency} ... 549 set plist "" 550 foreach obj $peers(testing) { 551 if {[info exists peers(ping-$obj-latency)]} { 552 lappend plist [list $obj $peers(ping-$obj-latency)] 553 } 554 } 555 556 # sort the list and extract the top peers 557 set plist [lsort -increasing -index 1 $plist] 558 set plist [lrange $plist 0 [expr {$maxpeers-1}]] 559 560 set currentpeers [array names peers current-*] 561 562 # transfer the top peers to the "current" list 563 set all "" 564 foreach rec $plist { 565 set peer [lindex $rec 0] 566 set addr [$peer address] 567 set peers(current-$addr) $peer 568 lappend all $addr 569 570 # if it is already on the "current" list, then keep it 571 set i [lsearch $currentpeers current-$addr] 572 if {$i >= 0} { 573 set currentpeers [lreplace $currentpeers $i $i] 574 } 575 set i [lsearch $peers(testing) $peer] 576 if {$i >= 0} { 577 set peers(testing) [lreplace $peers(testing) $i $i] 578 } 579 } 580 log system "connected to peers: $all" 581 582 # get rid of old peers that we no longer want to talk to 583 foreach leftover $currentpeers { 584 itcl::delete object $peers($leftover) 585 unset peers($leftover) 586 } 587 588 # clean up after this test 438 589 foreach obj $peers(testing) { 439 590 catch {itcl::delete object $obj} 440 puts " cleaned up $obj (was on testing list)"441 591 } 442 592 set peers(testing) "" … … 446 596 } 447 597 set peers(responses) 0 448 449 after cancel worker_peers_finalize -force 450 } 451 452 # ---------------------------------------------------------------------- 453 # COMMAND: worker_peers_protocol 454 # 455 # Invoked whenever a peer sends their protocol message to this 456 # worker. Sends the same protocol name back, so the other worker 457 # understands what protocol we're speaking. 458 # ---------------------------------------------------------------------- 459 proc worker_peers_protocol {cid protocol} { 460 if {"DEFAULT" != $protocol} { 461 puts $cid [list protocol $protocol] 462 } 463 } 464 465 # ====================================================================== 466 # Connect to one of the authorities to get a list of peers. Then, 467 # sit in the event loop and process events. 468 # ====================================================================== 469 after idle worker_register 598 } 599 600 # ---------------------------------------------------------------------- 601 # JOB SOLICITATION 602 # 603 # Each worker can receive a "solicit" request, asking for information 604 # about performance and price of peers available to work. Each worker 605 # sends the request on to its peers, then gathers the information 606 # and sends it back to the client or to the peer requesting the 607 # information. There can be multiple requests going on at once, 608 # and each may have different job types and return different info, 609 # so the class below helps to watch over each request until it has 610 # completed. 611 # ---------------------------------------------------------------------- 612 itcl::class Solicitation { 613 public variable connection "" 614 public variable job "" 615 public variable path "" 616 public variable avoid "" 617 public variable token "" 618 619 private variable _serial "" 620 private variable _response "" 621 private variable _waitfor 0 622 private variable _timeout "" 623 624 constructor {args} { 625 eval configure $args 626 627 global myaddress 628 lappend path $myaddress 629 lappend avoid $myaddress 630 set _serial [incr counter] 631 set all($_serial) $this 632 633 set delay "idle" ;# finalize after waiting for responses 634 set ttl [p2p::options get peer_time_to_live] 635 if {[llength $path] < $ttl} { 636 set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial] 637 set _waitfor [broadcast_to_peers $mesg $avoid] 638 log debug "WAIT FOR: $_waitfor" 639 640 if {$_waitfor > 0} { 641 # add a delay proportional to ttl + time for wonks measurement 642 set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}] 643 } 644 } 645 log debug "TIMEOUT: $delay" 646 set _timeout [after $delay [itcl::code $this finalize]] 647 } 648 destructor { 649 after cancel $_timeout 650 catch {unset all($_serial)} 651 } 652 653 # this adds the info from each proffer to this solicitation 654 method response {details} { 655 set addr [lindex $details 0] 656 append _response $details "\n" 657 if {[incr _waitfor -1] <= 0} { 658 log debug "ALL RESPONSES" 659 finalize 660 } 661 } 662 663 # called to finalize when all peers have responded back 664 method finalize {} { 665 global myaddress 666 667 # filter out duplicate info from clients 668 set block "" 669 foreach line [split $_response \n] { 670 set addr [lindex $line 0] 671 if {"" != $addr && ![info exists response($addr)]} { 672 append block $line "\n" 673 set response($addr) 1 674 } 675 } 676 # add details about this client to the message 677 append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]" 678 log debug "FINALIZE {$block}" 679 680 # send the composite results back to the caller 681 set cmd [list proffer $token $block] 682 if {[catch {puts $connection $cmd} err]} { 683 log error "ERROR while sending back proffer: $err" 684 } 685 itcl::delete object $this 686 } 687 688 proc proffer {serial details} { 689 if {[info exists all($serial)]} { 690 $all($serial) response $details 691 } 692 } 693 694 common counter 0 ;# generates serial nums for objects 695 common all ;# maps serial num => solicitation object 696 } 697 698 # ---------------------------------------------------------------------- 470 699 log system "starting..." 700 701 after idle { 702 authority-connection goto connected 703 } 704 471 705 vwait main-loop
Note: See TracChangeset
for help on using the changeset viewer.