1 | # ---------------------------------------------------------------------- |
---|
2 | # P2P: worker node in P2P mesh bidding on and executing jobs |
---|
3 | # |
---|
4 | # This server is a typical worker node in the P2P mesh for HUBzero |
---|
5 | # job execution. It talks to other workers to distribute the load |
---|
6 | # of job requests. It bids on jobs based on its current resources |
---|
7 | # and executes jobs to earn points from the central authority. |
---|
8 | # ---------------------------------------------------------------------- |
---|
9 | # Michael McLennan (mmclennan@purdue.edu) |
---|
10 | # ====================================================================== |
---|
11 | # Copyright (c) 2004-2012 HUBzero Foundation, LLC |
---|
12 | # |
---|
13 | # See the file "license.terms" for information on usage and |
---|
14 | # redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES. |
---|
15 | # ====================================================================== |
---|
16 | package require Rappture |
---|
17 | |
---|
18 | # recognize other library files in this same directory |
---|
19 | set dir [file dirname [info script]] |
---|
20 | lappend auto_path $dir |
---|
21 | |
---|
# Switch on every log channel this worker writes to.
log channel error on
log channel system on
log channel debug on

# Background-error hook: errors raised inside event-loop callbacks are
# written to the "error" log channel together with the stack trace
# held in ::errorInfo.
proc ::bgerror {err} {
    log error "ERROR: $err $::errorInfo"
}
28 | |
---|
29 | |
---|
# address/port for this worker; stays "?:?" until the authority's
# "options" message supplies our ip
set myaddress {?:?}

# authority servers this worker may contact (host:port)
p2p::options register authority_hosts 127.0.0.1:9001

# delay between check-ins with the central authority (passed to "after",
# so milliseconds)
p2p::options register time_between_authority_checks 60000

# delay between rebuilds of the peer-to-peer network (passed to "after",
# so milliseconds)
p2p::options register time_between_network_rebuilds 600000

# number of peers this worker keeps after measuring latencies
p2p::options register max_peer_connections 4

# a solicitation is forwarded to peers only while its path is shorter
# than this many hops
p2p::options register peer_time_to_live 4
46 | |
---|
47 | # ====================================================================== |
---|
48 | # PROTOCOL: hubzero:worker<-authority/1 |
---|
49 | # |
---|
50 | # The worker initiates communication with the authority, and the |
---|
51 | # authority responds by sending these messages. |
---|
52 | # ====================================================================== |
---|
p2p::protocol::register hubzero:worker<-authority/1 {

    # ------------------------------------------------------------------
    # INCOMING: options <key1> <value1> <key2> <value2> ...
    # Settings pushed by the authority replace the values registered
    # in this file.  Settings that p2p::options rejects are skipped.
    # When an "ip" setting is present, it is combined with the server
    # port to form this worker's own address.
    # ------------------------------------------------------------------
    define options {args} {
        global server myaddress

        array set options $args
        foreach {key value} [array get options] {
            # a failure for one option must not block the others
            catch {p2p::options set $key $value}
        }

        if {[info exists options(ip)]} {
            set myaddress "$options(ip):[$server port]"
            log debug "my address $myaddress"
        }
        return ""
    }

    # ------------------------------------------------------------------
    # INCOMING: peers <listOfAddresses>
    # Reply to the "peers" request sent by this worker.  The
    # <listOfAddresses> holds the host:port addresses this worker may
    # contact to enter the p2p network.
    # ------------------------------------------------------------------
    define peers {plist} {
        global peers

        # remember all known peers, then start measuring latencies
        set peers(all) $plist
        after idle {peer-network goto measure}

        # the peer list was all we needed from the authority
        authority-connection goto idle
        return ""
    }

    # ------------------------------------------------------------------
    # INCOMING: identity <name>
    # Debugging aid: the other side names itself, and the name is
    # attached to this connection.
    # ------------------------------------------------------------------
    define identity {name} {
        variable handler
        variable cid
        $handler connectionName $cid $name
        return ""
    }
}
106 | |
---|
107 | # ====================================================================== |
---|
108 | # PROTOCOL: hubzero:worker<-foreman/1 |
---|
109 | # |
---|
110 | # The foreman initiates communication with a worker, and sends the |
---|
111 | # various messages supported below. |
---|
112 | # ====================================================================== |
---|
p2p::protocol::register hubzero:worker<-foreman/1 {

    # ------------------------------------------------------------------
    # INCOMING: solicit <option> <value> ...
    # A foreman asks for bids on a simulation job.  A Solicitation
    # object is created to track the request; it is given the
    # foreman's options plus the connection to answer on.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from foreman: $args"

        # eval flattens $args so each option reaches the constructor
        # as a separate word
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }
}
126 | |
---|
127 | # ====================================================================== |
---|
128 | # PROTOCOL: hubzero:workers<-workerc/1 |
---|
129 | # |
---|
130 | # Workers initiate connections with other workers as peers. The |
---|
131 | # following messages are sent by worker clients to the worker server. |
---|
132 | # ====================================================================== |
---|
p2p::protocol::register hubzero:workers<-workerc/1 {
    # ------------------------------------------------------------------
    # INCOMING: identity <name>
    # Debugging aid: the connecting worker names itself, and the name
    # is attached to this connection.
    # ------------------------------------------------------------------
    define identity {name} {
        variable handler
        variable cid
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    # INCOMING: ping
    # Another worker is timing the connection to this one.  The
    # handler's result is "pong"; the peer measures how long that
    # answer takes to arrive.
    # ------------------------------------------------------------------
    define ping {} {
        return "pong"
    }

    # ------------------------------------------------------------------
    # INCOMING: solicit -job info -path hosts -token xxx
    # A peer passes along a request for bids on a simulation job.  A
    # Solicitation object is created to track it and to answer on
    # this connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from peer: $args"

        # eval flattens $args so each option reaches the constructor
        # as a separate word
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }

    # ------------------------------------------------------------------
    # INCOMING: proffer <token> <details>
    # Answer to a "solicit" message this worker sent out earlier.
    # The details are handed to the solicitation identified by
    # <token>, which collects the replies from all peers.
    # ------------------------------------------------------------------
    define proffer {token details} {
        Solicitation::proffer $token $details
        return ""
    }
}
181 | |
---|
182 | # ====================================================================== |
---|
183 | # PROTOCOL: hubzero:workerc<-workers/1 |
---|
184 | # |
---|
185 | # The following messages are received by a worker client in response |
---|
186 | # to the requests that they send to a worker server. |
---|
187 | # ====================================================================== |
---|
p2p::protocol::register hubzero:workerc<-workers/1 {
    # ------------------------------------------------------------------
    # INCOMING: identity <name>
    # Debugging aid: the other worker names itself, and the name is
    # attached to this connection.
    # ------------------------------------------------------------------
    define identity {name} {
        variable handler
        variable cid
        $handler connectionName $cid $name
        return ""
    }

    # ------------------------------------------------------------------
    # INCOMING: pong
    # Answer to a "ping" sent while measuring the peer network.  The
    # time elapsed since the ping went out is stored as the latency
    # of this connection.
    # ------------------------------------------------------------------
    define pong {} {
        global peers
        variable handler

        set elapsed [expr {[clock clicks -milliseconds] - $peers(ping-$handler-start)}]
        set peers(ping-$handler-latency) $elapsed

        # when every peer under test has answered, skip the rest of
        # the timeout and finalize right away
        incr peers(responses)
        if {$peers(responses) >= [llength $peers(testing)]} {
            after cancel {peer-network goto finalize}
            after idle {peer-network goto finalize}
        }
        return ""
    }

    # ------------------------------------------------------------------
    # INCOMING: solicit -job info -path hosts -token xxx
    # A peer passes along a request for bids on a simulation job.  A
    # Solicitation object is created to track it and to answer on
    # this connection.
    # ------------------------------------------------------------------
    define solicit {args} {
        variable cid
        log debug "solicitation request from peer: $args"

        # eval flattens $args so each option reaches the constructor
        # as a separate word
        eval Solicitation ::#auto $args -connection $cid
        return ""
    }

    # ------------------------------------------------------------------
    # INCOMING: proffer <token> <details>
    # Answer to a "solicit" message this worker sent out earlier.
    # The details are handed to the solicitation identified by
    # <token>, which collects the replies from all peers.
    # ------------------------------------------------------------------
    define proffer {token details} {
        Solicitation::proffer $token $details
        return ""
    }
}
244 | |
---|
# ----------------------------------------------------------------------
#  COMMAND: broadcast_to_peers <message> ?<avoidList>?
#
#  Sends <message> to every peer of this worker: the outbound
#  connections stored in peers(current-*) and the inbound connections
#  on the server that speak hubzero:workers<-workerc/1.  Peers whose
#  address appears on the <avoidList> are skipped.  Every @RECIPIENTS
#  marker in <message> is replaced by the list of addresses being
#  contacted before the message goes out.
#
#  Returns the number of messages sent successfully, so the caller
#  knows how many replies to wait for.  A failed send is logged and
#  left out of the count; it does not raise an error.
# ----------------------------------------------------------------------
proc broadcast_to_peers {message {avoidList ""}} {
    global server peers

    #
    # Work out who receives this message.  Outbound peers are kept as
    # {object address} pairs and inbound peers as {cid address} pairs,
    # so the send loops below use exactly the set of addresses that is
    # substituted for @RECIPIENTS.  Addresses are compared with
    # -exact so they are never interpreted as glob patterns.
    #
    set recipients ""
    set outbound ""
    foreach key [array names peers current-*] {
        set peer $peers($key)
        set addr [$peer address]
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend outbound $peer $addr
            lappend recipients $addr
        }
    }

    set inbound ""
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
        if {[lsearch -exact $avoidList $addr] < 0} {
            lappend inbound $cid $addr
            lappend recipients $addr
        }
    }

    #
    # If the message has any @RECIPIENTS fields, replace them
    # with the list of recipients.
    #
    regsub -all @RECIPIENTS $message $recipients message

    set nmesgs 0

    # send off to other workers that this one has connected to
    foreach {peer addr} $outbound {
        if {[catch {$peer send $message} err]} {
            log error "ERROR: broadcast failed to $addr: $err\n (message was \"$message\")"
        } else {
            incr nmesgs
        }
    }

    # send off to other workers that connected to this one
    foreach {cid addr} $inbound {
        if {[catch {puts $cid $message} err]} {
            log error "ERROR: broadcast failed for $cid: $err"
            log error " (message was $message)"
        } else {
            incr nmesgs
        }
    }

    # did we send the right number of messages?
    if {[llength $recipients] != $nmesgs} {
        log error "ERROR: sent only $nmesgs messages to peers {$recipients}"
    }

    return $nmesgs
}
320 | |
---|
# ----------------------------------------------------------------------
#  COMMAND: worker_got_protocol <cid> <protocol>
#
#  Registered as the server's -onprotocol callback.  Looks at the
#  protocol name announced on connection <cid> and writes back the
#  name of the protocol used for the return direction.  Names that
#  start with "DEF" get no reply; any other unrecognized name raises
#  an error.
# ----------------------------------------------------------------------
proc worker_got_protocol {cid protocol} {
    if {[string match {*<-worker*} $protocol]} {
        puts $cid [list protocol hubzero:workerc<-workers/1]
    } elseif {[string match {*<-foreman*} $protocol]} {
        puts $cid [list protocol hubzero:foreman<-worker/1]
    } elseif {[string match DEF* $protocol]} {
        # do nothing
    } else {
        error "don't recognize protocol \"$protocol\""
    }
}
344 | |
---|
# ----------------------------------------------------------------------
#  COMMAND: worker_got_dropped <address>
#
#  Registered as the server's -ondisconnect callback.  If <address>
#  is on the list of current peers, it is removed and a new
#  measurement of the peer network is scheduled to replace the
#  missing connection.  Other addresses are ignored.
# ----------------------------------------------------------------------
proc worker_got_dropped {addr} {
    global peers

    # nothing to do unless this address is one of our current peers
    if {![info exists peers(current-$addr)]} {
        return
    }
    unset peers(current-$addr)

    # restart the timer, so several drops in a row lead to a single
    # measurement 5 seconds after the last one
    after cancel {peer-network goto measure}
    after 5000 {peer-network goto measure}
    log debug "peer dropped: $addr"
}
361 | |
---|
362 | # ====================================================================== |
---|
363 | # Connect to one of the authorities to get a list of peers. Then, |
---|
364 | # sit in the event loop and process events. |
---|
365 | # ====================================================================== |
---|
# initialize the p2p::wonks subsystem (its current value is reported
# in every proffer this worker sends)
p2p::wonks::init

# Server side of this worker: accepts connections from other workers
# and from foremen.
# NOTE(review): the trailing "?" on the port is presumably p2p::server
# syntax for "this port or a nearby free one" -- confirm against p2p::server.
set server [p2p::server \
    -port 9101? \
    -protocols [list hubzero:workers<-workerc hubzero:worker<-foreman] \
    -servername worker \
    -onprotocol worker_got_protocol \
    -ondisconnect worker_got_dropped]
373 | |
---|
374 | # ---------------------------------------------------------------------- |
---|
375 | # AUTHORITY CONNECTION |
---|
376 | # |
---|
377 | # This is the state machine representing the connection to the |
---|
378 | # authority server. We start in the "idle" state. Whenever we |
---|
379 | # try to move to "connected", we'll open a connection to an authority |
---|
380 | # and request updated information on workers. If that connection |
---|
381 | # fails, we'll get kicked back to "idle" and we'll try again later. |
---|
382 | # ---------------------------------------------------------------------- |
---|
StateMachine authority-connection
authority-connection statedata cnx ""

# idle: no open connection; wait for the next scheduled check-in
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection state idle -onenter {
    # drop whatever connection is left over from the last check-in
    if {[statedata cnx] != ""} {
        catch {itcl::delete object [statedata cnx]}
        statedata cnx ""
    }
    # schedule the next attempt to reach an authority
    after [p2p::options get time_between_authority_checks] \
        {authority-connection goto connected}
} -onleave {
    after cancel {authority-connection goto connected}
}

# connected: a request is out and we are waiting for the answer
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection state connected

# idle->connected: open a connection and ask for the peer list
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
authority-connection transition idle->connected -onchange {
    global server

    # try the authorities in random order; keep the first that answers
    statedata cnx ""
    foreach addr [randomize [p2p::options get authority_hosts]] {
        set failed [catch {
            p2p::client -address $addr \
                -sendprotocol hubzero:authority<-worker/1 \
                -receiveprotocol hubzero:worker<-authority/1
        } result]
        if {!$failed} {
            statedata cnx $result
            break
        }
    }

    if {[statedata cnx] == ""} {
        error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]"
    }

    # announce the port we listen on, then request the list of peers
    [statedata cnx] send "listening [$server port]"
    [statedata cnx] send "peers"
}
428 | |
---|
429 | # ---------------------------------------------------------------------- |
---|
430 | # PEER-TO-PEER CONNECTIONS |
---|
431 | # |
---|
432 | # This is the state machine representing the network of worker peers. |
---|
433 | # We start in the "idle" state. From time to time we move to the |
---|
434 | # "measure" state and attempt to establish connections with a set |
---|
435 | # of peers. We then wait for ping/pong responses. When we have all |
---|
436 | # responses (or the wait times out), we move to "finalize", settle on |
---|
437 | # the final set of connections, and then move back to "idle". |
---|
438 | # ---------------------------------------------------------------------- |
---|
StateMachine peer-network
peer-network statedata cnx ""

# idle: the peer network is in place; wait for the next scheduled rebuild
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state idle -onenter {
    # schedule the next measurement of the network
    after [p2p::options get time_between_network_rebuilds] \
        {peer-network goto measure}
} -onleave {
    after cancel {peer-network goto measure}
}
451 | |
---|
# measure: pings are out and we are waiting for the answers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state measure

# idle->measure: connect to a group of candidate peers and ping them all
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network transition idle->measure -onchange {
    global server myaddress peers

    #
    # Index the workers that already hold an inbound connection
    # to our server, keyed by their address.
    #
    foreach cid [$server connections hubzero:workers<-workerc/1] {
        set inbound([lindex [$server connectionName $cid] 0]) $cid
    }

    #
    # Start the test list with the peers we already have, so their
    # connections are measured again along with the new candidates.
    #
    set peers(responses) 0
    set peers(testing) ""
    foreach key [array names peers current-*] {
        lappend peers(testing) $peers($key)
    }

    #
    # Add candidates picked at random from the known peers.  At most
    # 10*maxpeers addresses are considered, and the loop stops once
    # 2*maxpeers connections are lined up for testing.
    #
    set maxpeers [p2p::options get max_peer_connections]
    set enough [expr {2*$maxpeers}]
    foreach addr [randomize $peers(all) [expr {10*$maxpeers}]] {
        # skip ourselves and anyone we are already linked to, either
        # as an outbound peer or through an inbound connection
        if {$addr == $myaddress} continue
        if {[info exists peers(current-$addr)]} continue
        if {[info exists inbound($addr)]} continue

        # peers that cannot be reached are simply left out
        if {[catch {p2p::client -address $addr \
                -sendprotocol hubzero:workers<-workerc/1 \
                -receiveprotocol hubzero:workerc<-workers/1} cnx]} {
            continue
        }
        $cnx send "identity $myaddress"
        lappend peers(testing) $cnx

        # have enough connections to test?
        if {[llength $peers(testing)] >= $enough} break
    }

    #
    # Ping every connection under test, noting the send time so the
    # "pong" handler can compute the latency.
    #
    foreach cnx $peers(testing) {
        set peers(ping-$cnx-start) [clock clicks -milliseconds]
        $cnx send "ping"
    }

    # stop waiting for answers after 10 seconds
    after 10000 {peer-network goto finalize}
}
526 | |
---|
# finalize: the set of peers has been decided; pass through on the
# way back to idle
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network state finalize -onenter {
    # the measurement timeout is no longer needed
    after cancel {peer-network goto finalize}

    # everything is finalized now, so go to idle
    after idle {peer-network goto idle}
}
535 | |
---|
# measure->finalize: keep the peers with the lowest latency and drop
# every other connection
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
peer-network transition measure->finalize -onchange {
    global peers

    set maxpeers [p2p::options get max_peer_connections]

    # build a list: {peer latency} {peer latency} ...
    # (peers that never answered the ping have no latency and are left out)
    set plist ""
    foreach obj $peers(testing) {
        if {[info exists peers(ping-$obj-latency)]} {
            lappend plist [list $obj $peers(ping-$obj-latency)]
        }
    }

    # Sort by latency and keep the fastest peers.  Latencies are
    # millisecond counts, so they must be compared as integers;
    # the default string comparison would put 100 ahead of 20.
    set plist [lsort -integer -increasing -index 1 $plist]
    set plist [lrange $plist 0 [expr {$maxpeers-1}]]

    # entries still on this list after the loop below are peers we
    # used to have but did not select again
    set currentpeers [array names peers current-*]

    # transfer the top peers to the "current" list
    set all ""
    foreach rec $plist {
        set peer [lindex $rec 0]
        set addr [$peer address]
        set peers(current-$addr) $peer
        lappend all $addr

        # if it is already on the "current" list, then keep it
        set i [lsearch $currentpeers current-$addr]
        if {$i >= 0} {
            set currentpeers [lreplace $currentpeers $i $i]
        }
        # selected peers must survive the cleanup of the test list
        set i [lsearch $peers(testing) $peer]
        if {$i >= 0} {
            set peers(testing) [lreplace $peers(testing) $i $i]
        }
    }
    log system "connected to peers: $all"

    # get rid of old peers that we no longer want to talk to
    foreach leftover $currentpeers {
        itcl::delete object $peers($leftover)
        unset peers($leftover)
    }

    # clean up after this test: the candidates that were not selected
    # (catch, because retired peers were already deleted just above)
    foreach obj $peers(testing) {
        catch {itcl::delete object $obj}
    }
    set peers(testing) ""

    foreach key [array names peers ping-*] {
        unset peers($key)
    }
    set peers(responses) 0
}
594 | |
---|
595 | # ---------------------------------------------------------------------- |
---|
596 | # JOB SOLICITATION |
---|
597 | # |
---|
598 | # Each worker can receive a "solicit" request, asking for information |
---|
599 | # about performance and price of peers available to work. Each worker |
---|
600 | # sends the request on to its peers, then gathers the information |
---|
601 | # and sends it back to the client or to the peer requesting the |
---|
602 | # information. There can be multiple requests going on at once, |
---|
603 | # and each may have different job types and return different info, |
---|
604 | # so the class below helps to watch over each request until it has |
---|
605 | # completed. |
---|
606 | # ---------------------------------------------------------------------- |
---|
# Tracks one "solicit" request from the moment it arrives until the
# combined proffer has been sent back on the requesting connection.
# The object deletes itself at the end of finalize.
itcl::class Solicitation {
    public variable connection ""  ;# channel the final proffer is written to
    public variable job ""         ;# job info from the request, passed on to peers
    public variable path ""        ;# addresses this request has passed through
    public variable avoid ""       ;# addresses that must not be contacted again
    public variable token ""       ;# requester's token, sent back in our proffer

    private variable _serial ""    ;# our own token for the request sent to peers
    private variable _response ""  ;# details from peers so far, newline separated
    private variable _waitfor 0    ;# number of peer replies still outstanding
    private variable _timeout ""   ;# "after" id of the pending finalize call

    # Registers the object under a fresh serial number, forwards the
    # request to peers while the path is shorter than peer_time_to_live,
    # and schedules finalize.  The options in args are those of the
    # incoming "solicit" message plus -connection.
    constructor {args} {
        eval configure $args

        global myaddress
        lappend path $myaddress
        lappend avoid $myaddress
        set _serial [incr counter]
        set all($_serial) $this

        set delay "idle"  ;# finalize after waiting for responses
        set ttl [p2p::options get peer_time_to_live]
        if {[llength $path] < $ttl} {
            # @RECIPIENTS is filled in by broadcast_to_peers, so each
            # peer also avoids everyone contacted in this round
            set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial]
            set _waitfor [broadcast_to_peers $mesg $avoid]

            if {$_waitfor > 0} {
                # add a delay proportional to ttl + time for wonks measurement
                set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}]
            }
        }
        set _timeout [after $delay [itcl::code $this finalize]]
    }

    # Cancels the pending finalize call and removes the object from
    # the serial-number registry.
    destructor {
        after cancel $_timeout
        catch {unset all($_serial)}
    }

    # Adds the details from one peer's proffer to this solicitation
    # and finalizes once the last outstanding reply has arrived.
    # The details are stored as received, without parsing them, so a
    # malformed reply is still counted.
    method response {details} {
        append _response $details "\n"
        if {[incr _waitfor -1] <= 0} {
            finalize
        }
    }

    # Sends the combined proffer back to the requester: one line per
    # distinct address reported by peers, followed by a line for this
    # worker.  Runs when all peers have answered or the timeout fires,
    # then deletes the object.
    method finalize {} {
        global myaddress

        # filter out duplicate info from clients; the first line seen
        # for an address is the one that is kept
        set block ""
        foreach line [split $_response \n] {
            set addr [lindex $line 0]
            if {"" != $addr && ![info exists response($addr)]} {
                append block $line "\n"
                set response($addr) 1
            }
        }
        # add details about this client to the message
        append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]"

        # send the composite results back to the caller; the object
        # is deleted even when the connection is gone
        set cmd [list proffer $token $block]
        if {[catch {puts $connection $cmd} err]} {
            log error "ERROR while sending back proffer: $err"
        }
        itcl::delete object $this
    }

    # Routes an incoming proffer to the solicitation registered under
    # serial.  Proffers for unknown serial numbers are ignored.
    proc proffer {serial details} {
        if {[info exists all($serial)]} {
            $all($serial) response $details
        }
    }

    common counter 0  ;# generates serial nums for objects
    common all        ;# maps serial num => solicitation object
}
688 | |
---|
689 | # ---------------------------------------------------------------------- |
---|
log system "starting..."

# make the first contact with an authority as soon as the event loop
# is running
after idle {
    authority-connection goto connected
}

# enter the event loop; nothing visible in this file sets "main-loop",
# so this keeps processing events
vwait main-loop