Changeset 715


Ignore:
Timestamp:
May 8, 2007, 1:55:00 PM (17 years ago)
Author:
nkissebe
Message:

multi-service and detachment support added to nanoscale

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/gui/vizservers/nanoscale/server.c

    r409 r715  
    1212#include <netinet/in.h>
    1313#include <getopt.h>
     14#include <errno.h>
    1415
    1516// The initial request load for a new renderer.
     
    2526// justify redirection.
    2627#define LOAD_REDIRECT_FACTOR 0.8
     28
     29// Maxium number of services we support
     30#define MAX_SERVICES 100
    2731
    2832float load = 0;             // The present load average for this system.
     
    3337fd_set saved_rfds;          // Descriptors we're reading from.
    3438fd_set pipe_rfds;           // Descriptors that are pipes to children.
    35 
     39fd_set service_rfds[MAX_SERVICES];
    3640
    3741struct host_info {
     
    4246
    4347struct child_info {
    44   pid_t pid;
    4548  int   memory;
    4649  int   pipefd;
     
    113116}
    114117
    115 void check_children(void)
    116 {
    117   int status;
    118 
    119   // Collect any children that have exited.
    120   do {
    121     int return_status;
    122     status = waitpid(0, &return_status, WNOHANG);
    123     if (status > 0) {
    124       children--;
    125     }
     118close_child(int pipe_fd)
     119{
    126120    int i;
    127121    for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) {
    128       if (child_array[i].pid == status) {
     122      if (child_array[i].pipefd == pipe_fd) {
     123        children--;
    129124        memory_in_use -= child_array[i].memory;
    130         child_array[i].pid = 0;
    131125        child_array[i].memory = 0;
    132         status = close(child_array[i].pipefd);
    133126        FD_CLR(child_array[i].pipefd, &saved_rfds);
    134127        FD_CLR(child_array[i].pipefd, &pipe_rfds);
     128        close(child_array[i].pipefd);
     129        child_array[i].pipefd = 0;
    135130        break;
    136       }
    137     }
    138   } while(status > 0);
    139   printf("processes=%d, memory=%d, load=%f\n",
     131          }
     132        }
     133 
     134    printf("processes=%d, memory=%d, load=%f\n",
    140135         children, memory_in_use, load);
    141136
    142   broadcast_load();
     137    broadcast_load();
    143138}
    144139
     
    150145      child_array[c].requests += value;
    151146#ifdef DEBUGGING
    152       printf("Updating requests for pid %d to %f\n",
    153              child_array[c].pid,
     147      printf("Updating requests from pipefd %d to %f\n",
     148             child_array[c].pipefd,
    154149             child_array[c].requests);
    155150#endif
     
    167162  int c;
    168163  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
    169     if (child_array[c].pid != 0) {
     164    if (child_array[c].pipefd != 0) {
    170165      newload += child_array[c].requests * child_array[c].memory;
    171166      child_array[c].requests = 0;
     
    180175
    181176volatile int sigalarm_set;
    182 volatile int sigchild_set;
    183177void sigalarm_handler(int signum)
    184178{
    185179  sigalarm_set = 1;
    186 }
    187 void sigchild_handler(int signum)
    188 {
    189   sigchild_set = 1;
    190180}
    191181
     
    198188}
    199189
     190int
     191clear_service_fd(int fd)
     192{
     193    int n;
     194
     195        for(n = 0; n < MAX_SERVICES; n++)
     196        {
     197            if (FD_ISSET(fd, &service_rfds[n]))
     198                    FD_CLR(fd, &service_rfds[n]);
     199        }
     200}
     201
    200202int main(int argc, char *argv[])
    201203{
    202   char server_command[1000];
    203   int command_argc;
    204   char **command_argv;
     204  char server_command[MAX_SERVICES][1000];
     205  int nservices = 0;
     206  int command_argc[MAX_SERVICES];
     207  char **command_argv[MAX_SERVICES];
    205208  int val;
    206   int listen_fd = -1;
     209  int listen_fd[MAX_SERVICES];
    207210  int status;
    208211  struct sockaddr_in listen_addr;
    209212  struct sockaddr_in recv_addr;
    210   int listen_port = -1;
     213  int listen_port[MAX_SERVICES];
    211214  int recv_port = -1;
    212215  int connected_fds[10] = {0};
    213216  int subnet_addr;
     217  int n;
     218
     219  listen_port[0] = -1;
     220  server_command[0][0] = 0;
    214221
    215222  while(1) {
     
    231238        break;
    232239      case 'c':
    233         strncpy(server_command, optarg, sizeof(server_command));
     240        strncpy(server_command[nservices], optarg, sizeof(server_command[0]));
     241
     242                if (listen_port[nservices] == -1) {
     243                    fprintf(stderr,"Must specify -l port before each -c command.\n");
     244                        return 1;
     245                }
     246
     247                nservices++;
     248                listen_port[nservices] = -1;
    234249        break;
    235250      case 'l':
    236         listen_port = strtoul(optarg,0,0);
     251        listen_port[nservices] = strtoul(optarg,0,0);
    237252        break;
    238253      case 's':
     
    250265  }
    251266
    252   if (listen_port == -1 ||
     267  if (nservices == 0 ||
    253268      recv_port == -1 ||
    254269      subnet_addr == -1 ||
    255       server_command[0]=='\0') {
     270      server_command[0][0]=='\0') {
    256271    help(argv[0]);
    257272    return 1;
    258273  }
    259274
    260   // Parse the command arguments...
    261   command_argc=0;
    262   command_argv = malloc((command_argc+2) * sizeof(char *));
    263   command_argv[command_argc] = strtok(server_command, " \t");
    264   command_argc++;
    265   while( (command_argv[command_argc] = strtok(NULL, " \t"))) {
    266     command_argv = realloc(command_argv, (command_argc+2) * sizeof(char *));
    267     command_argc++;
    268   }
    269 
    270   // Create a socket for listening.
    271   listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    272   if (listen_fd < 0) {
    273     perror("socket");
    274     exit(1);
    275   }
    276 
    277   // If program is killed, drop the socket address reservation immediately.
    278   val = 1;
    279   status = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
    280   if (status < 0) {
    281     perror("setsockopt");
    282     // Not fatal.  Keep on going.
    283   }
    284 
    285   // Bind this address to the socket.
    286   listen_addr.sin_family = AF_INET;
    287   listen_addr.sin_port = htons(listen_port);
    288   listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    289   status = bind(listen_fd, (struct sockaddr *)&listen_addr,
     275  for(n = 0; n < nservices; n++) {
     276      // Parse the command arguments...
     277
     278      command_argc[n]=0;
     279      command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *));
     280      command_argv[n][command_argc[n]] = strtok(server_command[n], " \t");
     281      command_argc[n]++;
     282      while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) {
     283        command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *));
     284        command_argc[n]++;
     285      }
     286
     287      // Create a socket for listening.
     288      listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     289      if (listen_fd[n] < 0) {
     290        perror("socket");
     291        exit(1);
     292      }
     293 
     294      // If program is killed, drop the socket address reservation immediately.
     295          val = 1;
     296      status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
     297      if (status < 0) {
     298        perror("setsockopt");
     299        // Not fatal.  Keep on going.
     300      }
     301
     302      // Bind this address to the socket.
     303      listen_addr.sin_family = AF_INET;
     304      listen_addr.sin_port = htons(listen_port[n]);
     305      listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
     306      status = bind(listen_fd[n], (struct sockaddr *)&listen_addr,
    290307                sizeof(listen_addr));
    291   if (status < 0) {
    292     perror("bind");
    293     exit(1);
    294   }
    295 
    296   // Listen on the specified port.
    297   status = listen(listen_fd,5);
    298   if (status < 0) {
    299     perror("listen");
     308      if (status < 0) {
     309        perror("bind");
     310        exit(1);
     311      }
     312
     313      // Listen on the specified port.
     314      status = listen(listen_fd[n],5);
     315      if (status < 0) {
     316        perror("listen");
     317      }
    300318  }
    301319
     
    338356  send_addr.sin_port = htons(recv_port);
    339357
    340   // Set up a signal handler for exiting children.
    341   // It doesn't do anything other than interrupt select() below.
    342   if (signal(SIGCHLD,sigchild_handler) == SIG_ERR) {
    343     perror("signal SIGCHLD");
    344   }
    345 
    346358  // Set up a signal handler for the alarm interrupt.
    347359  // It doesn't do anything other than interrupt select() below.
    348360  if (signal(SIGALRM,sigalarm_handler) == SIG_ERR) {
    349     perror("signal SIGCHLD");
     361    perror("signal SIGALRM");
    350362  }
    351363
     
    359371
    360372  // We're ready to go.  Before going into the main loop,
    361   // do a check_children to broadcast a load announcement to
    362   // other machines.
    363   check_children();
    364 
    365 
    366   int maxfd = max(listen_fd,send_fd);
     373  // broadcast a load announcement to other machines.
     374  broadcast_load();
     375
     376  int maxfd = send_fd;
    367377  FD_ZERO(&saved_rfds);
    368378  FD_ZERO(&pipe_rfds);
    369   FD_SET(listen_fd, &saved_rfds);
     379
     380  for(n = 0; n < nservices; n++) {
     381      FD_ZERO(&service_rfds[n]);
     382      FD_SET(listen_fd[n], &saved_rfds);
     383          if (listen_fd[n] > maxfd)
     384              maxfd = listen_fd[n];
     385  }
     386
    370387  FD_SET(send_fd, &saved_rfds);
    371388  while(1) {
     
    379396        sigalarm_set = 0;
    380397      }
    381       if (sigchild_set) {
    382         check_children();
    383         sigchild_set = 0;
    384       }
    385398      continue;
    386399    }
    387400
    388     if (FD_ISSET(listen_fd, &rfds)) {
    389       // Accept a new connection.
    390       struct sockaddr_in newaddr;
    391       unsigned int addrlen = sizeof(newaddr);
    392       int newfd = accept(listen_fd, (struct sockaddr *)&newaddr, &addrlen);
    393       if (newfd < 0) {
    394         perror("accept");
    395         continue;
    396       }
    397 
    398       printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
    399       FD_SET(newfd, &saved_rfds);
    400       maxfd = max(maxfd, newfd);
    401       continue;
    402     }
     401   
     402    int accepted = 0;
     403    for(n = 0; n < nservices; n++) {
     404        if (FD_ISSET(listen_fd[n], &rfds)) {
     405          // Accept a new connection.
     406          struct sockaddr_in newaddr;
     407          unsigned int addrlen = sizeof(newaddr);
     408          int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen);
     409          if (newfd < 0) {
     410            perror("accept");
     411            continue;
     412          }
     413
     414          printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
     415          FD_SET(newfd, &saved_rfds);
     416          maxfd = max(maxfd, newfd);
     417                  FD_SET(newfd, &service_rfds[n]);
     418          accepted = 1;
     419                }
     420        }
     421
     422        if (accepted)
     423            continue;
    403424
    404425    if (FD_ISSET(send_fd, &rfds)) {
     
    455476          status = read(i, &value, sizeof(value));
    456477          if (status != 4) {
    457             close(i);
     478                    //fprintf(stderr,"error reading pipe, child ended?\n");
     479            close_child(i);
     480                        /*close(i);
    458481            FD_CLR(i, &saved_rfds);
    459             FD_CLR(i, &pipe_rfds);
     482            FD_CLR(i, &pipe_rfds); */
    460483          } else {
    461484            note_request(i,value);
     
    471494          fprintf(stderr,"Bad status on read (%d).", status);
    472495          FD_CLR(i, &saved_rfds);
    473           close(i);
     496          clear_service_fd(i);
     497                  close(i);
    474498          continue;
    475499        }
     
    498522          write(i, &host_array[index].in_addr.s_addr, 4);
    499523          FD_CLR(i, &saved_rfds);
     524          clear_service_fd(i);
    500525          close(i);
    501526          continue;
     
    529554          perror("fork");
    530555        } else if (status == 0) {
    531           dup2(i,0);  // stdin
    532           dup2(i,1);  // stdout
    533           dup2(i,2);  // stderr
    534           dup2(pair[1],3);
    535           int fd;
    536           for(fd=4; fd<FD_SETSIZE; fd++) {
    537             close(fd);
    538           }
    539           execvp(command_argv[0], command_argv);
     556
     557                  for(n = 0; n < MAX_SERVICES; n++) {
     558                    if (FD_ISSET(i, &service_rfds[n])) {
     559
     560                          // disassociate
     561                          if ( daemon(0,1) == 0 ) {
     562                int fd;
     563
     564                dup2(i,0);  // stdin
     565                dup2(i,1);  // stdout
     566                dup2(i,2);  // stderr
     567                dup2(pair[1],3);
     568
     569                for(fd=4; fd<FD_SETSIZE; fd++)
     570                  close(fd);
     571
     572                execvp(command_argv[n][0], command_argv[n]);
     573                      }
     574                          _exit(errno);
     575                    }
     576                  }
     577                  _exit(EINVAL);
     578
    540579        } else {
    541580          int c;
     581                  // reap initial child which will exit immediately (grandchild continues)
     582                  waitpid(status, NULL, 0);
    542583          for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) {
    543             if (child_array[c].pid == 0) {
    544               child_array[c].pid = status;
     584            if (child_array[c].pipefd == 0) {
    545585              child_array[c].memory = newmemory;
    546586              child_array[c].pipefd = pair[0];
     
    558598
    559599          children++;
    560           check_children();
     600          broadcast_load();
    561601        }
    562602
    563603
    564604        FD_CLR(i, &saved_rfds);
     605        clear_service_fd(i);
    565606        close(i);
    566607        break;
Note: See TracChangeset for help on using the changeset viewer.