Changeset 1273


Ignore:
Timestamp:
Feb 5, 2009 6:17:23 AM (11 years ago)
Author:
mmc
Message:

Major reorganization of p2p code, and support for solicit/proffer
messages.

Location:
trunk/p2p
Files:
5 added
10 edited

Legend:

Unmodified
Added
Removed
  • trunk/p2p/Makefile

    r1257 r1273  
    44all:
    55        $(CC) $(CFLAGS) perftest.c -o perftest
    6         ./mkindex handler.tcl server.tcl client.tcl log.tcl random.tcl
     6        ./mkindex handler.tcl server.tcl client.tcl log.tcl random.tcl \
     7                options.tcl protocols.tcl statemachine.tcl wonks.tcl \
     8                foreman.tcl
  • trunk/p2p/authority.tcl

    r1257 r1273  
    3131
    3232# register with the central authority at this frequency
    33 set worker_options(time_between_registers) 60000
     33set worker_options(time_between_authority_checks) 60000
    3434
    3535# workers should try to connect with this many other peers
     
    3737
    3838# ======================================================================
     39#  PROTOCOL: hubzero:authority<-worker/1
     40#
     41#  The worker initiates communication with the authority by sending
     42#  these messages to the authority to request information about
     43#  other peers.
     44# ======================================================================
     45p2p::protocol::register hubzero:authority<-worker/1 {
     46
     47    # ------------------------------------------------------------------
     48    #  INCOMING: listening <port>
     49    #  Workers use this to tell the authority what port they're
     50    #  listening on.  Other workers should connect to this port.
     51    # ------------------------------------------------------------------
     52    define listening {port} {
     53        global client2address workers
     54        variable handler
     55        variable cid
     56
     57        # register this address on the list of workers
     58        set addr "$client2address($cid):$port"
     59        set workers($addr) "--"
     60
     61        # name this connection for easier debugging
     62        $handler connectionName $cid $addr
     63
     64        return ""
     65    }
     66
     67    # ------------------------------------------------------------------
     68    #  INCOMING: peers
     69    # ------------------------------------------------------------------
     70    define peers {} {
     71        global workers
     72        return [list peers [array names workers]]
     73    }
     74}
     75
     76# ======================================================================
     77#  PROTOCOL: hubzero:authority<-foreman/1
     78#
     79#  The foreman initiates communication with the authority by sending
     80#  these messages to request information about workers, to estimate
     81#  requirements for a job, to put funds in escrow, and to finalize
     82#  the transaction.
     83# ======================================================================
     84p2p::protocol::register hubzero:authority<-foreman/1 {
     85    # ------------------------------------------------------------------
     86    #  INCOMING: workers
     87    #  Foremen use this to request the list of known workers, and then
     88    #  pick one worker to act as their connection to the P2P network.
     89    # ------------------------------------------------------------------
     90    define workers {} {
     91        global workers
     92        return [list workers [array names workers]]
     93    }
     94}
     95
     96# ======================================================================
    3997# set up a server at the first open port above 9001
    40 set server [Server ::#auto 9001? \
     98set server [p2p::server \
     99    -port 9001? \
     100    -protocols {hubzero:authority<-worker hubzero:authority<-foreman} \
    41101    -servername authority \
    42102    -onconnect authority_client_address \
     
    66126proc authority_client_protocol {cid protocol} {
    67127    global worker_options client2address
    68     if {"DEFAULT" != $protocol} {
    69         puts $cid [list protocol $protocol]
    70         puts $cid "options [array get worker_options] ip $client2address($cid)"
     128    switch -glob -- $protocol {
     129        *<-worker* {
     130            puts $cid [list protocol hubzero:worker<-authority/1]
     131            puts $cid "options [array get worker_options] ip $client2address($cid)"
     132        }
     133        *<-foreman* {
     134            puts $cid [list protocol hubzero:foreman<-authority/1]
     135        }
     136        DEF* {
     137            # do nothing
     138        }
     139        default {
     140            error "don't recognize protocol \"$protocol\""
     141        }
    71142    }
    72143}
    73144
    74 # ======================================================================
    75 #  PROTOCOL: hub:worker/1
    76 # ======================================================================
    77 $server protocol hubzero:worker/1
    78 
    79 # ----------------------------------------------------------------------
    80 #  DIRECTIVE: exception <message>
    81 # ----------------------------------------------------------------------
    82 $server define hubzero:worker/1 exception {args} {
    83     log system "error from worker: $args"
    84 }
    85 
    86 # ----------------------------------------------------------------------
    87 #  DIRECTIVE: listening <port>
    88 #  Workers use this to tell the authority what port they're listening
    89 #  on.  Other workers should connect to this port.
    90 # ----------------------------------------------------------------------
    91 $server define hubzero:worker/1 listening {port} {
    92     global client2address workers
    93     variable handler
    94     variable cid
    95 
    96     # register this address on the list of workers
    97     set addr "$client2address($cid):$port"
    98     set workers($addr) "--"
    99 
    100     # name this connection for easier debugging
    101     $handler connectionName $cid $addr
    102 
    103     return ""
    104 }
    105 
    106 # ----------------------------------------------------------------------
    107 #  DIRECTIVE: peers
    108 # ----------------------------------------------------------------------
    109 $server define hubzero:worker/1 peers {} {
    110     global workers
    111     return [list peers [array names workers]]
    112 }
    113 
    114145vwait main-loop
  • trunk/p2p/client.tcl

    r1257 r1273  
    1111# ======================================================================
    1212package require Itcl
     13
     14namespace eval p2p { # forward declaration }
     15
     16# ======================================================================
     17#  USAGE: p2p::client ?-option value -option value ...?
     18#
     19#  Used to create a client connection to a peer-to-peer server.
     20#  Recognizes the following options:
     21#    -address ........... connect to server at this host:port
     22#    -sendprotocol ...... tell server we're speaking this protocol
     23#    -receiveprotocol ... server replies back with these commands
     24# ======================================================================
     25proc p2p::client {args} {
     26    array set options {
     27        -address ?
     28        -sendprotocol ""
     29        -receiveprotocol ""
     30    }
     31    foreach {key val} $args {
     32        if {![info exists options($key)]} {
     33            error "bad option \"$key\": should be [join [lsort [array names options]] {, }]"
     34        }
     35        set options($key) $val
     36    }
     37
     38    # create the client
     39    set client [eval Client ::#auto $options(-address)]
     40
     41    # install the protocol for incoming commands
     42    p2p::protocol::init $client $options(-receiveprotocol)
     43
     44    # tell the server what protocol we'll be speaking
     45    $client send [list protocol $options(-sendprotocol)]
     46
     47    return $client
     48}
     49
     50# ======================================================================
     51#  CLASS: Client
     52# ======================================================================
    1353
    1454itcl::class Client {
  • trunk/p2p/handler.tcl

    r1257 r1273  
    3434            return ""
    3535        }
     36        define DEFAULT exception {message} {
     37            log error "ERROR: $message"
     38        }
    3639
    3740        eval configure $args
     
    4952    public method protocol {name}
    5053    public method define {protocol name arglist body}
     54    public method connections {{protocol *}}
    5155    public method connectionName {cid {name ""}}
    5256    public method connectionSpeaks {cid protocol}
     
    5458    protected method handle {cid}
    5559    protected method finalize {protocol}
     60    protected method dropped {cid}
    5661    protected method handlerType {}
    5762}
     
    101106
    102107# ----------------------------------------------------------------------
     108#  USAGE: connections ?<protocol>?
     109#
     110#  Returns a list of file handles for current connections that match
     111#  the glob-style <protocol> pattern.  If no pattern is given, then
     112#  it returns all connections.  Each handle can be passed to
     113#  connectionName and connectionSpeaks to get more information.
     114# ----------------------------------------------------------------------
     115itcl::body Handler::connections {{pattern *}} {
     116    set rlist ""
     117    foreach cid [array names _protocol] {
     118        if {[string match $pattern $_protocol($cid)]} {
     119            lappend rlist $cid
     120        }
     121    }
     122    return $rlist
     123}
     124
     125# ----------------------------------------------------------------------
    103126#  USAGE: connectionName <sockId> ?<name>?
    104127#
     
    144167itcl::body Handler::handle {cid} {
    145168    if {[gets $cid request] < 0} {
    146         log system "dropped: [connectionName $cid]"
    147         # connection has connection -- forget this entity
    148         catch {close $cid}
    149         catch {unset _buffer($cid)}
    150         catch {unset _protocol($cid)}
    151         catch {unset _cname($cid)}
     169        dropped $cid
    152170    } elseif {[info exists _protocol($cid)]} {
    153171        # complete command? then process it below...
     
    171189            if {[catch {$_parser($protocol) eval $request} result] == 0} {
    172190                if {[string length $result] > 0} {
    173                     puts $cid $result
     191                    catch {puts $cid $result}
    174192                    append mesg "ok: $result"
    175193                }
    176194            } else {
    177                 puts $cid [list exception $result]
     195                catch {puts $cid [list exception $result]}
    178196                append mesg "exception: $result"
    179197            }
     
    204222
    205223# ----------------------------------------------------------------------
     224#  USAGE: dropped <cid>
     225#
     226#  Invoked automatically whenever a client connection drops, to
     227#  log the event and remove all trace of the client.  Derived classes
     228#  can override this method to perform other functions too.
     229# ----------------------------------------------------------------------
     230itcl::body Handler::dropped {cid} {
     231    log system "dropped: [connectionName $cid]"
     232
     233    # connection has connection -- forget this entity
     234    catch {close $cid}
     235    catch {unset _buffer($cid)}
     236    catch {unset _protocol($cid)}
     237    catch {unset _cname($cid)}
     238}
     239
     240# ----------------------------------------------------------------------
    206241#  USAGE: handlerType
    207242#
  • trunk/p2p/perftest.c

    r1257 r1273  
    9191             + tend.tv_usec - tstart.tv_usec;
    9292
    93     printf("%ld\n", tval);
     93    printf("rappture-wonks/v1 %ld\n", tval);
    9494    exit(0);
    9595}
  • trunk/p2p/random.tcl

    r1257 r1273  
    3838    return $rlist
    3939}
     40
     41# ----------------------------------------------------------------------
     42#  USAGE: random <list>
     43#
     44#  Picks one element at random from the given <list>.
     45# ----------------------------------------------------------------------
     46proc random {entries} {
     47    set nrand [expr {int(rand()*[llength $entries])}]
     48    return [lindex $entries $nrand]
     49}
  • trunk/p2p/server.tcl

    r1257 r1273  
    1111package require Itcl
    1212
     13namespace eval p2p { # forward declaration }
     14
     15# ======================================================================
     16#  USAGE: p2p::server ?-option value -option value ...?
     17#
     18#  Used to create a new peer-to-peer server object for this program.
     19#  Recognizes the following options:
     20#    -port ........ port number that the server listens on
     21#    -protocols ... list of protocol root names that the server handles
     22#               ... any other option supported by Server class
     23# ======================================================================
     24proc p2p::server {args} {
     25    set port "?"
     26    set protocols ""
     27    set options ""
     28    foreach {key val} $args {
     29        switch -- $key {
     30            -port { set port $val }
     31            -protocols { set protocols $val }
     32            default { lappend options $key $val }
     33        }
     34    }
     35
     36    if {[llength $protocols] == 0} {
     37        error "server needs at least one value for -protocols"
     38    }
     39
     40    # create the server
     41    set server [eval Server ::#auto $port $options]
     42
     43    # install the protocols that this server recognizes
     44    foreach name $protocols {
     45        p2p::protocol::init $server $name
     46    }
     47    return $server
     48}
     49
     50# ======================================================================
     51#  CLASS: Server
     52# ======================================================================
    1353itcl::class Server {
    1454    inherit Handler
     
    2464    # this code fragment gets invoked when client declares the protocol
    2565    public variable onprotocol ""
     66
     67    # this code fragment gets invoked when client drops
     68    public variable ondisconnect ""
    2669
    2770    constructor {port args} {
     
    59102
    60103    public method port {}
     104    public method broadcast {args}
    61105    public method connectionSpeaks {cid protocol}
    62106
     107    protected method dropped {cid}
    63108    protected method handlerType {}
    64109
     
    80125
    81126# ----------------------------------------------------------------------
     127#  USAGE: broadcast ?-protocol <name>? ?-avoid <avoidList>? <message>
     128#
     129#  Sends a <message> to all clients connected to this server.  If a
     130#  client address appears on the -avoid list, then that client is
     131#  avoided.  If the -protocol is specified, then the message is sent
     132#  only to clients who match the glob-style pattern for the protocol
     133#  name.
     134# ----------------------------------------------------------------------
     135itcl::body Server::broadcast {args} {
     136    set pattern "*"
     137    set avoidList ""
     138    set i 0
     139    while {$i < [llength $args]} {
     140        set option [lindex $args $i]
     141        if {[string index $option 0] == "-"} {
     142            switch -- $option {
     143                -protocol {
     144                    set pattern [lindex $args [expr {$i+1}]]
     145                    incr i 2
     146                }
     147                -avoid {
     148                    set avoidList [lindex $args [expr {$i+1}]]
     149                    incr i 2
     150                }
     151                -- {
     152                    incr i
     153                    break
     154                }
     155                default {
     156                    error "bad option \"$option\": should be -avoid, -protocol, or --"
     157                }
     158            }
     159        } else {
     160            break
     161        }
     162    }
     163    if {$i != [llength $args]-1} {
     164        error "wrong # args: should be \"broadcast ?-protocol pattern? ?-avoid clients? message\""
     165    }
     166    set message [lindex $args end]
     167
     168    set nmesgs 0
     169    foreach cid [connections $pattern] {
     170        set addr [lindex [connectionName $cid] 0]  ;# x.x.x.x (sockN)
     171        if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} {
     172puts "  inbound => [connectionName $cid]"
     173            if {[catch {puts $cid $message} result] == 0} {
     174                incr nmesgs
     175            } else {
     176                log error "ERROR: broadcast failed for $cid: $result"
     177                log error "  (message was $message)"
     178            }
     179        }
     180    }
     181    return $nmesgs
     182}
     183
     184# ----------------------------------------------------------------------
    82185#  USAGE: accept <cid> <addr> <port>
    83186#
     
    116219
    117220# ----------------------------------------------------------------------
     221#  USAGE: dropped <cid>
     222#
     223#  Invoked automatically whenever a client connection drops, to
     224#  log the event and remove all trace of the client.  Invokes any
     225#  command hook for this server to note the fact that the client
     226#  has dropped.
     227# ----------------------------------------------------------------------
     228itcl::body Server::dropped {cid} {
     229    # if there's a callback to handle the drop, execute it here
     230    if {[string length $ondisconnect] > 0} {
     231        uplevel #0 [list $ondisconnect [connectionName $cid]]
     232    }
     233
     234    # call the base class method to clean up after the client
     235    chain $cid
     236}
     237
     238# ----------------------------------------------------------------------
    118239#  USAGE: handlerType
    119240#
  • trunk/p2p/tclIndex

    r1257 r1273  
    1010set auto_index(::Handler::protocol) [list source [file join $dir handler.tcl]]
    1111set auto_index(::Handler::define) [list source [file join $dir handler.tcl]]
     12set auto_index(::Handler::connections) [list source [file join $dir handler.tcl]]
    1213set auto_index(::Handler::connectionName) [list source [file join $dir handler.tcl]]
    1314set auto_index(::Handler::connectionSpeaks) [list source [file join $dir handler.tcl]]
    1415set auto_index(::Handler::handle) [list source [file join $dir handler.tcl]]
    1516set auto_index(::Handler::finalize) [list source [file join $dir handler.tcl]]
     17set auto_index(::Handler::dropped) [list source [file join $dir handler.tcl]]
    1618set auto_index(::Handler::handlerType) [list source [file join $dir handler.tcl]]
     19set auto_index(::p2p::server) [list source [file join $dir server.tcl]]
    1720set auto_index(Server) [list source [file join $dir server.tcl]]
    1821set auto_index(::Server::port) [list source [file join $dir server.tcl]]
     22set auto_index(::Server::broadcast) [list source [file join $dir server.tcl]]
    1923set auto_index(::Server::accept) [list source [file join $dir server.tcl]]
    2024set auto_index(::Server::connectionSpeaks) [list source [file join $dir server.tcl]]
     25set auto_index(::Server::dropped) [list source [file join $dir server.tcl]]
    2126set auto_index(::Server::handlerType) [list source [file join $dir server.tcl]]
     27set auto_index(::p2p::client) [list source [file join $dir client.tcl]]
    2228set auto_index(Client) [list source [file join $dir client.tcl]]
    2329set auto_index(::Client::send) [list source [file join $dir client.tcl]]
     
    2632set auto_index(log) [list source [file join $dir log.tcl]]
    2733set auto_index(randomize) [list source [file join $dir random.tcl]]
     34set auto_index(random) [list source [file join $dir random.tcl]]
     35set auto_index(::p2p::options) [list source [file join $dir options.tcl]]
     36set auto_index(::p2p::protocol::register) [list source [file join $dir protocols.tcl]]
     37set auto_index(::p2p::protocol::init) [list source [file join $dir protocols.tcl]]
     38set auto_index(::p2p::protocol::define) [list source [file join $dir protocols.tcl]]
     39set auto_index(StateMachine) [list source [file join $dir statemachine.tcl]]
     40set auto_index(::StateMachine::state) [list source [file join $dir statemachine.tcl]]
     41set auto_index(::StateMachine::transition) [list source [file join $dir statemachine.tcl]]
     42set auto_index(::StateMachine::goto) [list source [file join $dir statemachine.tcl]]
     43set auto_index(::StateMachine::statedata) [list source [file join $dir statemachine.tcl]]
     44set auto_index(::p2p::wonks::init) [list source [file join $dir wonks.tcl]]
     45set auto_index(::p2p::wonks::current) [list source [file join $dir wonks.tcl]]
     46set auto_index(bgerror) [list source [file join $dir foreman.tcl]]
     47set auto_index(::Rappture::foreman::bids) [list source [file join $dir foreman.tcl]]
  • trunk/p2p/test.tcl

    r1257 r1273  
    2323set nodes(all) ""
    2424set nodeRadius 15
     25
     26option add *highlightBackground [. cget -background]
     27option add *client*background gray
     28option add *client*highlightBackground gray
     29option add *client*troughColor darkGray
    2530
    2631# ======================================================================
     
    162167#  Build the main interface
    163168# ======================================================================
     169frame .client -borderwidth 8 -relief flat
     170pack .client -side right -fill y
     171button .client.getbids -text "Get Bids:" -command test_bids
     172pack .client.getbids -side top -anchor w
     173frame .client.cntls
     174pack .client.cntls -side bottom -fill x
     175button .client.cntls.run -text "Spend" -command test_spend
     176pack .client.cntls.run -side left
     177entry .client.cntls.points -width 8
     178pack .client.cntls.points -side left
     179label .client.cntls.pointsl -text "points"
     180pack .client.cntls.pointsl -side left
     181
     182frame .client.bids
     183pack .client.bids -side bottom -expand yes -fill both
     184scrollbar .client.bids.ysbar -orient vertical -command {.client.bids.info yview}
     185pack .client.bids.ysbar -side right -fill y
     186listbox .client.bids.info -yscrollcommand {.client.bids.ysbar set}
     187pack .client.bids.info -side left -expand yes -fill both
     188
    164189frame .cntls
    165190pack .cntls -fill x
     
    181206}
    182207pack .cntls.layout -side left -padx 4 -pady 2
     208
     209entry .cntls.workers -width 5
     210pack .cntls.workers -side right -padx {0 4} -pady 2
     211.cntls.workers insert end "3"
     212label .cntls.workersl -text "Workers:"
     213pack .cntls.workersl -side right -pady 2
    183214
    184215frame .player
     
    238269
    239270    # launch a series of workers
    240     for {set i 0} {$i < 20} {incr i} {
     271    for {set i 0} {$i < [.cntls.workers get]} {incr i} {
    241272        lappend processes [exec tclsh worker.tcl &]
    242273        after [expr {int(rand()*5000)}]
     
    266297        set info [read $fid]
    267298        close $fid
     299
     300        if {[regexp -- {foreman<-} $info]} {
     301            # skip log file from foreman
     302            continue
     303        }
    268304
    269305        # get the address for this host
     
    322358        close $fid
    323359puts "\nscanning $fname"
     360
     361        if {[regexp -- {foreman<-} $info]} {
     362            # skip log file from foreman
     363            continue
     364        }
    324365
    325366        catch {unset started}
     
    358399                    unset started(connect$cid-addr)
    359400
    360                 } elseif {[regexp {server message from ([a-zA-Z0-9\.]+:[0-9]+) \(([a-z0-9]+)\): +(.+) => (.*)} $mesg match addr cid cmd result]} {
     401                } elseif {[regexp {(server|client) message from ([a-zA-Z0-9\.]+:[0-9]+) \(([a-z0-9]+)\): +(.+) => (.*)} $mesg match which addr cid cmd result]} {
    361402                    if {![string match identity* $cmd]} {
    362403                        append actions($tval) $mesg \n
     
    434475            set from $nodes($fname)
    435476            set addr $started($key)
     477
     478            if {![info exists nodes($addr-x)]} {
     479                unset started(connect$cid-time)
     480                unset started(connect$cid-addr)
     481                continue
     482            }
    436483            set x0 $nodes($from-x)
    437484            set y0 $nodes($from-y)
     
    558605    pack .diagram.$view -expand yes -fill both
    559606}
     607
     608proc test_bids {} {
     609    set info [Rappture::foreman::bids]
     610    .client.bids.info delete 0 end
     611    eval .client.bids.info insert end $info
     612}
  • trunk/p2p/worker.tcl

    r1257 r1273  
    2121
    2222# handle log file for this worker
     23log channel error on
     24log channel system on
    2325log channel debug on
    24 log channel system on
    25 
    26 # set up a server at the first open port above 9101
    27 set server [Server ::#auto 9101? -servername worker \
    28     -onprotocol worker_peers_protocol]
    29 
    30 # ======================================================================
    31 #  OPTIONS
    32 #  These options get the worker started, but new option settings
    33 #  some from the authority after this worker first connects.
    34 # ======================================================================
    35 set authority ""     ;# file handle for authority connection
     26
     27proc ::bgerror {err} { log error "ERROR: $err $::errorInfo" }
     28
     29
    3630set myaddress "?:?"  ;# address/port for this worker
    3731
    3832# set of connections for authority servers
    39 set options(authority_hosts) 127.0.0.1:9001
     33p2p::options register authority_hosts 127.0.0.1:9001
    4034
    4135# register with the central authority at this frequency
    42 set options(time_between_registers) 10000
     36p2p::options register time_between_authority_checks 60000
     37
     38# rebuild the peer-to-peer network at this frequency
     39p2p::options register time_between_network_rebuilds 600000
    4340
    4441# this worker should try to connect with this many other peers
    45 set options(max_peer_connections) 4
    46 
    47 # number of seconds between each check of system load
    48 set options(time_between_load_checks) 60000
    49 
    50 # ======================================================================
    51 #  PEERS
    52 # ======================================================================
    53 # eventually set to the list of all peers in the network
    54 set peers(all) ""
    55 
    56 # list of peers that we're testing connections with
    57 set peers(testing) ""
    58 
    59 # ======================================================================
    60 #  SYSTEM LOAD
    61 # ======================================================================
    62 # Check system load every so often.  When the load changes
    63 # significantly, execute a "perftest" run to compute the current
    64 # number of wonks available to this worker.
    65 
    66 set sysload(lastLoad) -1
    67 set sysload(wonks) -1
    68 set sysload(measured) -1
    69 
    70 after idle worker_load_check
    71 
    72 # ======================================================================
    73 #  PROTOCOL: hubzero:peer/1
    74 #  This protocol gets used when another worker connects to this one.
    75 # ======================================================================
    76 $server protocol hubzero:peer/1
    77 
    78 $server define hubzero:peer/1 exception {message} {
    79     log system "ERROR: $message"
    80 }
    81 
    82 # ----------------------------------------------------------------------
    83 #  DIRECTIVE: identity
    84 #  Used for debugging, so that each client can identify itself by
    85 #  name to this worker.
    86 # ----------------------------------------------------------------------
    87 $server define hubzero:peer/1 identity {name} {
    88     variable cid
    89     variable handler
    90     $handler connectionName $cid $name
    91     return ""
    92 }
    93 
    94 # ----------------------------------------------------------------------
    95 #  DIRECTIVE: ping
    96 # ----------------------------------------------------------------------
    97 $server define hubzero:peer/1 ping {} {
    98     return "pong"
    99 }
    100 
    101 # ======================================================================
    102 #  PROTOCOL: hubzero:worker/1
    103 #  The authority_connect procedure connects this worker to an authority
    104 #  and begins the authority/worker protocol.  The directives below
    105 #  handle the incoming (authority->worker) traffic.
    106 # ======================================================================
    107 proc authority_connect {} {
    108     global authority options
    109     log system "connecting to authorities..."
    110 
    111     authority_disconnect
    112 
    113     # scan through list of authorities and try to connect
    114     foreach addr [randomize $options(authority_hosts)] {
    115         if {[catch {Client ::#auto $addr} result] == 0} {
    116             set authority $result
    117             break
    118         }
    119     }
    120 
    121     if {"" == $authority} {
    122         error "can't connect to any known authorities"
    123     }
    124 
    125     # add handlers to process the incoming commands...
    126     $authority protocol hubzero:worker/1
    127     $authority define hubzero:worker/1 exception {args} {
    128         log system "error from authority: $args"
    129     }
    130 
    131     # ------------------------------------------------------------------
    132     #  DIRECTIVE: options <key1> <value1> <key2> <value2> ...
     42p2p::options register max_peer_connections 4
     43
     44# workers propagate messages until time-to-live reaches 0
     45p2p::options register peer_time_to_live 4
     46
     47# ======================================================================
     48#  PROTOCOL: hubzero:worker<-authority/1
     49#
     50#  The worker initiates communication with the authority, and the
     51#  authority responds by sending these messages.
     52# ======================================================================
     53p2p::protocol::register hubzero:worker<-authority/1 {
     54
     55    # ------------------------------------------------------------------
     56    #  INCOMING: options <key1> <value1> <key2> <value2> ...
    13357    #  These option settings coming from the authority override the
    134     #  option settings built into the client at the top of this
    135     #  script.  The authority probably has a more up-to-date list
    136     #  of other authorities and better policies for living within
    137     #  the network.
    138     # ------------------------------------------------------------------
    139     $authority define hubzero:worker/1 options {args} {
    140         global options server myaddress
     58    #  option settings built into the client.  The authority probably
     59    #  has a more up-to-date list of other authorities and better
     60    #  policies for living within the network.
     61    # ------------------------------------------------------------------
     62    define options {args} {
     63        global server myaddress
    14164
    14265        array set options $args
     66        foreach key [array names options] {
     67            catch {p2p::options set $key $options($key)}
     68        }
    14369
    14470        if {[info exists options(ip)]} {
     
    15076
    15177    # ------------------------------------------------------------------
    152     #  DIRECTIVE: peers <listOfAddresses>
     78    #  INCOMING: peers <listOfAddresses>
    15379    #  This message comes in after this worker has sent the "peers"
    15480    #  message to request the current list of peers.  The
     
    15682    #  worker should contact to enter the p2p network.
    15783    # ------------------------------------------------------------------
    158     $authority define hubzero:worker/1 peers {plist} {
     84    define peers {plist} {
    15985        global peers
    16086        set peers(all) $plist
    161         after idle worker_peers_update
     87        after idle {peer-network goto measure}
    16288
    16389        # now that we've gotten the peers, we're done with the authority
    164         authority_disconnect
    165     }
    166 
    167     # ------------------------------------------------------------------
    168     #  DIRECTIVE: identity
     90        authority-connection goto idle
     91        return ""
     92    }
     93
     94    # ------------------------------------------------------------------
     95    #  INCOMING: identity
    16996    #  Used for debugging, so that each client can identify itself by
    17097    #  name to this worker.
    17198    # ------------------------------------------------------------------
    172     $authority define hubzero:worker/1 identity {name} {
     99    define identity {name} {
    173100        variable cid
    174101        variable handler
     
    176103        return ""
    177104    }
    178 
    179     $authority send "protocol hubzero:worker/1"
    180     return $authority
    181 }
    182 
    183 proc authority_disconnect {} {
    184     global authority
    185     if {"" != $authority} {
    186         catch {itcl::delete object $authority}
    187         set authority ""
    188     }
    189 }
    190 
    191 # ======================================================================
    192 #  USEFUL ROUTINES
    193 # ======================================================================
    194 # ----------------------------------------------------------------------
    195 #  COMMAND:  worker_register
    196 #
    197 #  Invoked when this worker first starts up and periodically thereafter
    198 #  to register the worker with the central authority, and to request
    199 #  a list of peers that this peer can talk to.
    200 # ----------------------------------------------------------------------
    201 proc worker_register {} {
    202     global options server
     105}
     106
     107# ======================================================================
     108#  PROTOCOL: hubzero:worker<-foreman/1
     109#
     110#  The foreman initiates communication with a worker, and sends the
     111#  various messages supported below.
     112# ======================================================================
     113p2p::protocol::register hubzero:worker<-foreman/1 {
     114
     115    # ------------------------------------------------------------------
     116    #  INCOMING: solicit
     117    #  Foremen send this message to solicit bids for a simulation job.
     118    # ------------------------------------------------------------------
     119    define solicit {args} {
     120log debug "SOLICIT from foreman: $args"
     121        variable cid
     122        log debug "solicitation request from foreman: $args"
     123        eval Solicitation ::#auto $args -connection $cid
     124        return ""
     125    }
     126}
     127
     128# ======================================================================
     129#  PROTOCOL: hubzero:workers<-workerc/1
     130#
     131#  Workers initiate connections with other workers as peers.  The
     132#  following messages are sent by worker clients to the worker server.
     133# ======================================================================
     134p2p::protocol::register hubzero:workers<-workerc/1 {
     135    # ------------------------------------------------------------------
     136    #  INCOMING: identity
     137    #  Used for debugging, so that each client can identify itself by
     138    #  name to this worker.
     139    # ------------------------------------------------------------------
     140    define identity {name} {
     141        variable cid
     142        variable handler
     143        $handler connectionName $cid $name
     144        return ""
     145    }
     146
     147    # ------------------------------------------------------------------
     148    #  INCOMING: ping
     149    #  If another worker sends "ping", they are trying to measure the
     150    #  speed of the connection.  Send back a "pong" message.  If this
     151    #  worker is close by, the other worker will stay connected to it.
     152    # ------------------------------------------------------------------
     153    define ping {} {
     154        return "pong"
     155    }
     156
     157    # ------------------------------------------------------------------
     158    #  INCOMING: solicit -job info -path hosts -token xxx
     159    #  Workers send this message on to their peers to solicit bids
     160    #  for a simulation job.
     161    # ------------------------------------------------------------------
     162    define solicit {args} {
     163log debug "SOLICIT from peer: $args"
     164        variable cid
     165        log debug "solicitation request from peer: $args"
     166        eval Solicitation ::#auto $args -connection $cid
     167        return ""
     168    }
     169
     170    # ------------------------------------------------------------------
     171    #  INCOMING: proffer <token> <details>
     172    #  Workers send this message back after a "solicit" request with
     173    #  details about what they can offer in terms of CPU power.  When
     174    #  a worker has received all replies from its peers, it sends back
     175    #  its own proffer message to the client that started the
     176    #  solicitation.
     177    # ------------------------------------------------------------------
     178    define proffer {token details} {
     179log debug "PROFFER from peer: $token $details"
     180        Solicitation::proffer $token $details
     181        return ""
     182    }
     183}
     184
     185# ======================================================================
     186#  PROTOCOL: hubzero:workerc<-workers/1
     187#
     188#  The following messages are received by a worker client in response
     189#  to the requests that they send to a worker server.
     190# ======================================================================
     191p2p::protocol::register hubzero:workerc<-workers/1 {
     192    # ------------------------------------------------------------------
     193    #  INCOMING: identity
     194    #  Used for debugging, so that each client can identify itself by
     195    #  name to this worker.
     196    # ------------------------------------------------------------------
     197    define identity {name} {
     198        variable cid
     199        variable handler
     200        $handler connectionName $cid $name
     201        return ""
     202    }
     203
     204    # ------------------------------------------------------------------
     205    #  INCOMING: pong
     206    #  When forming the peer-to-peer network, workers send ping/pong
     207    #  messages to one another to measure the latency.
     208    # ------------------------------------------------------------------
     209    define pong {} {
     210        global peers
     211        variable handler
     212        set now [clock clicks -milliseconds]
     213        set delay [expr {$now - $peers(ping-$handler-start)}]
     214        set peers(ping-$handler-latency) $delay
     215        if {[incr peers(responses)] >= [llength $peers(testing)]} {
     216            after cancel {peer-network goto finalize}
     217            after idle {peer-network goto finalize}
     218        }
     219        return ""
     220    }
     221
     222    # ------------------------------------------------------------------
     223    #  INCOMING: solicit -job info -path hosts -token xxx
     224    #  Workers send this message on to their peers to solicit bids
     225    #  for a simulation job.
     226    # ------------------------------------------------------------------
     227    define solicit {args} {
     228log debug "SOLICIT from peer: $args"
     229        variable cid
     230        log debug "solicitation request from peer: $args"
     231        eval Solicitation ::#auto $args -connection $cid
     232        return ""
     233    }
     234
     235    # ------------------------------------------------------------------
     236    #  INCOMING: proffer <token> <details>
     237    #  Workers send this message back after a "solicit" request with
     238    #  details about what they can offer in terms of CPU power.  When
     239    #  a worker has received all replies from its peers, it sends back
     240    #  its own proffer message to the client that started the
     241    #  solicitation.
     242    # ------------------------------------------------------------------
     243    define proffer {token details} {
     244log debug "PROFFER from peer: $token $details"
     245        Solicitation::proffer $token $details
     246        return ""
     247    }
     248}
     249
     250# ----------------------------------------------------------------------
     251#  COMMAND:  broadcast_to_peers <message> ?<avoidList>?
     252#
     253#  Used to broadcast a message out to all peers.  The <message> must
     254#  be one of the commands defined in the worker protocol.  If there
     255#  are no peer connections yet, this command does nothing.  If any
     256#  peer appears on the <avoidList>, it is skipped.  Returns the
     257#  number of messages sent out, so this peer knows how many replies
     258#  to wait for.
     259# ----------------------------------------------------------------------
     260proc broadcast_to_peers {message {avoidList ""}} {
     261    global server peers
     262
     263    #
     264    # Build a list of all peer addresses, so we know who we're
     265    # going to send this message out to.
     266    #
     267    set recipients ""
     268    foreach key [array names peers current-*] {
     269        set addr [$peers($key) address]
     270        if {[lsearch $avoidList $addr] < 0} {
     271            lappend recipients $addr
     272        }
     273    }
     274    foreach cid [$server connections hubzero:workers<-workerc/1] {
     275        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
     276        if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} {
     277            lappend recipients $addr
     278        }
     279    }
     280
     281    #
     282    # If the message has any @RECIPIENTS fields, replace them
     283    # with the list of recipients
     284    #
     285    regsub -all @RECIPIENTS $message $recipients message
     286
     287    #
     288    # Send the message out to all peers.  Keep a count and double-check
     289    # it against the list of recipients generated above.
     290    #
     291    set nmesgs 0
     292
     293    # send off to other workers that this one has connected to
     294    foreach key [array names peers current-*] {
     295        set addr [$peers($key) address]
     296        if {[lsearch $avoidList $addr] < 0} {
     297            if {[catch {$peers($key) send $message} err] == 0} {
     298                incr nmesgs
     299            } else {
     300                log error "ERROR: broadcast failed to [$peers($key) address]: $result\n  (message was \"$message\")"
     301            }
     302        }
     303    }
     304
     305    # send off to other workers that connected to this one
     306    foreach cid [$server connections hubzero:workers<-workerc/1] {
     307        set addr [lindex [$server connectionName $cid] 0]  ;# x.x.x.x (sockN)
     308        if {[llength $avoidList] == 0 || [lsearch $avoidList $addr] < 0} {
     309            if {[catch {puts $cid $message} result] == 0} {
     310                incr nmesgs
     311            } else {
     312                log error "ERROR: broadcast failed for $cid: $result"
     313                log error "  (message was $message)"
     314            }
     315        }
     316    }
     317
     318    # did we send the right number of messages?
     319    if {[llength $recipients] != $nmesgs} {
     320        log error "ERROR: sent only $nmesgs messages to peers {$recipients}"
     321    }
     322
     323    return $nmesgs
     324}
     325
     326# ----------------------------------------------------------------------
     327#  COMMAND:  worker_got_protocol
     328#
     329#  Invoked whenever a peer sends their protocol message to this
     330#  worker.  Sends the same protocol name back, so the other worker
     331#  understands what protocol we're speaking.
     332# ----------------------------------------------------------------------
     333proc worker_got_protocol {cid protocol} {
     334    switch -glob -- $protocol {
     335        *<-worker* {
     336            puts $cid [list protocol hubzero:workerc<-workers/1]
     337        }
     338        *<-foreman* {
     339            puts $cid [list protocol hubzero:foreman<-worker/1]
     340        }
     341        DEF* {
     342            # do nothing
     343        }
     344        default {
     345            error "don't recognize protocol \"$protocol\""
     346        }
     347    }
     348}
     349
     350# ----------------------------------------------------------------------
     351#  COMMAND:  worker_got_dropped <address>
     352#
     353#  Invoked whenever an inbound connection to this worker drops.
     354#  At some point we should try updating our connections to other
     355#  peers, to replace this missing connection.
     356# ----------------------------------------------------------------------
     357proc worker_got_dropped {addr} {
     358    global peers
     359    if {[info exists peers(current-$addr)]} {
     360        unset peers(current-$addr)
     361        after cancel {peer-network goto measure}
     362        after 5000 {peer-network goto measure}
     363log debug "peer dropped: $addr"
     364    }
     365}
     366
     367# ======================================================================
     368#  Connect to one of the authorities to get a list of peers.  Then,
     369#  sit in the event loop and process events.
     370# ======================================================================
     371p2p::wonks::init
     372
     373set server [p2p::server -port 9101? \
     374        -protocols {hubzero:workers<-workerc hubzero:worker<-foreman} \
     375        -servername worker \
     376        -onprotocol worker_got_protocol \
     377        -ondisconnect worker_got_dropped]
     378
     379# ----------------------------------------------------------------------
     380#  AUTHORITY CONNECTION
     381#
     382#  This is the state machine representing the connection to the
     383#  authority server.  We start in the "idle" state.  Whenever we
     384#  try to move to "connected", we'll open a connection to an authority
     385#  and request updated information on workers.  If that connection
     386#  fails, we'll get kicked back to "idle" and we'll try again later.
     387# ----------------------------------------------------------------------
     388StateMachine authority-connection
     389authority-connection statedata cnx ""
     390
     391# sit here whenever we don't have a connection
     392# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     393authority-connection state idle -onenter {
     394    # have an open connection? then close it
     395    if {"" != [statedata cnx]} {
     396        catch {itcl::delete object [statedata cnx]}
     397        statedata cnx ""
     398    }
     399    # try to connect again later
     400    set delay [p2p::options get time_between_authority_checks]
     401    after $delay {authority-connection goto connected}
     402} -onleave {
     403    after cancel {authority-connection goto connected}
     404}
     405
     406# sit here after we're connected and waiting for a response
     407# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     408authority-connection state connected
     409
     410# when moving to the connected state, make a connection
     411# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     412authority-connection transition idle->connected -onchange {
     413    global server
    203414
    204415    # connect to the authority and request a list of peers
    205     if {[catch {
    206         set client [authority_connect]
    207         $client send "listening [$server port]"
    208         $client send "peers"
    209     } result]} {
    210         log system "ERROR: $result"
    211     }
    212 
    213     # register again at regular intervals in case the authority
    214     # gets restarted in between.
    215     after $options(time_between_registers) worker_register
    216 }
    217 
    218 # ----------------------------------------------------------------------
    219 #  COMMAND:  worker_load_check
    220 #
    221 #  Invoked when this worker first starts up and periodically thereafter
    222 #  to compute the system load available to the worker.  If the system
    223 #  load has changed significantly, then the "perftest" program is
    224 #  executed to get a measure of performance available.  This program
    225 #  returns a number of "wonks" which we report to peers as our available
    226 #  performance.
    227 # ----------------------------------------------------------------------
    228 proc worker_load_check {} {
    229     global options sysload dir
    230 
    231     # see if the load has changed significantly
    232     set changed 0
    233     if {$sysload(lastLoad) < 0} {
    234         set changed 1
     416    statedata cnx ""
     417    foreach addr [randomize [p2p::options get authority_hosts]] {
     418        if {[catch {p2p::client -address $addr \
     419            -sendprotocol hubzero:authority<-worker/1 \
     420            -receiveprotocol hubzero:worker<-authority/1} result] == 0} {
     421            statedata cnx $result
     422            break
     423        }
     424    }
     425
     426    if {"" != [statedata cnx]} {
     427        [statedata cnx] send "listening [$server port]"
     428        [statedata cnx] send "peers"
    235429    } else {
    236         set load [Rappture::sysinfo load5]
    237         if {$load < 0.9*$sysload(lastLoad) || $load > 1.1*$sysload(lastLoad)} {
    238             set changed 1
    239         }
    240     }
    241 
    242     if {$changed} {
    243         set sysload(lastLoad) [Rappture::sysinfo load5]
    244 puts "LOAD CHANGED: $sysload(lastLoad) [Rappture::sysinfo freeram freeswap]"
    245         set sysload(measured) [clock seconds]
    246 
    247         # Run the program, but don't use exec, since it blocks.
    248         # Instead, run it in the background and harvest its output later.
    249         set sysload(test) [open "| [file join $dir perftest]" r]
    250         fileevent $sysload(test) readable worker_load_results
    251     }
    252 
    253     # monitor the system load at regular intervals
    254     after $options(time_between_load_checks) worker_load_check
    255 }
    256 
    257 # ----------------------------------------------------------------------
    258 #  COMMAND:  worker_load_results
    259 #
    260 #  Invoked automatically when the "perftest" run finishes.  Reads the
    261 #  number of wonks reported on standard output and reports them to
    262 #  other peers.
    263 # ----------------------------------------------------------------------
    264 proc worker_load_results {} {
    265     global sysload
    266 
    267     if {[catch {read $sysload(test)} msg] == 0} {
    268         if {[regexp {[0-9]+} $msg match]} {
    269 puts "WONKS: $match"
    270             set sysload(wonks) $match
    271         } else {
    272             log system "ERROR: performance test failed: $msg"
    273         }
    274     }
    275     catch {close $sysload(test)}
    276     set sysload(test) ""
    277 }
    278 
    279 # ----------------------------------------------------------------------
    280 #  COMMAND:  worker_peers_update
    281 #
    282 #  Invoked when this worker has received a list of peers, and from
    283 #  time to time thereafter, to establish connections with other peers
    284 #  in the network.  This worker picks a number of peers randomly from
    285 #  the list of all available peers, and then pings each of them.
    286 #  Timing results from the pings are stored away, and when all pings
    287 #  have completed, this worker picks the best few connections and
    288 #  keeps those.
    289 # ----------------------------------------------------------------------
    290 proc worker_peers_update {} {
    291     global options peers myaddress
    292     worker_peers_cleanup
     430        error "can't connect to any authority\nAUTHORITY LIST: [p2p::options get authority_hosts]"
     431    }
     432}
     433
     434# ----------------------------------------------------------------------
     435#  PEER-TO-PEER CONNECTIONS
     436#
     437#  This is the state machine representing the network of worker peers.
     438#  We start in the "idle" state.  From time to time we move to the
     439#  "measure" state and attempt to establish connections with a set
     440#  peers.  We then wait for ping/pong responses and move to "finalize".
     441#  When we have all responses, we move to "finalize" and finalize all
     442#  connections, and then move back to "idle".
     443# ----------------------------------------------------------------------
     444StateMachine peer-network
     445peer-network statedata cnx ""
     446
     447# sit here when the peer network is okay
     448# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     449peer-network state idle -onenter {
     450    # try to connect again later
     451    set delay [p2p::options get time_between_network_rebuilds]
     452    after $delay {peer-network goto measure}
     453} -onleave {
     454    after cancel {peer-network goto measure}
     455}
     456
     457# sit here when we need to rebuild the peer-to-peer network
     458# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     459peer-network state measure
     460
     461# when moving to the start state, make connections to a bunch of peers
     462# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     463peer-network transition idle->measure -onchange {
     464    global server myaddress peers
     465
     466    #
     467    # Get a list of workers already connected to this one
     468    # with an inbound connection to the server.
     469    #
     470    foreach cid [$server connections hubzero:workers<-workerc/1] {
     471        set addr [lindex [$server connectionName $cid] 0]
     472        set inbound($addr) $cid
     473    }
    293474
    294475    #
    295476    # Pick a random group of peers and try to connect to them.
    296     # Start with the existing peers, and then add a random
    297     # bunch of others.
    298     #
     477    # Start with the existing peers to see if their connection
     478    # is still favorable, and then add a random bunch of others.
     479    #
     480    set peers(testing) ""
     481    set peers(responses) 0
    299482    foreach key [array names peers current-*] {
    300483        set peer $peers($key)
     
    307490    # final number we want to talk to.
    308491    #
    309     set ntest [expr {10 * $options(max_peer_connections)}]
    310     foreach addr [randomize $peers(all) $ntest] {
    311         if {$addr == $myaddress} {
     492    set maxpeers [p2p::options get max_peer_connections]
     493    foreach addr [randomize $peers(all) [expr {10*$maxpeers}]] {
     494        #
     495        # Avoid connecting to ourself, or to peers that we're
     496        # already connected to (either as inbound connections
     497        # to the server or as outbound connections to others).
     498        #
     499        if {$addr == $myaddress
     500              || [info exists peers(current-$addr)]
     501              || [info exists inbound($addr)]} {
    312502            continue
    313503        }
    314         if {![info exists peers(current-$addr)]
    315               && [catch {Client ::#auto $addr} peer] == 0} {
    316             # open a new connection to this address
    317             lappend peers(testing) $peer
    318             $peer protocol hubzero:peer/1
    319 
    320             $peer define hubzero:peer/1 exception {message} {
    321                 log system "ERROR: $message"
    322             }
    323 
    324             $peer define hubzero:peer/1 identity {name} {
    325                 variable cid
    326                 variable handler
    327                 $handler connectionName $cid $name
    328                 return ""
    329             }
    330 
    331             # when we get a "pong" back, store the latency
    332             $peer define hubzero:peer/1 pong {} {
    333                 global peers
    334                 variable handler
    335 puts "  pong from $handler"
    336                 set now [clock clicks -milliseconds]
    337                 set delay [expr {$now - $peers(ping-$handler-start)}]
    338                 set peers(ping-$handler-latency) $delay
    339                 incr peers(responses)
    340                 worker_peers_finalize
    341                 return ""
    342             }
    343 
    344             # start the ping/pong session with the peer
    345             $peer send "protocol hubzero:peer/1"
    346 
    347             # send tell this peer our name (for debugging)
    348             if {[log channel debug]} {
    349                 $peer send "identity $myaddress"
    350             }
     504
     505        if {[catch {p2p::client -address $addr \
     506            -sendprotocol hubzero:workers<-workerc/1 \
     507            -receiveprotocol hubzero:workerc<-workers/1} cnx]} {
     508            continue
     509        }
     510        $cnx send "identity $myaddress"
     511        lappend peers(testing) $cnx
     512
     513        # have enough connections to test?
     514        if {[llength $peers(testing)] >= 2*$maxpeers} {
     515            break
    351516        }
    352517    }
     
    355520    # Now, loop through all peers and send a "ping" message.
    356521    #
    357     foreach peer $peers(testing) {
    358 puts "pinging $peer = [$peer address]..."
     522    foreach cnx $peers(testing) {
    359523        # mark the time and send a ping to this peer
    360         set peers(ping-$peer-start) [clock clicks -milliseconds]
    361         $peer send "ping"
     524        set peers(ping-$cnx-start) [clock clicks -milliseconds]
     525        $cnx send "ping"
    362526    }
    363527
    364528    # if this test takes too long, just give up
    365     after 10000 worker_peers_finalize -force
    366 }
    367 
    368 # ----------------------------------------------------------------------
    369 #  COMMAND:  worker_peers_finalize ?-force?
    370 #
    371 #  Called after worker_peers_update has finished its business to
    372 #  clean up all of the connections that were open for testing.
    373 # ----------------------------------------------------------------------
    374 proc worker_peers_finalize {{option -check}} {
    375     global peers options
    376     if {$option == "-force" || $peers(responses) == [llength $peers(testing)]} {
    377         # build a list:  {peer latency} {peer latency} ...
    378         set plist ""
    379         foreach obj $peers(testing) {
    380             if {[info exists peers(ping-$obj-latency)]} {
    381                 lappend plist [list $obj $peers(ping-$obj-latency)]
    382             }
    383         }
    384 puts "-------------------\nFINALIZING $::myaddress"
    385 puts "PINGS: $plist"
    386 
    387         # sort the list and extract the top peers
    388         set plist [lsort -increasing -index 1 $plist]
    389         set plist [lrange $plist 0 [expr {$options(max_peer_connections)-1}]]
    390 puts "TOP: $plist"
    391 
    392         set current [array names peers current-*]
    393 puts "  current: $current"
    394 
    395         # transfer the top peers to the "current" list
    396         set all ""
    397         foreach rec $plist {
    398             set peer [lindex $rec 0]
    399             set addr [$peer address]
    400             set peers(current-$addr) $peer
    401             lappend all $addr
    402 
    403             # if it is already on the "current" list, then keep it
    404             set i [lsearch $current current-$addr]
    405             if {$i >= 0} {
    406                 set current [lreplace $current $i $i]
    407             }
    408             set i [lsearch $peers(testing) $peer]
    409             if {$i >= 0} {
    410                 set peers(testing) [lreplace $peers(testing) $i $i]
    411             }
    412         }
    413         log system "connected to peers: $all"
    414 puts "  new current: [array names peers current-*]"
    415 puts "  final: $all"
    416 puts "  leftover: $current $peers(testing)"
    417 
    418         # get rid of old peers that we no longer want to talk to
    419         foreach leftover $current {
    420             itcl::delete object $peers($leftover)
    421             unset peers($leftover)
    422 puts "  cleaned up $leftover (was on current list)"
    423         }
    424 
    425         # clean up after this test
    426         worker_peers_cleanup
    427     }
    428 }
    429 
    430 # ----------------------------------------------------------------------
    431 #  COMMAND:  worker_peers_cleanup
    432 #
    433 #  Called after worker_peers_update has finished its business to
    434 #  clean up all of the connections that were open for testing.
    435 # ----------------------------------------------------------------------
    436 proc worker_peers_cleanup {} {
     529    after 10000 {peer-network goto finalize}
     530}
     531
     532# sit here when we need to rebuild the peer-to-peer network
     533# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     534peer-network state finalize -onenter {
     535    after cancel {peer-network goto finalize}
     536
     537    # everything is finalized now, so go to idle
     538    after idle {peer-network goto idle}
     539}
     540
     541# when moving to the finalize state, decide on the network of peers
     542# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
     543peer-network transition measure->finalize -onchange {
    437544    global peers
     545
     546    set maxpeers [p2p::options get max_peer_connections]
     547
     548    # build a list:  {peer latency} {peer latency} ...
     549    set plist ""
     550    foreach obj $peers(testing) {
     551        if {[info exists peers(ping-$obj-latency)]} {
     552            lappend plist [list $obj $peers(ping-$obj-latency)]
     553        }
     554    }
     555
     556    # sort the list and extract the top peers
     557    set plist [lsort -increasing -index 1 $plist]
     558    set plist [lrange $plist 0 [expr {$maxpeers-1}]]
     559
     560    set currentpeers [array names peers current-*]
     561
     562    # transfer the top peers to the "current" list
     563    set all ""
     564    foreach rec $plist {
     565        set peer [lindex $rec 0]
     566        set addr [$peer address]
     567        set peers(current-$addr) $peer
     568        lappend all $addr
     569
     570        # if it is already on the "current" list, then keep it
     571        set i [lsearch $currentpeers current-$addr]
     572        if {$i >= 0} {
     573            set currentpeers [lreplace $currentpeers $i $i]
     574        }
     575        set i [lsearch $peers(testing) $peer]
     576        if {$i >= 0} {
     577            set peers(testing) [lreplace $peers(testing) $i $i]
     578        }
     579    }
     580    log system "connected to peers: $all"
     581
     582    # get rid of old peers that we no longer want to talk to
     583    foreach leftover $currentpeers {
     584        itcl::delete object $peers($leftover)
     585        unset peers($leftover)
     586    }
     587
     588    # clean up after this test
    438589    foreach obj $peers(testing) {
    439590        catch {itcl::delete object $obj}
    440 puts "  cleaned up $obj (was on testing list)"
    441591    }
    442592    set peers(testing) ""
     
    446596    }
    447597    set peers(responses) 0
    448 
    449     after cancel worker_peers_finalize -force
    450 }
    451 
    452 # ----------------------------------------------------------------------
    453 #  COMMAND:  worker_peers_protocol
    454 #
    455 #  Invoked whenever a peer sends their protocol message to this
    456 #  worker.  Sends the same protocol name back, so the other worker
    457 #  understands what protocol we're speaking.
    458 # ----------------------------------------------------------------------
    459 proc worker_peers_protocol {cid protocol} {
    460     if {"DEFAULT" != $protocol} {
    461         puts $cid [list protocol $protocol]
    462     }
    463 }
    464 
    465 # ======================================================================
    466 #  Connect to one of the authorities to get a list of peers.  Then,
    467 #  sit in the event loop and process events.
    468 # ======================================================================
    469 after idle worker_register
     598}
     599
     600# ----------------------------------------------------------------------
     601#  JOB SOLICITATION
     602#
     603#  Each worker can receive a "solicit" request, asking for information
     604#  about performance and price of peers available to work.  Each worker
     605#  sends the request on to its peers, then gathers the information
     606#  and sends it back to the client or to the peer requesting the
     607#  information.  There can be multiple requests going on at once,
     608#  and each may have different job types and return different info,
     609#  so the class below helps to watch over each request until it has
     610#  completed.
     611# ----------------------------------------------------------------------
     612itcl::class Solicitation {
     613    public variable connection ""
     614    public variable job ""
     615    public variable path ""
     616    public variable avoid ""
     617    public variable token ""
     618
     619    private variable _serial ""
     620    private variable _response ""
     621    private variable _waitfor 0
     622    private variable _timeout ""
     623
     624    constructor {args} {
     625        eval configure $args
     626
     627        global myaddress
     628        lappend path $myaddress
     629        lappend avoid $myaddress
     630        set _serial [incr counter]
     631        set all($_serial) $this
     632
     633        set delay "idle"  ;# finalize after waiting for responses
     634        set ttl [p2p::options get peer_time_to_live]
     635        if {[llength $path] < $ttl} {
     636            set mesg [list solicit -job $job -path $path -avoid "$avoid @RECIPIENTS" -token $_serial]
     637            set _waitfor [broadcast_to_peers $mesg $avoid]
     638log debug "WAIT FOR: $_waitfor"
     639
     640            if {$_waitfor > 0} {
     641                # add a delay proportional to ttl + time for wonks measurement
     642                set delay [expr {($ttl-[llength $path]-1)*1000 + 3000}]
     643            }
     644        }
     645log debug "TIMEOUT: $delay"
     646        set _timeout [after $delay [itcl::code $this finalize]]
     647    }
     648    destructor {
     649        after cancel $_timeout
     650        catch {unset all($_serial)}
     651    }
     652
     653    # this adds the info from each proffer to this solicitation
     654    method response {details} {
     655        set addr [lindex $details 0]
     656        append _response $details "\n"
     657        if {[incr _waitfor -1] <= 0} {
     658log debug "ALL RESPONSES"
     659            finalize
     660        }
     661    }
     662
     663    # called to finalize when all peers have responded back
     664    method finalize {} {
     665        global myaddress
     666
     667        # filter out duplicate info from clients
     668        set block ""
     669        foreach line [split $_response \n] {
     670            set addr [lindex $line 0]
     671            if {"" != $addr && ![info exists response($addr)]} {
     672                append block $line "\n"
     673                set response($addr) 1
     674            }
     675        }
     676        # add details about this client to the message
     677        append block "$myaddress -job $job -cost 1 -wonks [p2p::wonks::current]"
     678log debug "FINALIZE {$block}"
     679
     680        # send the composite results back to the caller
     681        set cmd [list proffer $token $block]
     682        if {[catch {puts $connection $cmd} err]} {
     683            log error "ERROR while sending back proffer: $err"
     684        }
     685        itcl::delete object $this
     686    }
     687
     688    proc proffer {serial details} {
     689        if {[info exists all($serial)]} {
     690            $all($serial) response $details
     691        }
     692    }
     693
     694    common counter 0 ;# generates serial nums for objects
     695    common all       ;# maps serial num => solicitation object
     696}
     697
     698# ----------------------------------------------------------------------
    470699log system "starting..."
     700
     701after idle {
     702    authority-connection goto connected
     703}
     704
    471705vwait main-loop
Note: See TracChangeset for help on using the changeset viewer.