Changeset 1273 for trunk/p2p/worker.tcl


Ignore:
Timestamp:
Feb 5, 2009 6:17:23 AM (15 years ago)
Author:
mmc
Message:

Major reorganization of p2p code, and support for solicit/proffer
messages.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/p2p/worker.tcl

    r1257 r1273  
    2121
    2222# handle log file for this worker
     23log channel error on
     24log channel system on
    2325log channel debug on
    24 log channel system on
    25 
    26 # set up a server at the first open port above 9101
    27 set server [Server ::#auto 9101? -servername worker \
    28     -onprotocol worker_peers_protocol]
    29 
    30 # ======================================================================
    31 #  OPTIONS
    32 #  These options get the worker started, but new option settings
    33 #  come from the authority after this worker first connects.
    34 # ======================================================================
    35 set authority ""     ;# file handle for authority connection
     26
     27proc ::bgerror {err} { log error "ERROR: $err $::errorInfo" }
     28
     29
    3630set myaddress "?:?"  ;# address/port for this worker
    3731
    3832# set of connections for authority servers
    39 set options(authority_hosts) 127.0.0.1:9001
     33p2p::options register authority_hosts 127.0.0.1:9001
    4034
    4135# register with the central authority at this frequency
    42 set options(time_between_registers) 10000
     36p2p::options register time_between_authority_checks 60000
     37
     38# rebuild the peer-to-peer network at this frequency
     39p2p::options register time_between_network_rebuilds 600000
    4340
    4441# this worker should try to connect with this many other peers
    45 set options(max_peer_connections) 4
    46 
    47 # number of seconds between each check of system load
    48 set options(time_between_load_checks) 60000
    49 
    50 # ======================================================================
    51 #  PEERS
    52 # ======================================================================
    53 # eventually set to the list of all peers in the network
    54 set peers(all) ""
    55 
    56 # list of peers that we're testing connections with
    57 set peers(testing) ""
    58 
    59 # ======================================================================
    60 #  SYSTEM LOAD
    61 # ======================================================================
    62 # Check system load every so often.  When the load changes
    63 # significantly, execute a "perftest" run to compute the current
    64 # number of wonks available to this worker.
    65 
    66 set sysload(lastLoad) -1
    67 set sysload(wonks) -1
    68 set sysload(measured) -1
    69 
    70 after idle worker_load_check
    71 
    72 # ======================================================================
    73 #  PROTOCOL: hubzero:peer/1
    74 #  This protocol gets used when another worker connects to this one.
    75 # ======================================================================
    76 $server protocol hubzero:peer/1
    77 
    78 $server define hubzero:peer/1 exception {message} {
    79     log system "ERROR: $message"
    80 }
    81 
    82 # ----------------------------------------------------------------------
    83 #  DIRECTIVE: identity
    84 #  Used for debugging, so that each client can identify itself by
    85 #  name to this worker.
    86 # ----------------------------------------------------------------------
    87 $server define hubzero:peer/1 identity {name} {
    88     variable cid
    89     variable handler
    90     $handler connectionName $cid $name
    91     return ""
    92 }
    93 
    94 # ----------------------------------------------------------------------
    95 #  DIRECTIVE: ping
    96 # ----------------------------------------------------------------------
    97 $server define hubzero:peer/1 ping {} {
    98     return "pong"
    99 }
    100 
    101 # ======================================================================
    102 #  PROTOCOL: hubzero:worker/1
    103 #  The authority_connect procedure connects this worker to an authority
    104 #  and begins the authority/worker protocol.  The directives below
    105 #  handle the incoming (authority->worker) traffic.
    106 # ======================================================================
    107 proc authority_connect {} {
    108     global authority options
    109     log system "connecting to authorities..."
    110 
    111     authority_disconnect
    112 
    113     # scan through list of authorities and try to connect
    114     foreach addr [randomize $options(authority_hosts)] {
    115         if {[catch {Client ::#auto $addr} result] == 0} {
    116             set authority $result
    117             break
    118         }
    119     }
    120 
    121     if {"" == $authority} {
    122         error "can't connect to any known authorities"
    123     }
    124 
    125     # add handlers to process the incoming commands...
    126     $authority protocol hubzero:worker/1
    127     $authority define hubzero:worker/1 exception {args} {
    128         log system "error from authority: $args"
    129     }
    130 
    131     # ------------------------------------------------------------------
    132     #  DIRECTIVE: options <key1> <value1> <key2> <value2> ...
     42p2p::options register max_peer_connections 4
     43
     44# workers propagate messages until time-to-live reaches 0
     45p2p::options register peer_time_to_live 4
     46
     47# ======================================================================
     48#  PROTOCOL: hubzero:worker<-authority/1
     49#
     50#  The worker initiates communication with the authority, and the
     51#  authority responds by sending these messages.
     52# ======================================================================
     53p2p::protocol::register hubzero:worker<-authority/1 {
     54
     55    # ------------------------------------------------------------------
     56    #  INCOMING: options <key1> <value1> <key2> <value2> ...
    13357    #  These option settings coming from the authority override the
    134     #  option settings built into the client at the top of this
    135     #  script.  The authority probably has a more up-to-date list
    136     #  of other authorities and better policies for living within
    137     #  the network.
    138     # ------------------------------------------------------------------
    139     $authority define hubzero:worker/1 options {args} {
    140         global options server myaddress
     58    #  option settings built into the client.  The authority probably
     59    #  has a more up-to-date list of other authorities and better
     60    #  policies for living within the network.
     61    # ------------------------------------------------------------------
     62    define options {args} {
     63        global server myaddress
    14164
    14265        array set options $args
     66        foreach key [array names options] {
     67            catch {p2p::options set $key $options($key)}
     68        }
    14369
    14470        if {[info exists options(ip)]} {
     
    15076
    15177    # ------------------------------------------------------------------
    152     #  DIRECTIVE: peers <listOfAddresses>
     78    #  INCOMING: peers <listOfAddresses>
    15379    #  This message comes in after this worker has sent the "peers"
    15480    #  message to request the current list of peers.  The
     
    15682    #  worker should contact to enter the p2p network.
    15783    # ------------------------------------------------------------------
    158     $authority define hubzero:worker/1 peers {plist} {
     84    define peers {plist} {
    15985        global peers
    16086        set peers(all) $plist
    161         after idle worker_peers_update
     87        after idle {peer-network goto measure}
    16288
    16389        # now that we've gotten the peers, we're done with the authority
    164         authority_disconnect
    165     }
    166 
    167     # ------------------------------------------------------------------
    168     #  DIRECTIVE: identity
     90        authority-connection goto idle
     91        return ""
     92    }
     93
     94    # ------------------------------------------------------------------
     95    #  INCOMING: identity
    16996    #  Used for debugging, so that each client can identify itself by
    17097    #  name to this worker.
    17198    # ------------------------------------------------------------------
    172     $authority define hubzero:worker/1 identity {name} {
     99    define identity {name} {
    173100        variable cid
    174101        variable handler
     
    176103        return ""
    177104    }
    178 
    179     $authority send "protocol hubzero:worker/1"
    180     return $authority
    181 }
    182 
    183 proc authority_disconnect {} {
    184     global authority
    185     if {"" != $authority} {
    186         catch {itcl::delete object $authority}
    187         set authority ""
    188     }
    189 }
    190 
    191 # ======================================================================
    192 #  USEFUL ROUTINES
    193 # ======================================================================
    194 # ----------------------------------------------------------------------
    195 #  COMMAND:  worker_register
    196 #
    197 #  Invoked when this worker first starts up and periodically thereafter
    198 #  to register the worker with the central authority, and to request
    199 #  a list of peers that this peer can talk to.
    200 # ----------------------------------------------------------------------
    201 proc worker_register {} {
    202     global options server
     105}
     106
     107# ======================================================================
     108#  PROTOCOL: hubzero:worker<-foreman/1
     109#
     110#  The foreman initiates communication with a worker, and sends the
     111#  various messages supported below.
     112# ======================================================================
     113p2p::protocol::register hubzero:worker<-foreman/1 {
     114
     115    # ------------------------------------------------------------------
     116    #  INCOMING: solicit
     117    #  Foremen send this message to solicit bids for a simulation job.
     118    # ------------------------------------------------------------------
     119    define solicit {args} {
     120log debug "SOLICIT from foreman: $args"
     121        variable cid
     122        log debug "solicitation request from foreman: $args"
     123        eval Solicitation ::#auto $args -connection $cid
     124        return ""
     125    }
     126}
     127
     128# ======================================================================
     129#  PROTOCOL: hubzero:workers<-workerc/1
     130#
     131#  Workers initiate connections with other workers as peers.  The
     132#  following messages are sent by worker clients to the worker server.
     133# ======================================================================
     134p2p::protocol::register hubzero:workers<-workerc/1 {
     135    # ------------------------------------------------------------------
     136    #  INCOMING: identity
     137    #  Used for debugging, so that each client can identify itself by
     138    #  name to this worker.
     139    # ------------------------------------------------------------------
     140    define identity {name} {
     141        variable cid
     142        variable handler
     143        $handler connectionName $cid $name
     144        return ""
     145    }
     146
     147    # ------------------------------------------------------------------
     148    #  INCOMING: ping
     149    #  If another worker sends "ping", they are trying to measure the
     150    #  speed of the connection.  Send back a "pong" message.  If this
     151    #  worker is close by, the other worker will stay connected to it.
     152    # ------------------------------------------------------------------
     153    define ping {} {
     154        return "pong"
     155    }
     156
     157    # ------------------------------------------------------------------
     158    #  INCOMING: solicit -job info -path hosts -token xxx
     159    #  Workers send this message on to their peers to solicit bids
     160    #  for a simulation job.
     161    # ------------------------------------------------------------------
     162    define solicit {args} {
     163log debug "SOLICIT from peer: $args"
     164        variable cid
     165        log debug "solicitation request from peer: $args"
     166        eval Solicitation ::#auto $args -connection $cid
     167        return ""
     168    }
     169
     170    # ------------------------------------------------------------------
     171    #  INCOMING: proffer <token> <details>
     172    #  Workers send this message back after a "solicit" request with
     173    #  details about what they can offer in terms of CPU power.  When
     174    #  a worker has received all replies from its peers, it sends back
     175    #  its own proffer message to the client that started the
     176    #  solicitation.
     177    # ------------------------------------------------------------------
     178    define proffer {token details} {
     179log debug "PROFFER from peer: $token $details"
     180        Solicitation::proffer $token $details
     181        return ""
     182    }
     183}
     184
     185# ======================================================================
     186#  PROTOCOL: hubzero:workerc<-workers/1
     187#
     188#  The following messages are received by a worker client in response
     189#  to the requests that they send to a worker server.
     190# ======================================================================
     191p2p::protocol::register hubzero:workerc<-workers/1 {
     192    # ------------------------------------------------------------------
     193    #  INCOMING: identity
     194    #  Used for debugging, so that each client can identify itself by
     195    #  name to this worker.
     196    # ------------------------------------------------------------------
     197    define identity {name} {
     198        variable cid
     199        variable handler
     200        $handler connectionName $cid $name
     201        return ""
     202    }
     203
     204    # ------------------------------------------------------------------
     205    #  INCOMING: pong
     206    #  When forming the peer-to-peer network, workers send ping/pong
     207    #  messages to one another to measure the latency.
     208    # ------------------------------------------------------------------
     209    define pong {} {
     210        global peers
     211        variable handler
     212        set now [clock clicks -milliseconds]
     213        set delay [expr {$now - $peers(ping-$handler-start)}]
     214        set peers(ping-$handler-latency) $delay
     215        if {[incr peers(responses)] >= [llength $peers(testing)]} {
     216            after cancel {peer-network goto finalize}
     217            after idle {peer-network goto finalize}
     218        }
     219        return ""
     220    }
     221
     222    # ------------------------------------------------------------------
     223    #  INCOMING: solicit -job info -path hosts -token xxx
     224    #  Workers send this message on to their peers to solicit bids
     225    #  for a simulation job.
     226    # ------------------------------------------------------------------
     227    define solicit {args} {
     228log debug "SOLICIT from peer: $args"
     229        variable cid
     230        log debug "solicitation request from peer: $args"
     231        eval Solicitation ::#auto $args -connection $cid
     232        return ""
     233    }
     234
     235    # ------------------------------------------------------------------
     236    #  INCOMING: proffer <token> <details>
     237    #  Workers send this message back after a "solicit" request with
     238    #  details about what they can offer in terms of CPU power.  When
     239    #  a worker has received all replies from its peers, it sends back
     240    #  its own proffer message to the client that started the
     241    #  solicitation.
     242    # ------------------------------------------------------------------
     243    define proffer {token details} {
     244log debug "PROFFER from peer: $token $details"
     245        Solicitation::proffer $token $details
     246        return ""
     247    }
     248}
     249
     250# ----------------------------------------------------------------------
     251#  COMMAND:  broadcast_to_peers <message> ?<avoidList>?
     252#
     253#  Used to broadcast a message out to all peers.  The <message> must
     254#  be one of the commands defined in the worker protocol.  If there
     255#  are no peer connections yet, this command does nothing.  If any
     256#  peer appears on the <avoidList>, it is skipped.  Returns the
     257#  number of messages sent out, so this peer knows how many replies
     258#  to wait for.
     259# ----------------------------------------------------------------------
     260proc broadcast_to_peers {message {avoidList ""}} {
     261    global server peers
     262
     263    #
     264    # Build a list of all peer addresses, so we know who we're
     265    # going to send this message out to.
     266    #
     267    set recipients ""
     268    foreach key [array names peers current-*] {
     269        set addr [$peers($key) address]
     270        if {[lsearch $avoidList $addr] < 0} {
     271            lappend recipients $addr
     272        }
     273    }
     274    foreach cid [$server connections hubzero:workers<-workerc/1] {
     275        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
     276        if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} {
     277            lappend recipients $addr
     278        }
     279    }
     280
     281    #
     282    # If the message has any @RECIPIENTS fields, replace them
     283    # with the list of recipients
     284    #
     285    regsub -all @RECIPIENTS $message $recipients message
     286
     287    #
     288    # Send the message out to all peers.  Keep a count and double-check
     289    # it against the list of recipients generated above.
     290    #
     291    set nmesgs 0
     292
     293    # send off to other workers that this one has connected to
     294    foreach key [array names peers current-*] {
     295        set addr [$peers($key) address]
     296        if {[lsearch $avoidList $addr] < 0} {
     297            if {[catch {$peers($key) send $message} err] == 0} {
     298                incr nmesgs
     299            } else {
     300                log error "ERROR: broadcast failed to [$peers($key) address]: $result\n  (message was \"$message\")"
     301            }
     302        }
     303    }
     304
     305    # send off to other workers that connected to this one
     306    foreach cid [$server connections hubzero:workers<-workerc/1] {
     307        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
     308        if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} {
     309            if {[catch {puts $cid $message} result] == 0} {
     310                incr nmesgs
     311            } else {
     312                log error "ERROR: broadcast failed for $cid: $result"
     313                log error "  (message was $message)"
     314            }
     315        }
     316    }
     317
     318    # did we send the right number of messages?
     319    if {[llength $recipients] != $nmesgs} {
     320        log error "ERROR: sent only $nmesgs messages to peers {$recipients}"
     321    }
     322
     323    return $nmesgs
     324}
     325
     326# ----------------------------------------------------------------------
     327#  COMMAND:  worker_got_protocol
     328#
     329#  Invoked whenever a peer sends their protocol message to this
     330#  worker.  Sends the matching reply protocol name back, so the other
     331#  worker understands what protocol we're speaking.
     332# ----------------------------------------------------------------------
     333proc worker_got_protocol {cid protocol} {
     334    switch -glob -- $protocol {
     335        *<-worker* {
     336            puts $cid [list protocol hubzero:workerc<-workers/1]
     337        }
     338        *<-foreman* {
     339            puts $cid [list protocol hubzero:foreman<-worker/1]
     340        }
     341        DEF* {
     342            # do nothing
     343        }
     344        default {
     345            error "don't recognize protocol \"$protocol\""
     346        }
     347    }
     348}
     349
     350# ----------------------------------------------------------------------
     351#  COMMAND:  worker_got_dropped <address>
     352#
     353#  Invoked whenever an inbound connection to this worker drops.
     354#  At some point we should try updating our connections to other
     355#  peers, to replace this missing connection.
     356# ----------------------------------------------------------------------
     357proc worker_got_dropped {addr} {
     358    global peers
     359    if {[info exists peers(current-$addr)]} {
     360        unset peers(current-$addr)
     361        after cancel {peer-network goto measure}
     362        after 5000 {peer-network goto measure}
     363log debug "peer dropped: $addr"
     364    }
     365}
     366
     367# ======================================================================
     368#  Connect to one of the authorities to get a list of peers.  Then,
     369#  sit in the event loop and process events.
     370# ======================================================================
     371p2p::wonks::init
     372
     373set server [p2p::server -port 9101? \
     374        -protocols {hubzero:workers<-workerc hubzero:worker<-foreman} \
     375        -servername worker \
     376        -onprotocol worker_got_protocol \
     377        -ondisconnect worker_got_dropped]
     378
     379# ----------------------------------------------------------------------
     380#  AUTHORITY CONNECTION
     381#
     382#  This is the state machine representing the connection to the
     383#  authority server.  We start in the "idle" state.  Whenever we
     384#  try to move to "connected", we'll open a connection to an authority
     385#  and request updated information on workers.  If that connection
     386#  fails, we'll get kicked back to "idle" and we'll try again later.
     387# ----------------------------------------------------------------------
     388StateMachine authority-connection
     389authority-connection statedata cnx ""
     390
     391# sit here whenever we don't have a connection
     392# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     393authority-connection state idle -onenter {
     394    # have an open connection? then close it
     395    if {"" != [statedata cnx]} {
     396        catch {itcl::delete object [statedata cnx]}
     397        statedata cnx ""
     398    }
     399    # try to connect again later
     400    set delay [p2p::options get time_between_authority_checks]
     401    after $delay {authority-connection goto connected}
     402} -onleave {
     403    after cancel {authority-connection goto connected}
     404}
     405
     406# sit here after we're connected and waiting for a response
     407# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     408authority-connection state connected
     409
     410# when moving to the connected state, make a connection
     411# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     412authority-connection transition idle->connected -onchange {
     413    global server
    203414
    204415    # connect to the authority and request a list of peers
    205     if {[catch {
    206         set client [authority_connect]
    207         $client send "listening [$server port]"
    208         $client send "peers"
    209     } result]} {
    210         log system "ERROR: $result"
    211     }
    212 
    213     # register again at regular intervals in case the authority
    214     # gets restarted in between.
    215     after $options(time_between_registers) worker_register
    216 }
    217 
    218 # ----------------------------------------------------------------------
    219 #  COMMAND:  worker_load_check
    220 #
    221 #  Invoked when this worker first starts up and periodically thereafter
    222 #  to compute the system load available to the worker.  If the system
    223 #  load has changed significantly, then the "perftest" program is
    224 #  executed to get a measure of performance available.  This program
    225 #  returns a number of "wonks" which we report to peers as our available
    226 #  performance.
    227 # ----------------------------------------------------------------------
    228 proc worker_load_check {} {
    229     global options sysload dir
    230 
    231     # see if the load has changed significantly
    232     set changed 0
    233     if {$sysload(lastLoad) < 0} {
    234         set changed 1
     416    statedata cnx ""
     417    foreach addr [randomize [p2p::options get authority_hosts]] {
     418        if {[catch {p2p::client -address $addr \
     419            -sendprotocol hubzero:authority<-worker/1 \
     420            -receiveprotocol hubzero:worker<-authority/1} result] == 0} {
     421            statedata cnx $result
     422            break
     423        }
     424    }
     425
     426    if {"" != [statedata cnx]} {
     427        [statedata cnx] send "listening [$server port]"
     428        [statedata cnx] send "peers"
    235429    } else {
    236         set load [Rappture::sysinfo load5]
    237         if {$load < 0.9*$sysload(lastLoad) || $load > 1.1*$sysload(lastLoad)} {
    238             set changed 1
    239         }
    240     }
    241 
    242     if {$changed} {
    243         set sysload(lastLoad) [Rappture::sysinfo load5]
    244 puts "LOAD CHANGED: $sysload(lastLoad) [Rappture::sysinfo freeram freeswap]"
    245         set sysload(measured) [clock seconds]
    246 
    247         # Run the program, but don't use exec, since it blocks.
    248         # Instead, run it in the background and harvest its output later.
    249         set sysload(test) [open "| [file join $dir perftest]" r]
    250         fileevent $sysload(test) readable worker_load_results
    251     }
    252 
    253     # monitor the system load at regular intervals
    254     after $options(time_between_load_checks) worker_load_check
    255 }
    256 
    257 # ----------------------------------------------------------------------
    258 #  COMMAND:  worker_load_results
    259 #
    260 #  Invoked automatically when the "perftest" run finishes.  Reads the
    261 #  number of wonks reported on standard output and reports them to
    262 #  other peers.
    263 # ----------------------------------------------------------------------
    264 proc worker_load_results {} {
    265     global sysload
    266 
    267     if {[catch {read $sysload(test)} msg] == 0} {
    268         if {[regexp {[0-9]+} $msg match]} {
    269 puts "WONKS: $match"
    270             set sysload(wonks) $match
    271         } else {
    272             log system "ERROR: performance test failed: $msg"
    273         }
    274     }
    275     catch {close $sysload(test)}
    276     set sysload(test) ""
    277 }
    278 
    279 # ----------------------------------------------------------------------
    280 #  COMMAND:  worker_peers_update
    281 #
    282 #  Invoked when this worker has received a list of peers, and from
    283 #  time to time thereafter, to establish connections with other peers
    284 #  in the network.  This worker picks a number of peers randomly from
    285 #  the list of all available peers, and then pings each of them.
    286 #  Timing results from the pings are stored away, and when all pings
    287 #  have completed, this worker picks the best few connections and
    288 #  keeps those.
    289 # ----------------------------------------------------------------------
    290 proc worker_peers_update {} {
    291     global options peers myaddress
    292     worker_peers_cleanup
     430        error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]"
     431    }
     432}
     433
     434# ----------------------------------------------------------------------
     435#  PEER-TO-PEER CONNECTIONS
     436#
     437#  This is the state machine representing the network of worker peers.
     438#  We start in the "idle" state.  From time to time we move to the
     439#  "measure" state and attempt to establish connections with a set
     440#  of peers.  We then wait for ping/pong responses from them.
     441#  When we have all responses, we move to "finalize" and finalize all
     442#  connections, and then move back to "idle".
     443# ----------------------------------------------------------------------
     444StateMachine peer-network
     445peer-network statedata cnx ""
     446
     447# sit here when the peer network is okay
     448# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     449peer-network state idle -onenter {
     450    # try to connect again later
     451    set delay [p2p::options get time_between_network_rebuilds]
     452    after $delay {peer-network goto measure}
     453} -onleave {
     454    after cancel {peer-network goto measure}
     455}
     456
     457# sit here when we need to rebuild the peer-to-peer network
     458# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     459peer-network state measure
     460
     461# when moving to the measure state, make connections to a bunch of peers
     462# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     463peer-network transition idle->measure -onchange {
     464    global server myaddress peers
     465
     466    #
     467    # Get a list of workers already connected to this one
     468    # with an inbound connection to the server.
     469    #
     470    foreach cid [$server connections hubzero:workers<-workerc/1] {
     471        set addr [lindex [$server connectionName $cid] 0]
     472        set inbound($addr) $cid
     473    }
    293474
    294475    #
    295476    # Pick a random group of peers and try to connect to them.
    296     # Start with the existing peers, and then add a random
    297     # bunch of others.
    298     #
     477    # Start with the existing peers to see if their connection
     478    # is still favorable, and then add a random bunch of others.
     479    #
     480    set peers(testing) ""
     481    set peers(responses) 0
    299482    foreach key [array names peers current-*] {
    300483        set peer $peers($key)
     
    307490    # final number we want to talk to.
    308491    #
    309     set ntest [expr {10 * $options(max_peer_connections)}]
    310     foreach addr [randomize $peers(all) $ntest] {
    311         if {$addr == $myaddress} {
     492    set maxpeers [p2p::options get max_peer_connections]
     493    foreach addr [randomize $peers(all) [expr {10*$maxpeers}]] {
     494        #
     495        # Avoid connecting to ourself, or to peers that we're
     496        # already connected to (either as inbound connections
     497        # to the server or as outbound connections to others).
     498        #
     499        if {$addr == $myaddress
     500              || [info exists peers(current-$addr)]
     501              || [info exists inbound($addr)]} {
    312502            continue
    313503        }
    314         if {![info exists peers(current-$addr)]
    315               && [catch {Client ::#auto $addr} peer] == 0} {
    316             # open a new connection to this address
    317             lappend peers(testing) $peer
    318             $peer protocol hubzero:peer/1
    319 
    320             $peer define hubzero:peer/1 exception {message} {
    321                 log system "ERROR: $message"
    322             }
    323 
    324             $peer define hubzero:peer/1 identity {name} {
    325                 variable cid
    326                 variable handler
    327                 $handler connectionName $cid $name
    328                 return ""
    329             }
    330 
    331             # when we get a "pong" back, store the latency
    332             $peer define hubzero:peer/1 pong {} {
    333                 global peers
    334                 variable handler
    335 puts "  pong from $handler"
    336                 set now [clock clicks -milliseconds]
    337                 set delay [expr {$now - $peers(ping-$handler-start)}]
    338                 set peers(ping-$handler-latency) $delay
    339                 incr peers(responses)
    340                 worker_peers_finalize
    341                 return ""
    342             }
    343 
    344             # start the ping/pong session with the peer
    345             $peer send "protocol hubzero:peer/1"
    346 
    347             # send tell this peer our name (for debugging)
    348             if {[log channel debug]} {
    349                 $peer send "identity $myaddress"
    350             }
     504
     505        if {[catch {p2p::client -address $addr \
     506            -sendprotocol hubzero:workers<-workerc/1 \
     507            -receiveprotocol hubzero:workerc<-workers/1} cnx]} {
     508            continue
     509        }
     510        $cnx send "identity $myaddress"
     511        lappend peers(testing) $cnx
     512
     513        # have enough connections to test?
     514        if {[llength $peers(testing)] >= 2*$maxpeers} {
     515            break
    351516        }
    352517    }
     
    355520    # Now, loop through all peers and send a "ping" message.
    356521    #
    357     foreach peer $peers(testing) {
    358 puts "pinging $peer = [$peer address]..."
     522    foreach cnx $peers(testing) {
    359523        # mark the time and send a ping to this peer
    360         set peers(ping-$peer-start) [clock clicks -milliseconds]
    361         $peer send "ping"
     524        set peers(ping-$cnx-start) [clock clicks -milliseconds]
     525        $cnx send "ping"
    362526    }
    363527
    364528    # if this test takes too long, just give up
    365     after 10000 worker_peers_finalize -force
    366 }
    367 
    368 # ----------------------------------------------------------------------
    369 #  COMMAND:  worker_peers_finalize ?-force?
    370 #
    371 #  Called after worker_peers_update has finished its business to
    372 #  clean up all of the connections that were open for testing.
    373 # ----------------------------------------------------------------------
    374 proc worker_peers_finalize {{option -check}} {
    375     global peers options
    376     if {$option == "-force" || $peers(responses) == [llength $peers(testing)]} {
    377         # build a list:  {peer latency} {peer latency} ...
    378         set plist ""
    379         foreach obj $peers(testing) {
    380             if {[info exists peers(ping-$obj-latency)]} {
    381                 lappend plist [list $obj $peers(ping-$obj-latency)]
    382             }
    383         }
    384 puts "-------------------\nFINALIZING $::myaddress"
    385 puts "PINGS: $plist"
    386 
    387         # sort the list and extract the top peers
    388         set plist [lsort -increasing -index 1 $plist]
    389         set plist [lrange $plist 0 [expr {$options(max_peer_connections)-1}]]
    390 puts "TOP: $plist"
    391 
    392         set current [array names peers current-*]
    393 puts "  current: $current"
    394 
    395         # transfer the top peers to the "current" list
    396         set all ""
    397         foreach rec $plist {
    398             set peer [lindex $rec 0]
    399             set addr [$peer address]
    400             set peers(current-$addr) $peer
    401             lappend all $addr
    402 
    403             # if it is already on the "current" list, then keep it
    404             set i [lsearch $current current-$addr]
    405             if {$i >= 0} {
    406                 set current [lreplace $current $i $i]
    407             }
    408             set i [lsearch $peers(testing) $peer]
    409             if {$i >= 0} {
    410                 set peers(testing) [lreplace $peers(testing) $i $i]
    411             }
    412         }
    413         log system "connected to peers: $all"
    414 puts "  new current: [array names peers current-*]"
    415 puts "  final: $all"
    416 puts "  leftover: $current $peers(testing)"
    417 
    418         # get rid of old peers that we no longer want to talk to
    419         foreach leftover $current {
    420             itcl::delete object $peers($leftover)
    421             unset peers($leftover)
    422 puts "  cleaned up $leftover (was on current list)"
    423         }
    424 
    425         # clean up after this test
    426         worker_peers_cleanup
    427     }
    428 }
    429 
    430 # ----------------------------------------------------------------------
    431 #  COMMAND:  worker_peers_cleanup
    432 #
    433 #  Called after worker_peers_update has finished its business to
    434 #  clean up all of the connections that were open for testing.
    435 # ----------------------------------------------------------------------
    436 proc worker_peers_cleanup {} {
     529    after 10000 {peer-network goto finalize}
     530}
     531
     532# sit here when we need to rebuild the peer-to-peer network
     533# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     534peer-network state finalize -onenter {
     535    after cancel {peer-network goto finalize}
     536
     537    # everything is finalized now, so go to idle
     538    after idle {peer-network goto idle}
     539}
     540
     541# when moving to the finalize state, decide on the network of peers
     542# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     543peer-network transition measure->finalize -onchange {
    437544    global peers
     545
     546    set maxpeers [p2p::options get max_peer_connections]
     547
     548    # build a list:  {peer latency} {peer latency} ...
     549    set plist ""
     550    foreach obj $peers(testing) {
     551        if {[info exists peers(ping-$obj-latency)]} {
     552            lappend plist [list $obj $peers(ping-$obj-latency)]
     553        }
     554    }
     555
     556    # sort the list and extract the top peers
     557    set plist [lsort -increasing -index 1 $plist]
     558    set plist [lrange $plist 0 [expr {$maxpeers-1}]]
     559
     560    set currentpeers [array names peers current-*]
     561
     562    # transfer the top peers to the "current" list
     563    set all ""
     564    foreach rec $plist {
     565        set peer [lindex $rec 0]
     566        set addr [$peer address]
     567        set peers(current-$addr) $peer
     568        lappend all $addr
     569
     570        # if it is already on the "current" list, then keep it
     571        set i [lsearch $currentpeers current-$addr]
     572        if {$i >= 0} {
     573            set currentpeers [lreplace $currentpeers $i $i]
     574        }
     575        set i [lsearch $peers(testing) $peer]
     576        if {$i >= 0} {
     577            set peers(testing) [lreplace $peers(testing) $i $i]
     578        }
     579    }
     580    log system "connected to peers: $all"
     581
     582    # get rid of old peers that we no longer want to talk to
     583    foreach leftover $currentpeers {
     584        itcl::delete object $peers($leftover)
     585        unset peers($leftover)
     586    }
     587
     588    # clean up after this test
    438589    foreach obj $peers(testing) {
    439590        catch {itcl::delete object $obj}
    440 puts "  cleaned up $obj (was on testing list)"
    441591    }
    442592    set peers(testing) ""
     
    446596    }
    447597    set peers(responses) 0
    448 
    449     after cancel worker_peers_finalize -force
    450 }
    451 
    452 # ----------------------------------------------------------------------
    453 #  COMMAND:  worker_peers_protocol
    454 #
    455 #  Invoked whenever a peer sends their protocol message to this
    456 #  worker.  Sends the same protocol name back, so the other worker
    457 #  understands what protocol we're speaking.
    458 # ----------------------------------------------------------------------
    459 proc worker_peers_protocol {cid protocol} {
    460     if {"DEFAULT" != $protocol} {
    461         puts $cid [list protocol $protocol]
    462     }
    463 }
    464 
    465 # ======================================================================
    466 #  Connect to one of the authorities to get a list of peers.  Then,
    467 #  sit in the event loop and process events.
    468 # ======================================================================
    469 after idle worker_register
     598}
     599
     600# ----------------------------------------------------------------------
     601#  JOB SOLICITATION
     602#
     603#  Each worker can receive a "solicit" request, asking for information
     604#  about performance and price of peers available to work.  Each worker
     605#  sends the request on to its peers, then gathers the information
     606#  and sends it back to the client or to the peer requesting the
     607#  information.  There can be multiple requests going on at once,
     608#  and each may have different job types and return different info,
     609#  so the class below helps to watch over each request until it has
     610#  completed.
     611# ----------------------------------------------------------------------
     612itcl::class Solicitation {
     613    public variable connection ""
     614    public variable job ""
     615    public variable path ""
     616    public variable avoid ""
     617    public variable token ""
     618
     619    private variable _serial ""
     620    private variable _response ""
     621    private variable _waitfor 0
     622    private variable _timeout ""
     623
     624    constructor {args} {
     625        eval configure $args
     626
     627        global myaddress
     628        lappend path $myaddress
     629        lappend avoid $myaddress
     630        set _serial [incr counter]
     631        set all($_serial) $this
     632
     633        set delay "idle"  ;# finalize after waiting for responses
     634        set ttl [p2p::options get peer_time_to_live]
     635        if {[llength $path] < $ttl} {
     636            set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial]
     637            set _waitfor [broadcast_to_peers $mesg $avoid]
     638log debug "WAIT FOR: $_waitfor"
     639
     640            if {$_waitfor > 0} {
     641                # add a delay proportional to ttl + time for wonks measurement
     642                set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}]
     643            }
     644        }
     645log debug "TIMEOUT: $delay"
     646        set _timeout [after $delay [itcl::code $this finalize]]
     647    }
     648    destructor {
     649        after cancel $_timeout
     650        catch {unset all($_serial)}
     651    }
     652
     653    # this adds the info from each proffer to this solicitation
     654    method response {details} {
     655        set addr [lindex $details 0]
     656        append _response $details "\n"
     657        if {[incr _waitfor -1] <= 0} {
     658log debug "ALL RESPONSES"
     659            finalize
     660        }
     661    }
     662
     663    # called to finalize when all peers have responded back
     664    method finalize {} {
     665        global myaddress
     666
     667        # filter out duplicate info from clients
     668        set block ""
     669        foreach line [split $_response \n] {
     670            set addr [lindex $line 0]
     671            if {"" != $addr && ![info exists response($addr)]} {
     672                append block $line "\n"
     673                set response($addr) 1
     674            }
     675        }
     676        # add details about this client to the message
     677        append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]"
     678log debug "FINALIZE {$block}"
     679
     680        # send the composite results back to the caller
     681        set cmd [list proffer $token $block]
     682        if {[catch {puts $connection $cmd} err]} {
     683            log error "ERROR while sending back proffer: $err"
     684        }
     685        itcl::delete object $this
     686    }
     687
     688    proc proffer {serial details} {
     689        if {[info exists all($serial)]} {
     690            $all($serial) response $details
     691        }
     692    }
     693
     694    common counter 0 ;# generates serial nums for objects
     695    common all       ;# maps serial num => solicitation object
     696}
     697
     698# ----------------------------------------------------------------------
    470699log system "starting..."
     700
     701after idle {
     702    authority-connection goto connected
     703}
     704
    471705vwait main-loop
Note: See TracChangeset for help on using the changeset viewer.