Changeset 1273 for trunk/p2p/worker.tcl
- Timestamp:
- Feb 5, 2009 6:17:23 AM (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/p2p/worker.tcl
r1257 r1273 21 21 22 22 # handle log file for this worker 23 log channel error on 24 log channel system on 23 25 log channel debug on 24 log channel system on 25 26 # set up a server at the first open port above 9101 27 set server [Server ::#auto 9101? -servername worker \ 28 -onprotocol worker_peers_protocol] 29 30 # ====================================================================== 31 # OPTIONS 32 # These options get the worker started, but new option settings 33 # come from the authority after this worker first connects. 34 # ====================================================================== 35 set authority "" ;# file handle for authority connection 26 27 proc ::bgerror {err} { log error "ERROR: $err $::errorInfo" } 28 29 36 30 set myaddress "?:?" ;# address/port for this worker 37 31 38 32 # set of connections for authority servers 39 set options(authority_hosts) 127.0.0.1:9001 33 p2p::options register authority_hosts 127.0.0.1:9001 40 34 41 35 # register with the central authority at this frequency 42 set options(time_between_registers) 10000 36 p2p::options register time_between_authority_checks 60000 37 38 # rebuild the peer-to-peer network at this frequency 39 p2p::options register time_between_network_rebuilds 600000 43 40 44 41 # this worker should try to connect with this many other peers 45 set options(max_peer_connections) 4 46 47 # number of milliseconds between each check of system load 48 set options(time_between_load_checks) 60000 49 50 # ====================================================================== 51 # PEERS 52 # ====================================================================== 53 # eventually set to the list of all peers in the network 54 set peers(all) "" 55 56 # list of peers that we're testing connections with 57 set peers(testing) "" 58 59 # ====================================================================== 60 # SYSTEM LOAD 61 # ====================================================================== 62 # Check system load 
every so often. When the load changes 63 # significantly, execute a "perftest" run to compute the current 64 # number of wonks available to this worker. 65 66 set sysload(lastLoad) -1 67 set sysload(wonks) -1 68 set sysload(measured) -1 69 70 after idle worker_load_check 71 72 # ====================================================================== 73 # PROTOCOL: hubzero:peer/1 74 # This protocol gets used when another worker connects to this one. 75 # ====================================================================== 76 $server protocol hubzero:peer/1 77 78 $server define hubzero:peer/1 exception {message} { 79 log system "ERROR: $message" 80 } 81 82 # ---------------------------------------------------------------------- 83 # DIRECTIVE: identity 84 # Used for debugging, so that each client can identify itself by 85 # name to this worker. 86 # ---------------------------------------------------------------------- 87 $server define hubzero:peer/1 identity {name} { 88 variable cid 89 variable handler 90 $handler connectionName $cid $name 91 return "" 92 } 93 94 # ---------------------------------------------------------------------- 95 # DIRECTIVE: ping 96 # ---------------------------------------------------------------------- 97 $server define hubzero:peer/1 ping {} { 98 return "pong" 99 } 100 101 # ====================================================================== 102 # PROTOCOL: hubzero:worker/1 103 # The authority_connect procedure connects this worker to an authority 104 # and begins the authority/worker protocol. The directives below 105 # handle the incoming (authority->worker) traffic. 106 # ====================================================================== 107 proc authority_connect {} { 108 global authority options 109 log system "connecting to authorities..." 
110 111 authority_disconnect 112 113 # scan through list of authorities and try to connect 114 foreach addr [randomize $options(authority_hosts)] { 115 if {[catch {Client ::#auto $addr} result] == 0} { 116 set authority $result 117 break 118 } 119 } 120 121 if {"" == $authority} { 122 error "can't connect to any known authorities" 123 } 124 125 # add handlers to process the incoming commands... 126 $authority protocol hubzero:worker/1 127 $authority define hubzero:worker/1 exception {args} { 128 log system "error from authority: $args" 129 } 130 131 # ------------------------------------------------------------------ 132 # DIRECTIVE: options <key1> <value1> <key2> <value2> ... 42 p2p::options register max_peer_connections 4 43 44 # workers propagate messages until time-to-live reaches 0 45 p2p::options register peer_time_to_live 4 46 47 # ====================================================================== 48 # PROTOCOL: hubzero:worker<-authority/1 49 # 50 # The worker initiates communication with the authority, and the 51 # authority responds by sending these messages. 52 # ====================================================================== 53 p2p::protocol::register hubzero:worker<-authority/1 { 54 55 # ------------------------------------------------------------------ 56 # INCOMING: options <key1> <value1> <key2> <value2> ... 133 57 # These option settings coming from the authority override the 134 # option settings built into the client at the top of this 135 # script. The authority probably has a more up-to-date list 136 # of other authorities and better policies for living within 137 # the network. 138 # ------------------------------------------------------------------ 139 $authority define hubzero:worker/1 options {args} { 140 global options server myaddress 58 # option settings built into the client. The authority probably 59 # has a more up-to-date list of other authorities and better 60 # policies for living within the network. 
61 # ------------------------------------------------------------------ 62 define options {args} { 63 global server myaddress 141 64 142 65 array set options $args 66 foreach key [array names options] { 67 catch {p2p::options set $key $options($key)} 68 } 143 69 144 70 if {[info exists options(ip)]} { … … 150 76 151 77 # ------------------------------------------------------------------ 152 # DIRECTIVE: peers <listOfAddresses> 78 # INCOMING: peers <listOfAddresses> 153 79 # This message comes in after this worker has sent the "peers" 154 80 # message to request the current list of peers. The … … 156 82 # worker should contact to enter the p2p network. 157 83 # ------------------------------------------------------------------ 158 $authority define hubzero:worker/1 peers {plist} { 84 define peers {plist} { 159 85 global peers 160 86 set peers(all) $plist 161 after idle worker_peers_update 87 after idle {peer-network goto measure} 162 88 163 89 # now that we've gotten the peers, we're done with the authority 164 authority_disconnect 165 } 166 167 # ------------------------------------------------------------------ 168 # DIRECTIVE: identity 90 authority-connection goto idle 91 return "" 92 } 93 94 # ------------------------------------------------------------------ 95 # INCOMING: identity 169 96 # Used for debugging, so that each client can identify itself by 170 97 # name to this worker. 
171 98 # ------------------------------------------------------------------ 172 $authority define hubzero:worker/1 identity {name} { 99 define identity {name} { 173 100 variable cid 174 101 variable handler … … 176 103 return "" 177 104 } 178 179 $authority send "protocol hubzero:worker/1" 180 return $authority 181 } 182 183 proc authority_disconnect {} { 184 global authority 185 if {"" != $authority} { 186 catch {itcl::delete object $authority} 187 set authority "" 188 } 189 } 190 191 # ====================================================================== 192 # USEFUL ROUTINES 193 # ====================================================================== 194 # ---------------------------------------------------------------------- 195 # COMMAND: worker_register 196 # 197 # Invoked when this worker first starts up and periodically thereafter 198 # to register the worker with the central authority, and to request 199 # a list of peers that this peer can talk to. 200 # ---------------------------------------------------------------------- 201 proc worker_register {} { 202 global options server 105 } 106 107 # ====================================================================== 108 # PROTOCOL: hubzero:worker<-foreman/1 109 # 110 # The foreman initiates communication with a worker, and sends the 111 # various messages supported below. 112 # ====================================================================== 113 p2p::protocol::register hubzero:worker<-foreman/1 { 114 115 # ------------------------------------------------------------------ 116 # INCOMING: solicit 117 # Foremen send this message to solicit bids for a simulation job. 
118 # ------------------------------------------------------------------ 119 define solicit {args} { 120 log debug "SOLICIT from foreman: $args" 121 variable cid 122 log debug "solicitation request from foreman: $args" 123 eval Solicitation ::#auto $args -connection $cid 124 return "" 125 } 126 } 127 128 # ====================================================================== 129 # PROTOCOL: hubzero:workers<-workerc/1 130 # 131 # Workers initiate connections with other workers as peers. The 132 # following messages are sent by worker clients to the worker server. 133 # ====================================================================== 134 p2p::protocol::register hubzero:workers<-workerc/1 { 135 # ------------------------------------------------------------------ 136 # INCOMING: identity 137 # Used for debugging, so that each client can identify itself by 138 # name to this worker. 139 # ------------------------------------------------------------------ 140 define identity {name} { 141 variable cid 142 variable handler 143 $handler connectionName $cid $name 144 return "" 145 } 146 147 # ------------------------------------------------------------------ 148 # INCOMING: ping 149 # If another worker sends "ping", they are trying to measure the 150 # speed of the connection. Send back a "pong" message. If this 151 # worker is close by, the other worker will stay connected to it. 152 # ------------------------------------------------------------------ 153 define ping {} { 154 return "pong" 155 } 156 157 # ------------------------------------------------------------------ 158 # INCOMING: solicit -job info -path hosts -token xxx 159 # Workers send this message on to their peers to solicit bids 160 # for a simulation job. 
161 # ------------------------------------------------------------------ 162 define solicit {args} { 163 log debug "SOLICIT from peer: $args" 164 variable cid 165 log debug "solicitation request from peer: $args" 166 eval Solicitation ::#auto $args -connection $cid 167 return "" 168 } 169 170 # ------------------------------------------------------------------ 171 # INCOMING: proffer <token> <details> 172 # Workers send this message back after a "solicit" request with 173 # details about what they can offer in terms of CPU power. When 174 # a worker has received all replies from its peers, it sends back 175 # its own proffer message to the client that started the 176 # solicitation. 177 # ------------------------------------------------------------------ 178 define proffer {token details} { 179 log debug "PROFFER from peer: $token $details" 180 Solicitation::proffer $token $details 181 return "" 182 } 183 } 184 185 # ====================================================================== 186 # PROTOCOL: hubzero:workerc<-workers/1 187 # 188 # The following messages are received by a worker client in response 189 # to the requests that they send to a worker server. 190 # ====================================================================== 191 p2p::protocol::register hubzero:workerc<-workers/1 { 192 # ------------------------------------------------------------------ 193 # INCOMING: identity 194 # Used for debugging, so that each client can identify itself by 195 # name to this worker. 196 # ------------------------------------------------------------------ 197 define identity {name} { 198 variable cid 199 variable handler 200 $handler connectionName $cid $name 201 return "" 202 } 203 204 # ------------------------------------------------------------------ 205 # INCOMING: pong 206 # When forming the peer-to-peer network, workers send ping/pong 207 # messages to one another to measure the latency. 
208 # ------------------------------------------------------------------ 209 define pong {} { 210 global peers 211 variable handler 212 set now [clock clicks -milliseconds] 213 set delay [expr {$now - $peers(ping-$handler-start)}] 214 set peers(ping-$handler-latency) $delay 215 if {[incr peers(responses)] >= [llength $peers(testing)]} { 216 after cancel {peer-network goto finalize} 217 after idle {peer-network goto finalize} 218 } 219 return "" 220 } 221 222 # ------------------------------------------------------------------ 223 # INCOMING: solicit -job info -path hosts -token xxx 224 # Workers send this message on to their peers to solicit bids 225 # for a simulation job. 226 # ------------------------------------------------------------------ 227 define solicit {args} { 228 log debug "SOLICIT from peer: $args" 229 variable cid 230 log debug "solicitation request from peer: $args" 231 eval Solicitation ::#auto $args -connection $cid 232 return "" 233 } 234 235 # ------------------------------------------------------------------ 236 # INCOMING: proffer <token> <details> 237 # Workers send this message back after a "solicit" request with 238 # details about what they can offer in terms of CPU power. When 239 # a worker has received all replies from its peers, it sends back 240 # its own proffer message to the client that started the 241 # solicitation. 242 # ------------------------------------------------------------------ 243 define proffer {token details} { 244 log debug "PROFFER from peer: $token $details" 245 Solicitation::proffer $token $details 246 return "" 247 } 248 } 249 250 # ---------------------------------------------------------------------- 251 # COMMAND: broadcast_to_peers <message> ?<avoidList>? 252 # 253 # Used to broadcast a message out to all peers. The <message> must 254 # be one of the commands defined in the worker protocol. If there 255 # are no peer connections yet, this command does nothing. 
If any 256 # peer appears on the <avoidList>, it is skipped. Returns the 257 # number of messages sent out, so this peer knows how many replies 258 # to wait for. 259 # ---------------------------------------------------------------------- 260 proc broadcast_to_peers {message {avoidList ""}} { 261 global server peers 262 263 # 264 # Build a list of all peer addresses, so we know who we're 265 # going to send this message out to. 266 # 267 set recipients "" 268 foreach key [array names peers current-*] { 269 set addr [$peers($key) address] 270 if {[lsearch $avoidList $addr] < 0} { 271 lappend recipients $addr 272 } 273 } 274 foreach cid [$server connections hubzero:workers<-workerc/1] { 275 set addr [lindex [$server connectionName $cid] 0] ;# x.x.x.x (sockN) 276 if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} { 277 lappend recipients $addr 278 } 279 } 280 281 # 282 # If the message has any @RECIPIENTS fields, replace them 283 # with the list of recipients 284 # 285 regsub -all @RECIPIENTS $message $recipients message 286 287 # 288 # Send the message out to all peers. Keep a count and double-check 289 # it against the list of recipients generated above. 
290 # 291 set nmesgs 0 292 293 # send off to other workers that this one has connected to 294 foreach key [array names peers current-*] { 295 set addr [$peers($key) address] 296 if {[lsearch $avoidList $addr] < 0} { 297 if {[catch {$peers($key) send $message} err] == 0} { 298 incr nmesgs 299 } else { 300 log error "ERROR: broadcast failed to [$peers($key) address]: $result\n (message was \"$message\")" 301 } 302 } 303 } 304 305 # send off to other workers that connected to this one 306 foreach cid [$server connections hubzero:workers<-workerc/1] { 307 set addr [lindex [$server connectionName $cid] 0] ;# x.x.x.x (sockN) 308 if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} { 309 if {[catch {puts $cid $message} result] == 0} { 310 incr nmesgs 311 } else { 312 log error "ERROR: broadcast failed for $cid: $result" 313 log error " (message was $message)" 314 } 315 } 316 } 317 318 # did we send the right number of messages? 319 if {[llength $recipients] != $nmesgs} { 320 log error "ERROR: sent only $nmesgs messages to peers {$recipients}" 321 } 322 323 return $nmesgs 324 } 325 326 # ---------------------------------------------------------------------- 327 # COMMAND: worker_got_protocol 328 # 329 # Invoked whenever a peer sends their protocol message to this 330 # worker. Sends the same protocol name back, so the other worker 331 # understands what protocol we're speaking. 
332 # ---------------------------------------------------------------------- 333 proc worker_got_protocol {cid protocol} { 334 switch -glob -- $protocol { 335 *<-worker* { 336 puts $cid [list protocol hubzero:workerc<-workers/1] 337 } 338 *<-foreman* { 339 puts $cid [list protocol hubzero:foreman<-worker/1] 340 } 341 DEF* { 342 # do nothing 343 } 344 default { 345 error "don't recognize protocol \"$protocol\"" 346 } 347 } 348 } 349 350 # ---------------------------------------------------------------------- 351 # COMMAND: worker_got_dropped <address> 352 # 353 # Invoked whenever an inbound connection to this worker drops. 354 # At some point we should try updating our connections to other 355 # peers, to replace this missing connection. 356 # ---------------------------------------------------------------------- 357 proc worker_got_dropped {addr} { 358 global peers 359 if {[info exists peers(current-$addr)]} { 360 unset peers(current-$addr) 361 after cancel {peer-network goto measure} 362 after 5000 {peer-network goto measure} 363 log debug "peer dropped: $addr" 364 } 365 } 366 367 # ====================================================================== 368 # Connect to one of the authorities to get a list of peers. Then, 369 # sit in the event loop and process events. 370 # ====================================================================== 371 p2p::wonks::init 372 373 set server [p2p::server -port 9101? \ 374 -protocols {hubzero:workers<-workerc hubzero:worker<-foreman} \ 375 -servername worker \ 376 -onprotocol worker_got_protocol \ 377 -ondisconnect worker_got_dropped] 378 379 # ---------------------------------------------------------------------- 380 # AUTHORITY CONNECTION 381 # 382 # This is the state machine representing the connection to the 383 # authority server. We start in the "idle" state. Whenever we 384 # try to move to "connected", we'll open a connection to an authority 385 # and request updated information on workers. 
If that connection 386 # fails, we'll get kicked back to "idle" and we'll try again later. 387 # ---------------------------------------------------------------------- 388 StateMachine authority-connection 389 authority-connection statedata cnx "" 390 391 # sit here whenever we don't have a connection 392 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 393 authority-connection state idle -onenter { 394 # have an open connection? then close it 395 if {"" != [statedata cnx]} { 396 catch {itcl::delete object [statedata cnx]} 397 statedata cnx "" 398 } 399 # try to connect again later 400 set delay [p2p::options get time_between_authority_checks] 401 after $delay {authority-connection goto connected} 402 } -onleave { 403 after cancel {authority-connection goto connected} 404 } 405 406 # sit here after we're connected and waiting for a response 407 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 408 authority-connection state connected 409 410 # when moving to the connected state, make a connection 411 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 412 authority-connection transition idle->connected -onchange { 413 global server 203 414 204 415 # connect to the authority and request a list of peers 205 if {[catch { 206 set client [authority_connect] 207 $client send "listening [$server port]" 208 $client send "peers" 209 } result]} { 210 log system "ERROR: $result" 211 } 212 213 # register again at regular intervals in case the authority 214 # gets restarted in between. 215 after $options(time_between_registers) worker_register 216 } 217 218 # ---------------------------------------------------------------------- 219 # COMMAND: worker_load_check 220 # 221 # Invoked when this worker first starts up and periodically thereafter 222 # to compute the system load available to the worker. 
If the system 223 # load has changed significantly, then the "perftest" program is 224 # executed to get a measure of performance available. This program 225 # returns a number of "wonks" which we report to peers as our available 226 # performance. 227 # ---------------------------------------------------------------------- 228 proc worker_load_check {} { 229 global options sysload dir 230 231 # see if the load has changed significantly 232 set changed 0 233 if {$sysload(lastLoad) < 0} { 234 set changed 1 416 statedata cnx "" 417 foreach addr [randomize [p2p::options get authority_hosts]] { 418 if {[catch {p2p::client -address $addr \ 419 -sendprotocol hubzero:authority<-worker/1 \ 420 -receiveprotocol hubzero:worker<-authority/1} result] == 0} { 421 statedata cnx $result 422 break 423 } 424 } 425 426 if {"" != [statedata cnx]} { 427 [statedata cnx] send "listening [$server port]" 428 [statedata cnx] send "peers" 235 429 } else { 236 set load [Rappture::sysinfo load5] 237 if {$load < 0.9*$sysload(lastLoad) || $load > 1.1*$sysload(lastLoad)} { 238 set changed 1 239 } 240 } 241 242 if {$changed} { 243 set sysload(lastLoad) [Rappture::sysinfo load5] 244 puts "LOAD CHANGED: $sysload(lastLoad) [Rappture::sysinfo freeram freeswap]" 245 set sysload(measured) [clock seconds] 246 247 # Run the program, but don't use exec, since it blocks. 248 # Instead, run it in the background and harvest its output later. 249 set sysload(test) [open "| [file join $dir perftest]" r] 250 fileevent $sysload(test) readable worker_load_results 251 } 252 253 # monitor the system load at regular intervals 254 after $options(time_between_load_checks) worker_load_check 255 } 256 257 # ---------------------------------------------------------------------- 258 # COMMAND: worker_load_results 259 # 260 # Invoked automatically when the "perftest" run finishes. Reads the 261 # number of wonks reported on standard output and reports them to 262 # other peers. 
263 # ---------------------------------------------------------------------- 264 proc worker_load_results {} { 265 global sysload 266 267 if {[catch {read $sysload(test)} msg] == 0} { 268 if {[regexp {[0-9]+} $msg match]} { 269 puts "WONKS: $match" 270 set sysload(wonks) $match 271 } else { 272 log system "ERROR: performance test failed: $msg" 273 } 274 } 275 catch {close $sysload(test)} 276 set sysload(test) "" 277 } 278 279 # ---------------------------------------------------------------------- 280 # COMMAND: worker_peers_update 281 # 282 # Invoked when this worker has received a list of peers, and from 283 # time to time thereafter, to establish connections with other peers 284 # in the network. This worker picks a number of peers randomly from 285 # the list of all available peers, and then pings each of them. 286 # Timing results from the pings are stored away, and when all pings 287 # have completed, this worker picks the best few connections and 288 # keeps those. 289 # ---------------------------------------------------------------------- 290 proc worker_peers_update {} { 291 global options peers myaddress 292 worker_peers_cleanup 430 error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]" 431 } 432 } 433 434 # ---------------------------------------------------------------------- 435 # PEER-TO-PEER CONNECTIONS 436 # 437 # This is the state machine representing the network of worker peers. 438 # We start in the "idle" state. From time to time we move to the 439 # "measure" state and attempt to establish connections with a set 440 # of peers. We then wait for ping/pong responses. 441 # When we have all responses, we move to "finalize" and finalize all 442 # connections, and then move back to "idle". 
443 # ---------------------------------------------------------------------- 444 StateMachine peer-network 445 peer-network statedata cnx "" 446 447 # sit here when the peer network is okay 448 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 449 peer-network state idle -onenter { 450 # try to connect again later 451 set delay [p2p::options get time_between_network_rebuilds] 452 after $delay {peer-network goto measure} 453 } -onleave { 454 after cancel {peer-network goto measure} 455 } 456 457 # sit here when we need to rebuild the peer-to-peer network 458 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 459 peer-network state measure 460 461 # when moving to the start state, make connections to a bunch of peers 462 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 463 peer-network transition idle->measure -onchange { 464 global server myaddress peers 465 466 # 467 # Get a list of workers already connected to this one 468 # with an inbound connection to the server. 469 # 470 foreach cid [$server connections hubzero:workers<-workerc/1] { 471 set addr [lindex [$server connectionName $cid] 0] 472 set inbound($addr) $cid 473 } 293 474 294 475 # 295 476 # Pick a random group of peers and try to connect to them. 296 # Start with the existing peers, and then add a random 297 # bunch of others. 298 # 477 # Start with the existing peers to see if their connection 478 # is still favorable, and then add a random bunch of others. 479 # 480 set peers(testing) "" 481 set peers(responses) 0 299 482 foreach key [array names peers current-*] { 300 483 set peer $peers($key) … … 307 490 # final number we want to talk to. 
308 491 # 309 set ntest [expr {10 * $options(max_peer_connections)}] 310 foreach addr [randomize $peers(all) $ntest] { 311 if {$addr == $myaddress} { 492 set maxpeers [p2p::options get max_peer_connections] 493 foreach addr [randomize $peers(all) [expr {10*$maxpeers}]] { 494 # 495 # Avoid connecting to ourself, or to peers that we're 496 # already connected to (either as inbound connections 497 # to the server or as outbound connections to others). 498 # 499 if {$addr == $myaddress 500 || [info exists peers(current-$addr)] 501 || [info exists inbound($addr)]} { 312 502 continue 313 503 } 314 if {![info exists peers(current-$addr)] 315 && [catch {Client ::#auto $addr} peer] == 0} { 316 # open a new connection to this address 317 lappend peers(testing) $peer 318 $peer protocol hubzero:peer/1 319 320 $peer define hubzero:peer/1 exception {message} { 321 log system "ERROR: $message" 322 } 323 324 $peer define hubzero:peer/1 identity {name} { 325 variable cid 326 variable handler 327 $handler connectionName $cid $name 328 return "" 329 } 330 331 # when we get a "pong" back, store the latency 332 $peer define hubzero:peer/1 pong {} { 333 global peers 334 variable handler 335 puts " pong from $handler" 336 set now [clock clicks -milliseconds] 337 set delay [expr {$now - $peers(ping-$handler-start)}] 338 set peers(ping-$handler-latency) $delay 339 incr peers(responses) 340 worker_peers_finalize 341 return "" 342 } 343 344 # start the ping/pong session with the peer 345 $peer send "protocol hubzero:peer/1" 346 347 # send tell this peer our name (for debugging) 348 if {[log channel debug]} { 349 $peer send "identity $myaddress" 350 } 504 505 if {[catch {p2p::client -address $addr \ 506 -sendprotocol hubzero:workers<-workerc/1 \ 507 -receiveprotocol hubzero:workerc<-workers/1} cnx]} { 508 continue 509 } 510 $cnx send "identity $myaddress" 511 lappend peers(testing) $cnx 512 513 # have enough connections to test? 
514 if {[llength $peers(testing)] >= 2*$maxpeers} { 515 break 351 516 } 352 517 } … … 355 520 # Now, loop through all peers and send a "ping" message. 356 521 # 357 foreach peer $peers(testing) { 358 puts "pinging $peer = [$peer address]..." 522 foreach cnx $peers(testing) { 359 523 # mark the time and send a ping to this peer 360 set peers(ping-$peer-start) [clock clicks -milliseconds] 361 $peer send "ping" 524 set peers(ping-$cnx-start) [clock clicks -milliseconds] 525 $cnx send "ping" 362 526 } 363 527 364 528 # if this test takes too long, just give up 365 after 10000 worker_peers_finalize -force 366 } 367 368 # ---------------------------------------------------------------------- 369 # COMMAND: worker_peers_finalize ?-force? 370 # 371 # Called after worker_peers_update has finished its business to 372 # clean up all of the connections that were open for testing. 373 # ---------------------------------------------------------------------- 374 proc worker_peers_finalize {{option -check}} { 375 global peers options 376 if {$option == "-force" || $peers(responses) == [llength $peers(testing)]} { 377 # build a list: {peer latency} {peer latency} ... 
378 set plist "" 379 foreach obj $peers(testing) { 380 if {[info exists peers(ping-$obj-latency)]} { 381 lappend plist [list $obj $peers(ping-$obj-latency)] 382 } 383 } 384 puts "-------------------\nFINALIZING $::myaddress" 385 puts "PINGS: $plist" 386 387 # sort the list and extract the top peers 388 set plist [lsort -increasing -index 1 $plist] 389 set plist [lrange $plist 0 [expr {$options(max_peer_connections)-1}]] 390 puts "TOP: $plist" 391 392 set current [array names peers current-*] 393 puts " current: $current" 394 395 # transfer the top peers to the "current" list 396 set all "" 397 foreach rec $plist { 398 set peer [lindex $rec 0] 399 set addr [$peer address] 400 set peers(current-$addr) $peer 401 lappend all $addr 402 403 # if it is already on the "current" list, then keep it 404 set i [lsearch $current current-$addr] 405 if {$i >= 0} { 406 set current [lreplace $current $i $i] 407 } 408 set i [lsearch $peers(testing) $peer] 409 if {$i >= 0} { 410 set peers(testing) [lreplace $peers(testing) $i $i] 411 } 412 } 413 log system "connected to peers: $all" 414 puts " new current: [array names peers current-*]" 415 puts " final: $all" 416 puts " leftover: $current $peers(testing)" 417 418 # get rid of old peers that we no longer want to talk to 419 foreach leftover $current { 420 itcl::delete object $peers($leftover) 421 unset peers($leftover) 422 puts " cleaned up $leftover (was on current list)" 423 } 424 425 # clean up after this test 426 worker_peers_cleanup 427 } 428 } 429 430 # ---------------------------------------------------------------------- 431 # COMMAND: worker_peers_cleanup 432 # 433 # Called after worker_peers_update has finished its business to 434 # clean up all of the connections that were open for testing. 
435 # ---------------------------------------------------------------------- 436 proc worker_peers_cleanup {} { 529 after 10000 {peer-network goto finalize} 530 } 531 532 # sit here when we need to rebuild the peer-to-peer network 533 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 534 peer-network state finalize -onenter { 535 after cancel {peer-network goto finalize} 536 537 # everything is finalized now, so go to idle 538 after idle {peer-network goto idle} 539 } 540 541 # when moving to the finalize state, decide on the network of peers 542 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 543 peer-network transition measure->finalize -onchange { 437 544 global peers 545 546 set maxpeers [p2p::options get max_peer_connections] 547 548 # build a list: {peer latency} {peer latency} ... 549 set plist "" 550 foreach obj $peers(testing) { 551 if {[info exists peers(ping-$obj-latency)]} { 552 lappend plist [list $obj $peers(ping-$obj-latency)] 553 } 554 } 555 556 # sort the list and extract the top peers 557 set plist [lsort -increasing -index 1 $plist] 558 set plist [lrange $plist 0 [expr {$maxpeers-1}]] 559 560 set currentpeers [array names peers current-*] 561 562 # transfer the top peers to the "current" list 563 set all "" 564 foreach rec $plist { 565 set peer [lindex $rec 0] 566 set addr [$peer address] 567 set peers(current-$addr) $peer 568 lappend all $addr 569 570 # if it is already on the "current" list, then keep it 571 set i [lsearch $currentpeers current-$addr] 572 if {$i >= 0} { 573 set currentpeers [lreplace $currentpeers $i $i] 574 } 575 set i [lsearch $peers(testing) $peer] 576 if {$i >= 0} { 577 set peers(testing) [lreplace $peers(testing) $i $i] 578 } 579 } 580 log system "connected to peers: $all" 581 582 # get rid of old peers that we no longer want to talk to 583 foreach leftover $currentpeers { 584 itcl::delete object $peers($leftover) 585 unset peers($leftover) 586 } 587 588 # clean up after this 
test 438 589 foreach obj $peers(testing) { 439 590 catch {itcl::delete object $obj} 440 puts " cleaned up $obj (was on testing list)"441 591 } 442 592 set peers(testing) "" … … 446 596 } 447 597 set peers(responses) 0 448 449 after cancel worker_peers_finalize -force 450 } 451 452 # ---------------------------------------------------------------------- 453 # COMMAND: worker_peers_protocol 454 # 455 # Invoked whenever a peer sends their protocol message to this 456 # worker. Sends the same protocol name back, so the other worker 457 # understands what protocol we're speaking. 458 # ---------------------------------------------------------------------- 459 proc worker_peers_protocol {cid protocol} { 460 if {"DEFAULT" != $protocol} { 461 puts $cid [list protocol $protocol] 462 } 463 } 464 465 # ====================================================================== 466 # Connect to one of the authorities to get a list of peers. Then, 467 # sit in the event loop and process events. 468 # ====================================================================== 469 after idle worker_register 598 } 599 600 # ---------------------------------------------------------------------- 601 # JOB SOLICITATION 602 # 603 # Each worker can receive a "solicit" request, asking for information 604 # about performance and price of peers available to work. Each worker 605 # sends the request on to its peers, then gathers the information 606 # and sends it back to the client or to the peer requesting the 607 # information. There can be multiple requests going on at once, 608 # and each may have different job types and return different info, 609 # so the class below helps to watch over each request until it has 610 # completed. 
# ----------------------------------------------------------------------
itcl::class Solicitation {
    # Options supplied by the creator through "configure" (see constructor).
    public variable connection ""  ;# channel the final "proffer" is written to
    public variable job ""         ;# job info passed along in solicit/proffer
    public variable path ""        ;# addresses on the request path so far
    public variable avoid ""       ;# addresses excluded from the broadcast
    public variable token ""       ;# token echoed back in the "proffer" reply

    private variable _serial ""    ;# serial number; key into the "all" map
    private variable _response ""  ;# newline-separated proffer lines received
    private variable _waitfor 0    ;# responses still outstanding from peers
    private variable _timeout ""   ;# id of the pending "after" for finalize

    # Registers this object under a new serial number, forwards the
    # solicit request to the peers (unless the path has already reached
    # the peer_time_to_live option), and schedules finalize.
    constructor {args} {
        eval configure $args

        # this worker is now part of the path and must not be re-asked
        global myaddress
        lappend path $myaddress
        lappend avoid $myaddress
        set _serial [incr counter]
        set all($_serial) $this

        set delay "idle"  ;# finalize after waiting for responses
        set ttl [p2p::options get peer_time_to_live]
        if {[llength $path] < $ttl} {
            set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial]
            set _waitfor [broadcast_to_peers $mesg $avoid]
            log debug "WAIT FOR: $_waitfor"

            if {$_waitfor > 0} {
                # add a delay proportional to ttl + time for wonks measurement
                set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}]
            }
        }
        log debug "TIMEOUT: $delay"
        set _timeout [after $delay [itcl::code $this finalize]]
    }

    # Cancels the pending finalize call and removes this object from
    # the serial-number map.
    destructor {
        after cancel $_timeout
        catch {unset all($_serial)}
    }

    # this adds the info from each proffer to this solicitation
    #
    # details: block of proffer lines received for this solicitation.
    # The text is stored as-is; it is deliberately not parsed here, so
    # a malformed payload cannot raise an error before the response has
    # been recorded and counted.  Parsing happens in finalize.
    method response {details} {
        append _response $details "\n"
        if {[incr _waitfor -1] <= 0} {
            log debug "ALL RESPONSES"
            finalize
        }
    }

    # called to finalize when all peers have responded back
    # (or when the timeout scheduled in the constructor fires).
    # Sends one "proffer" message on the connection and then deletes
    # this object.
    method finalize {} {
        global myaddress

        # filter out duplicate info from clients
        set block ""
        foreach line [split $_response \n] {
            # Lines are assembled by plain string interpolation (see the
            # "append block" below), so they are not guaranteed to be
            # well-formed Tcl lists.  Skip a line that cannot be parsed
            # instead of aborting, so the proffer is still sent and the
            # object is still deleted.
            if {[catch {lindex $line 0} addr]} {
                log error "ERROR: ignoring malformed proffer line: $line"
                continue
            }
            if {"" != $addr && ![info exists seen($addr)]} {
                append block $line "\n"
                set seen($addr) 1
            }
        }
        # add details about this client to the message
        append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]"
        log debug "FINALIZE {$block}"

        # send the composite results back to the caller
        set cmd [list proffer $token $block]
        if {[catch {puts $connection $cmd} err]} {
            log error "ERROR while sending back proffer: $err"
        }
        itcl::delete object $this
    }

    # Routes the proffer details to the solicitation registered under
    # the given serial number.  Unknown serial numbers (for example, an
    # object that has already been finalized and deleted) are ignored.
    proc proffer {serial details} {
        if {[info exists all($serial)]} {
            $all($serial) response $details
        }
    }

    common counter 0  ;# generates serial nums for objects
    common all        ;# maps serial num => solicitation object
}

# ----------------------------------------------------------------------
log system "starting..."

after idle {
    authority-connection goto connected
}

vwait main-loop
Note: See TracChangeset
for help on using the changeset viewer.