[1251] | 1 | # ---------------------------------------------------------------------- |
---|
| 2 | # P2P: worker node in P2P mesh bidding on and executing jobs |
---|
| 3 | # |
---|
| 4 | # This server is a typical worker node in the P2P mesh for HUBzero |
---|
| 5 | # job execution. It talks to other workers to distribute the load |
---|
| 6 | # of job requests. It bids on jobs based on its current resources |
---|
| 7 | # and executes jobs to earn points from the central authority. |
---|
| 8 | # ---------------------------------------------------------------------- |
---|
| 9 | # Michael McLennan (mmclennan@purdue.edu) |
---|
| 10 | # ====================================================================== |
---|
| 11 | # Copyright (c) 2008 Purdue Research Foundation |
---|
[1257] | 12 | # |
---|
| 13 | # See the file "license.terms" for information on usage and |
---|
| 14 | # redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES. |
---|
[1251] | 15 | # ====================================================================== |
---|
[1257] | 16 | package require Rappture |
---|
[1251] | 17 | |
---|
| 18 | # recognize other library files in this same directory |
---|
| 19 | set dir [file dirname [info script]] |
---|
| 20 | lappend auto_path $dir |
---|
| 21 | |
---|
# handle log file for this worker: turn on the error, system, and
# debug log channels
log channel error on
log channel system on
log channel debug on

# Tcl calls ::bgerror for errors raised in background (event-driven)
# code; send the message plus the saved stack trace to the error log.
proc ::bgerror {msg} {
    global errorInfo
    log error "ERROR: $msg $errorInfo"
}
[1251] | 28 | |
---|
[1273] | 29 | |
---|
# host:port of this worker.  Starts as the placeholder "?:?" and is
# filled in when the authority sends an "ip" option.
set myaddress "?:?" ;# address/port for this worker

# set of connections for authority servers
# (host:port list; tried in the order returned by randomize until one
# accepts a connection)
p2p::options register authority_hosts 127.0.0.1:9001

# register with the central authority at this frequency
# (milliseconds; used as an "after" delay)
p2p::options register time_between_authority_checks 60000

# rebuild the peer-to-peer network at this frequency
# (milliseconds; used as an "after" delay)
p2p::options register time_between_network_rebuilds 600000

# this worker should try to connect with this many other peers
# (the fastest peers by ping latency are kept)
p2p::options register max_peer_connections 4

# workers propagate messages until time-to-live reaches 0
# (a solicitation is forwarded only while its path is shorter than this)
p2p::options register peer_time_to_live 4
[1257] | 46 | |
---|
# ======================================================================
#  PROTOCOL: hubzero:worker<-authority/1
#
#  The worker initiates communication with the authority, and the
#  authority responds by sending these messages.
# ======================================================================
p2p::protocol::register hubzero:worker<-authority/1 {

    # ------------------------------------------------------------------
    #  INCOMING: options <key1> <value1> <key2> <value2> ...
    #  These option settings coming from the authority override the
    #  option settings built into the client.  The authority probably
    #  has a more up-to-date list of other authorities and better
    #  policies for living within the network.  Keys rejected by
    #  p2p::options are logged at debug level and skipped.  If an "ip"
    #  key is present, it is combined with this server's port to form
    #  this worker's address.
    # ------------------------------------------------------------------
    define options {args} {
        global server myaddress

        array set options $args
        foreach key [array names options] {
            # best-effort: one bad setting must not stop the others,
            # but leave a trace instead of failing silently
            if {[catch {p2p::options set $key $options($key)} err]} {
                log debug "skipping option \"$key\" from authority: $err"
            }
        }

        if {[info exists options(ip)]} {
            set myaddress $options(ip):[$server port]
            log debug "my address $myaddress"
        }
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: peers <listOfAddresses>
    #  This message comes in after this worker has sent the "peers"
    #  message to request the current list of peers.  The
    #  <listOfAddresses> is a list of host:port addresses that this
    #  worker should contact to enter the p2p network.
    # ------------------------------------------------------------------
    define peers {plist} {
        global peers
        set peers(all) $plist
        after idle {peer-network goto measure}

        # now that we've gotten the peers, we're done with the authority
        authority-connection goto idle
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Used for debugging, so that each client can identify itself by
    #  name to this worker.
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler
        $handler connectionName $cid $name
        return ""
    }
}
[1251] | 106 | |
---|
# ======================================================================
#  PROTOCOL: hubzero:worker<-foreman/1
#
#  The foreman initiates communication with a worker, and sends the
#  various messages supported below.
# ======================================================================
p2p::protocol::register hubzero:worker<-foreman/1 {

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job <info> ...
    #  Foremen send this message to solicit bids for a simulation job.
    #  The options are handed to a new Solicitation object, which
    #  writes its "proffer" reply back on this same connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "SOLICIT from foreman: $args"
        # build the command from lists so every argument reaches the
        # constructor intact
        eval [list Solicitation ::#auto] $args [list -connection $cid]
        return ""
    }
}
| 127 | |
---|
# ======================================================================
#  PROTOCOL: hubzero:workers<-workerc/1
#
#  Workers initiate connections with other workers as peers.  The
#  following messages are sent by worker clients to the worker server.
# ======================================================================
p2p::protocol::register hubzero:workers<-workerc/1 {
    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Used for debugging, so that each client can identify itself by
    #  name to this worker.
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: ping
    #  If another worker sends "ping", they are trying to measure the
    #  speed of the connection.  Send back a "pong" message.  If this
    #  worker is close by, the other worker will stay connected to it.
    # ------------------------------------------------------------------
    define ping {} {
        return "pong"
    }

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job <info> -path <hosts> -avoid <hosts> -token <n>
    #  Workers send this message on to their peers to solicit bids
    #  for a simulation job.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "SOLICIT from peer: $args"
        # build the command from lists so every argument reaches the
        # constructor intact
        eval [list Solicitation ::#auto] $args [list -connection $cid]
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: proffer <token> <details>
    #  Workers send this message back after a "solicit" request with
    #  details about what they can offer in terms of CPU power.  When
    #  a worker has received all replies from its peers, it sends back
    #  its own proffer message to the client that started the
    #  solicitation.
    # ------------------------------------------------------------------
    define proffer {token details} {
        log debug "PROFFER from peer: $token $details"
        Solicitation::proffer $token $details
        return ""
    }
}
| 184 | |
---|
# ======================================================================
#  PROTOCOL: hubzero:workerc<-workers/1
#
#  The following messages are received by a worker client in response
#  to the requests that they send to a worker server.
# ======================================================================
p2p::protocol::register hubzero:workerc<-workers/1 {
    # ------------------------------------------------------------------
    #  INCOMING: identity <name>
    #  Used for debugging, so that each client can identify itself by
    #  name to this worker.
    # ------------------------------------------------------------------
    define identity {name} {
        variable cid
        variable handler
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: pong
    #  When forming the peer-to-peer network, workers send ping/pong
    #  messages to one another to measure the latency.  The latency is
    #  stored in peers(ping-<handler>-latency).  Once every peer under
    #  test has answered, the network moves to "finalize".
    #  A pong with no outstanding ping is ignored.  This covers one
    #  arriving after the measurement was finalized, or a repeated pong.
    #  Ignoring it means it can neither raise an error nor inflate the
    #  response count.
    # ------------------------------------------------------------------
    define pong {} {
        global peers
        variable handler

        if {![info exists peers(ping-$handler-start)]
              || [info exists peers(ping-$handler-latency)]} {
            return ""
        }
        set now [clock clicks -milliseconds]
        set delay [expr {$now - $peers(ping-$handler-start)}]
        set peers(ping-$handler-latency) $delay
        if {[incr peers(responses)] >= [llength $peers(testing)]} {
            after cancel {peer-network goto finalize}
            after idle {peer-network goto finalize}
        }
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: solicit -job <info> -path <hosts> -avoid <hosts> -token <n>
    #  Workers send this message on to their peers to solicit bids
    #  for a simulation job.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "SOLICIT from peer: $args"
        # build the command from lists so every argument reaches the
        # constructor intact
        eval [list Solicitation ::#auto] $args [list -connection $cid]
        return ""
    }

    # ------------------------------------------------------------------
    #  INCOMING: proffer <token> <details>
    #  Workers send this message back after a "solicit" request with
    #  details about what they can offer in terms of CPU power.  When
    #  a worker has received all replies from its peers, it sends back
    #  its own proffer message to the client that started the
    #  solicitation.
    # ------------------------------------------------------------------
    define proffer {token details} {
        log debug "PROFFER from peer: $token $details"
        Solicitation::proffer $token $details
        return ""
    }
}
| 249 | |
---|
# ----------------------------------------------------------------------
#  COMMAND: broadcast_to_peers <message> ?<avoidList>?
#
#  Used to broadcast a message out to all peers.  The <message> must
#  be one of the commands defined in the worker protocol.  Peers are:
#    - the outbound connections this worker opened (peers(current-*));
#    - the inbound worker connections accepted by the server.
#  If there are no peer connections yet, this command does nothing.
#  Any peer whose address appears exactly on the <avoidList> is
#  skipped.  Every @RECIPIENTS in the message is replaced by the list
#  of addresses the message is sent to.
#  Returns the number of messages successfully sent out, so this peer
#  knows how many replies to wait for.  Send failures are logged, not
#  raised.
# ----------------------------------------------------------------------
proc broadcast_to_peers {message {avoidList ""}} {
    global server peers

    #
    # Collect the targets once, so the recipient list substituted into
    # the message is exactly the set of peers we try to send to.
    # Each target is a list: kind handle address, where kind is
    #   object ... outbound peer object, sent with "$handle send"
    #   socket ... inbound connection channel, sent with "puts"
    #
    set targets ""
    foreach key [array names peers current-*] {
        set addr [$peers($key) address]
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend targets [list object $peers($key) $addr]
        }
    }
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        # connection names look like "x.x.x.x (sockN)"; keep the address
        set addr [lindex [$server connectionName $cid] 0]
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend targets [list socket $cid $addr]
        }
    }

    set recipients ""
    foreach target $targets {
        lappend recipients [lindex $target 2]
    }

    # string map inserts the recipients literally (regsub would treat
    # an ampersand or backslash in the replacement specially)
    set message [string map [list @RECIPIENTS $recipients] $message]

    #
    # Send the message out to all targets.  Keep a count and double-check
    # it against the list of recipients generated above.
    #
    set nmesgs 0
    foreach target $targets {
        foreach {kind handle addr} $target break
        if {$kind eq "object"} {
            if {[catch {$handle send $message} err] == 0} {
                incr nmesgs
            } else {
                log error "ERROR: broadcast failed to ${addr}: $err\n (message was \"$message\")"
            }
        } else {
            if {[catch {puts $handle $message} err] == 0} {
                incr nmesgs
            } else {
                log error "ERROR: broadcast failed for ${handle}: $err"
                log error " (message was $message)"
            }
        }
    }

    # did we send the right number of messages?
    if {[llength $recipients] != $nmesgs} {
        log error "ERROR: sent only $nmesgs messages to peers {$recipients}"
    }

    return $nmesgs
}
| 325 | |
---|
# ----------------------------------------------------------------------
#  COMMAND: worker_got_protocol <cid> <protocol>
#
#  Called when a peer announces its protocol to this worker.  Replies
#  on channel <cid> with the matching protocol name for our side:
#    *<-worker*  ... another worker is talking to us as a client
#    *<-foreman* ... a foreman is talking to us
#    DEF*        ... no reply
#  Any other protocol name raises an error.
# ----------------------------------------------------------------------
proc worker_got_protocol {cid protocol} {
    if {[string match *<-worker* $protocol]} {
        set reply hubzero:workerc<-workers/1
    } elseif {[string match *<-foreman* $protocol]} {
        set reply hubzero:foreman<-worker/1
    } elseif {[string match DEF* $protocol]} {
        return
    } else {
        error "don't recognize protocol \"$protocol\""
    }
    puts $cid [list protocol $reply]
}
[1257] | 349 | |
---|
# ----------------------------------------------------------------------
#  COMMAND: worker_got_dropped <address>
#
#  Invoked whenever an inbound connection to this worker drops.  If
#  <address> is one of our current peers, forget it.  Then reschedule
#  the peer-network rebuild for 5 seconds from now; this replaces any
#  pending "after" that uses the same script.  Unknown addresses are
#  ignored.
# ----------------------------------------------------------------------
proc worker_got_dropped {addr} {
    upvar #0 peers registry
    if {![info exists registry(current-$addr)]} {
        return
    }
    unset registry(current-$addr)
    after cancel {peer-network goto measure}
    after 5000 {peer-network goto measure}
    log debug "peer dropped: $addr"
}
| 366 | |
---|
# ======================================================================
#  Connect to one of the authorities to get a list of peers.  Then,
#  sit in the event loop and process events.
# ======================================================================
# set up the wonks measurement; p2p::wonks::current is included in
# every proffer this worker sends back (Solicitation finalize)
p2p::wonks::init

# Start the worker server that peers and foremen connect to.
# - protocol announcements go to worker_got_protocol;
# - dropped connections go to worker_got_dropped.
# NOTE(review): the trailing "?" on -port presumably lets p2p::server
# fall back to another port when 9101 is taken -- confirm.
set server [p2p::server -port 9101? \
    -protocols {hubzero:workers<-workerc hubzero:worker<-foreman} \
    -servername worker \
    -onprotocol worker_got_protocol \
    -ondisconnect worker_got_dropped]
| 378 | |
---|
# ----------------------------------------------------------------------
#  AUTHORITY CONNECTION
#
#  This is the state machine representing the connection to the
#  authority server.  We start in the "idle" state.  Whenever we
#  try to move to "connected", we'll open a connection to an authority
#  and request updated information on workers.  If that connection
#  fails, we should end up back in "idle" and try again later.
#  NOTE(review): the failure path raises an error in the idle->connected
#  transition.  Whether StateMachine then stays in or returns to "idle"
#  depends on StateMachine, which is not shown here -- confirm.
# ----------------------------------------------------------------------
StateMachine authority-connection
# "cnx" holds the p2p::client object for the current authority
# connection, or "" when there is none
authority-connection statedata cnx ""
| 390 | |
---|
# sit here whenever we don't have a connection
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# On entry: close any open authority connection and schedule the next
# attempt time_between_authority_checks ms from now.
# On exit: cancel that pending attempt.  "after cancel" matches the
# script text exactly, so both scripts below must stay identical.
authority-connection state idle -onenter {
    # have an open connection? then close it
    if {"" != [statedata cnx]} {
        catch {itcl::delete object [statedata cnx]}
        statedata cnx ""
    }
    # try to connect again later
    set delay [p2p::options get time_between_authority_checks]
    after $delay {authority-connection goto connected}
} -onleave {
    after cancel {authority-connection goto connected}
}
| 405 | |
---|
# sit here after we're connected and waiting for a response
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# No entry/exit actions.  The "peers" message handler moves us back
# to "idle" once the authority has answered.
# NOTE(review): there is no timeout in this state, so an authority
# that never answers "peers" keeps us here -- confirm this is intended.
authority-connection state connected
| 409 | |
---|
# when moving to the connected state, make a connection
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Try the authority hosts in the order returned by randomize and keep
# the first connection that succeeds.  Then send our listening port
# and ask for "peers"; the reply is handled by the "peers" message
# handler.  Raises an error when no authority can be reached.
authority-connection transition idle->connected -onchange {
    global server

    # connect to the authority and request a list of peers
    statedata cnx ""
    foreach addr [randomize [p2p::options get authority_hosts]] {
        if {[catch {p2p::client -address $addr \
                -sendprotocol hubzero:authority<-worker/1 \
                -receiveprotocol hubzero:worker<-authority/1} result] == 0} {
            statedata cnx $result
            break
        }
    }

    if {"" != [statedata cnx]} {
        [statedata cnx] send "listening [$server port]"
        [statedata cnx] send "peers"
    } else {
        error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]"
    }
}
| 433 | |
---|
# ----------------------------------------------------------------------
#  PEER-TO-PEER CONNECTIONS
#
#  This is the state machine representing the network of worker peers.
#  We start in the "idle" state.  From time to time we move to the
#  "measure" state and open connections to a set of candidate peers.
#  We then send each one a "ping" and wait for the "pong" responses.
#  When we have all responses, or after a timeout, we move to
#  "finalize", keep the fastest peers, and then move back to "idle".
# ----------------------------------------------------------------------
StateMachine peer-network
# NOTE(review): the "cnx" state data is not read by any of the
# peer-network handlers visible in this file section
peer-network statedata cnx ""
[1251] | 446 | |
---|
# sit here when the peer network is okay
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# On entry: schedule the next rebuild time_between_network_rebuilds ms
# from now.  On exit: cancel it.  "after cancel" matches the script
# text exactly, so both scripts below must stay identical.
peer-network state idle -onenter {
    # rebuild the peer network again later
    set delay [p2p::options get time_between_network_rebuilds]
    after $delay {peer-network goto measure}
} -onleave {
    after cancel {peer-network goto measure}
}
| 456 | |
---|
# sit here while measuring ping latency to candidate peers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# No entry/exit actions; the idle->measure transition opens the
# connections and sends the pings.  We leave for "finalize" when all
# pongs are in or when the timeout set by that transition fires.
peer-network state measure
| 460 | |
---|
# when moving to the measure state, make connections to a bunch of peers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# The candidates are the current peers plus new peers picked at random
# from the authority's list.  Each candidate is pinged, and the pong
# handler records latencies in peers(ping-*).
# Send failures are caught here: if an error escaped this script, the
# finalize timeout below would never be scheduled.
peer-network transition idle->measure -onchange {
    global server myaddress peers

    #
    # Get a list of workers already connected to this one
    # with an inbound connection to the server.
    #
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set addr [lindex [$server connectionName $cid] 0]
        set inbound($addr) $cid
    }

    #
    # Pick a random group of peers and try to connect to them.
    # Start with the existing peers to see if their connection
    # is still favorable, and then add a random bunch of others.
    #
    set peers(testing) ""
    set peers(responses) 0
    foreach key [array names peers current-*] {
        lappend peers(testing) $peers($key)
    }

    # a rebuild can be triggered before the authority has sent its
    # peer list; in that case only the existing peers are re-tested
    set candidates ""
    if {[info exists peers(all)] && [llength $peers(all)] > 0} {
        set maxpeers [p2p::options get max_peer_connections]
        set candidates [randomize $peers(all) [expr {10*$maxpeers}]]
    } else {
        set maxpeers [p2p::options get max_peer_connections]
    }

    #
    # Pick other peers at random.  We don't have to try all
    # peers--just some number that is much larger than the
    # final number we want to talk to.
    #
    foreach addr $candidates {
        #
        # Avoid connecting to ourself, or to peers that we're
        # already connected to (either as inbound connections
        # to the server or as outbound connections to others).
        #
        if {$addr eq $myaddress
              || [info exists peers(current-$addr)]
              || [info exists inbound($addr)]} {
            continue
        }

        if {[catch {p2p::client -address $addr \
                -sendprotocol hubzero:workers<-workerc/1 \
                -receiveprotocol hubzero:workerc<-workers/1} cnx]} {
            continue
        }
        # a peer that cannot take the identity message is not usable
        if {[catch {$cnx send "identity $myaddress"}]} {
            catch {itcl::delete object $cnx}
            continue
        }
        lappend peers(testing) $cnx

        # have enough connections to test?
        if {[llength $peers(testing)] >= 2*$maxpeers} {
            break
        }
    }

    #
    # Now, loop through all peers and send a "ping" message.
    #
    foreach cnx $peers(testing) {
        # mark the time and send a ping to this peer
        set peers(ping-$cnx-start) [clock clicks -milliseconds]
        if {[catch {$cnx send "ping"} err]} {
            log error "ERROR: ping failed for ${cnx}: $err"
        }
    }

    # if this test takes too long, just give up
    after 10000 {peer-network goto finalize}
}
| 531 | |
---|
# sit here briefly after measuring; the measure->finalize transition
# picks the new set of peers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state finalize -onenter {
    # the measurement is over: drop the pending timeout scheduled when
    # it started (the script text must match that "after" exactly)
    after cancel {peer-network goto finalize}

    # everything is finalized now, so go to idle
    after idle {peer-network goto idle}
}
[1251] | 540 | |
---|
# when moving to the finalize state, decide on the network of peers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Steps:
#   1. Keep the max_peer_connections peers with the lowest measured
#      latency as peers(current-<address>).
#   2. Destroy current peers that were not kept.
#   3. Destroy the remaining test connections.
#   4. Clear all ping bookkeeping.
peer-network transition measure->finalize -onchange {
    global peers

    set maxpeers [p2p::options get max_peer_connections]

    # build a list: {peer latency} {peer latency} ...
    # (only peers that answered the ping have a latency)
    set plist ""
    foreach obj $peers(testing) {
        if {[info exists peers(ping-$obj-latency)]} {
            lappend plist [list $obj $peers(ping-$obj-latency)]
        }
    }

    # sort numerically by latency (an ASCII sort would rank 10 ms ahead
    # of 9 ms) and extract the top peers
    set plist [lsort -integer -increasing -index 1 $plist]
    set plist [lrange $plist 0 [expr {$maxpeers-1}]]

    set currentpeers [array names peers current-*]

    # transfer the top peers to the "current" list
    set all ""
    foreach rec $plist {
        set peer [lindex $rec 0]
        set addr [$peer address]
        set peers(current-$addr) $peer
        lappend all $addr

        # if it is already on the "current" list, then keep it
        set i [lsearch -exact $currentpeers current-$addr]
        if {$i >= 0} {
            set currentpeers [lreplace $currentpeers $i $i]
        }
        # chosen peers must survive the cleanup of the test set below
        set i [lsearch -exact $peers(testing) $peer]
        if {$i >= 0} {
            set peers(testing) [lreplace $peers(testing) $i $i]
        }
    }
    log system "connected to peers: $all"

    # get rid of old peers that we no longer want to talk to
    # (tolerate an object that has already been destroyed)
    foreach leftover $currentpeers {
        catch {itcl::delete object $peers($leftover)}
        unset peers($leftover)
    }

    # clean up after this test
    foreach obj $peers(testing) {
        catch {itcl::delete object $obj}
    }
    set peers(testing) ""

    foreach key [array names peers ping-*] {
        unset peers($key)
    }
    set peers(responses) 0
}
| 599 | |
---|
# ----------------------------------------------------------------------
#  JOB SOLICITATION
#
#  Each worker can receive a "solicit" request, asking for information
#  about performance and price of peers available to work.  Each worker
#  sends the request on to its peers, then gathers the information
#  and sends it back to the client or to the peer requesting the
#  information.  There can be multiple requests going on at once,
#  and each may have different job types and return different info,
#  so the class below helps to watch over each request until it has
#  completed.
# ----------------------------------------------------------------------
itcl::class Solicitation {
    public variable connection ""  ;# channel the proffer is written back to
    public variable job ""         ;# job info from the solicit request
    public variable path ""        ;# addresses on this request's route
    public variable avoid ""       ;# addresses not to forward the request to
    public variable token ""       ;# requester's serial, echoed in the proffer

    private variable _serial ""    ;# our serial: key into "all", sent as -token
    private variable _response ""  ;# proffer lines collected from peers
    private variable _waitfor 0    ;# number of peer replies still expected
    private variable _timeout ""   ;# id of the pending "after" for finalize

    constructor {args} {
        eval configure $args

        global myaddress
        lappend path $myaddress
        lappend avoid $myaddress
        set _serial [incr counter]
        set all($_serial) $this

        set delay "idle" ;# no peers to wait for: finalize when idle
        set ttl [p2p::options get peer_time_to_live]
        if {[llength $path] < $ttl} {
            # forward the request; broadcast_to_peers replaces @RECIPIENTS
            # with the peers it sends to, so they are avoided downstream
            set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial]
            set _waitfor [broadcast_to_peers $mesg $avoid]
            log debug "WAIT FOR: $_waitfor"

            if {$_waitfor > 0} {
                # add a delay proportional to ttl + time for wonks measurement
                set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}]
            }
        }
        log debug "TIMEOUT: $delay"
        set _timeout [after $delay [itcl::code $this finalize]]
    }
    destructor {
        after cancel $_timeout
        catch {unset all($_serial)}
    }

    # Adds the details from one peer's proffer to this solicitation and
    # finalizes once every peer we broadcast to has replied.
    method response {details} {
        append _response $details "\n"
        if {[incr _waitfor -1] <= 0} {
            log debug "ALL RESPONSES"
            finalize
        }
    }

    # Sends the combined proffer back to the requester, then deletes
    # this object.  Called when all peers have responded or the timeout
    # fires.
    method finalize {} {
        global myaddress

        # keep only the first line reported for each address, so the
        # same worker is not offered twice
        set block ""
        foreach line [split $_response \n] {
            # lines originate from peers; skip any that is not a valid list
            if {[catch {lindex $line 0} addr]} {
                continue
            }
            if {$addr ne "" && ![info exists seen($addr)]} {
                append block $line "\n"
                set seen($addr) 1
            }
        }
        # add details about this worker to the message
        append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]"
        log debug "FINALIZE {$block}"

        # send the composite results back to the caller
        set cmd [list proffer $token $block]
        if {[catch {puts $connection $cmd} err]} {
            log error "ERROR while sending back proffer: $err"
        }
        itcl::delete object $this
    }

    # Routes an incoming proffer to the solicitation with this serial
    # number; proffers for unknown or finished solicitations are ignored.
    proc proffer {serial details} {
        if {[info exists all($serial)]} {
            $all($serial) response $details
        }
    }

    common counter 0  ;# generates serial nums for objects
    common all        ;# maps serial num => solicitation object
}
| 697 | |
---|
# ----------------------------------------------------------------------
#  Start up: once the event loop is running, make the first attempt to
#  reach an authority, then process events.
# ----------------------------------------------------------------------
log system "starting..."

# NOTE(review): this script's text differs from the one named by
# "after cancel" in the authority idle state's -onleave, so that cancel
# does not match it
after idle {
    authority-connection goto connected
}

# enter the event loop; nothing in this section sets main-loop, so
# this keeps processing events for the life of the process
vwait main-loop