source: trunk/vizservers/nanoscale/server.c @ 952

Last change on this file since 952 was 722, checked in by nkissebe, 16 years ago

leave read end of request pipe open when launching client to prevent
SIGPIPE condition when nanoscale terminates

File size: 16.3 KB
Line 
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <getopt.h>
#include <netinet/in.h>
#include <unistd.h>
15
// The initial request load for a new renderer.
#define INITIAL_LOAD 100000000.0

// The factor that the load is divided by every second.
#define LOAD_DROP_OFF 2.0

// The broadcast interval (in seconds)
#define BROADCAST_INTERVAL 5

// The load of a remote machine must be less than this factor to
// justify redirection.
#define LOAD_REDIRECT_FACTOR 0.8

// Maximum number of services we support
#define MAX_SERVICES 100

// Maximum number of peer hosts whose load reports we remember.
#define MAX_HOSTS 100

// Maximum number of child processes we keep track of at once.
#define MAX_CHILDREN 100

float load = 0;             // The present load average for this system.
int memory_in_use = 0;      // Total memory in use by this system.
int children = 0;           // Number of children running on this system.
int send_fd;                // The file descriptor we broadcast through.
struct sockaddr_in send_addr;  // The subnet address we broadcast to.
fd_set saved_rfds;          // Descriptors we're reading from.
fd_set pipe_rfds;           // Descriptors that are pipes to children.
// For each service, the client sockets accepted on its listening port.
fd_set service_rfds[MAX_SERVICES];

// What we know about one peer machine, learned from its broadcasts.
struct host_info {
  struct in_addr in_addr;   // Peer address; s_addr == 0 marks a free slot.
  float          load;      // Last load value the peer reported.
  int            children;  // Last child count the peer reported.
};

// Bookkeeping for one child process started by this server.
struct child_info {
  int   memory;    // Memory footprint announced by the client.
  int   pipefd;    // Read end of the child's request pipe; 0 marks a free slot.
  float requests;  // Request load accumulated since the last load update.
};

struct host_info host_array[MAX_HOSTS];
struct child_info child_array[MAX_CHILDREN];
55
56
57
58
/*
 * min()/max() macros that evaluate each argument exactly once and also
 * do strict type-checking: the "unnecessary" pointer comparison makes
 * the compiler warn when the two arguments have different types.
 *
 * These rely on GNU statement expressions.  __typeof__ is used rather
 * than typeof so that the macros also compile in strict ISO modes
 * (e.g. -std=c11), where plain typeof is not a keyword.
 */
#define min(x,y) ({ \
        __typeof__(x) _x = (x);     \
        __typeof__(y) _y = (y);     \
        (void) (&_x == &_y);            \
        _x < _y ? _x : _y; })

#define max(x,y) ({ \
        __typeof__(x) _x = (x);     \
        __typeof__(y) _y = (y);     \
        (void) (&_x == &_y);            \
        _x > _y ? _x : _y; })
75
76int find_best_host(void)
77{
78  int h;
79  float best = load;
80  int index = -1;
81  //printf("My load is %f\n", best);
82  for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
83    if (host_array[h].in_addr.s_addr == 0)
84      continue;
85    //printf("%d I think load for %s is %f   ", h,
86    //       inet_ntoa(host_array[h].in_addr), host_array[h].load);
87    if (host_array[h].children <= children) {
88      if (host_array[h].load < best) {
89        //if ((random() % 100) < 75) {
90          index = h;
91          best = host_array[h].load;
92        //}
93        //printf(" Better\n");
94      } else {
95        //printf(" Worse\n");
96      }
97    }
98  }
99
100  //printf("I choose %d\n", index);
101  return index;
102}
103
104
105void broadcast_load(void)
106{
107  int msg[2];
108  msg[0] = htonl(load);
109  msg[1] = htonl(children);
110  int status;
111  status = sendto(send_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&send_addr,
112                  sizeof(send_addr));
113  if (status < 0) {
114    perror("sendto");
115  }
116}
117
118close_child(int pipe_fd)
119{
120    int i;
121    for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) {
122      if (child_array[i].pipefd == pipe_fd) {
123        children--;
124        memory_in_use -= child_array[i].memory;
125        child_array[i].memory = 0;
126        FD_CLR(child_array[i].pipefd, &saved_rfds);
127        FD_CLR(child_array[i].pipefd, &pipe_rfds);
128        close(child_array[i].pipefd);
129        child_array[i].pipefd = 0;
130        break;
131          }
132        }
133 
134    printf("processes=%d, memory=%d, load=%f\n",
135         children, memory_in_use, load);
136
137    broadcast_load();
138}
139
140void note_request(int fd, float value)
141{
142  int c;
143  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
144    if (child_array[c].pipefd == fd) {
145      child_array[c].requests += value;
146#ifdef DEBUGGING
147      printf("Updating requests from pipefd %d to %f\n",
148             child_array[c].pipefd,
149             child_array[c].requests);
150#endif
151      return;
152    }
153  }
154}
155
156void update_load_average(void)
157{
158  static unsigned int counter;
159
160  load = load / LOAD_DROP_OFF;
161  float newload = 0.0;
162  int c;
163  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
164    if (child_array[c].pipefd != 0) {
165      newload += child_array[c].requests * child_array[c].memory;
166      child_array[c].requests = 0;
167    }
168  }
169  load = load + newload;
170
171  if ((counter++ % BROADCAST_INTERVAL) == 0) {
172    broadcast_load();
173  }
174}
175
/*
 * Set by the SIGALRM handler and cleared by the main loop.
 * sig_atomic_t is the integer type the C standard guarantees can be
 * written from a signal handler and read elsewhere without tearing.
 */
volatile sig_atomic_t sigalarm_set;

/*
 * Signal handler: only records that the signal arrived.
 * signum is unused.
 */
void sigalarm_handler(int signum)
{
  (void)signum;
  sigalarm_set = 1;
}
181
/*
 * Print the command-line syntax to stderr and terminate the process
 * with exit status 1.  argv0 is the program name shown in the message.
 */
void help(const char *argv0)
{
  fprintf(stderr,
          "Syntax: %s [-d] -b <broadcast port> -l <listen port>"
          " -s <subnet> -c 'command'\n",
          argv0);
  exit(1);
}
189
190int
191clear_service_fd(int fd)
192{
193    int n;
194
195        for(n = 0; n < MAX_SERVICES; n++)
196        {
197            if (FD_ISSET(fd, &service_rfds[n]))
198                    FD_CLR(fd, &service_rfds[n]);
199        }
200}
201
202int main(int argc, char *argv[])
203{
204  char server_command[MAX_SERVICES][1000];
205  int nservices = 0;
206  int command_argc[MAX_SERVICES];
207  char **command_argv[MAX_SERVICES];
208  int val;
209  int listen_fd[MAX_SERVICES];
210  int status;
211  struct sockaddr_in listen_addr;
212  struct sockaddr_in recv_addr;
213  int listen_port[MAX_SERVICES];
214  int recv_port = -1;
215  int connected_fds[10] = {0};
216  int subnet_addr;
217  int debug_flag = 0;
218  int n;
219
220  listen_port[0] = -1;
221  server_command[0][0] = 0;
222
223  while(1) {
224    int c;
225    int this_option_optind = optind ? optind : 1;
226    int option_index = 0;
227    struct option long_options[] = {
228      // name, has_arg, flag, val
229      { 0,0,0,0 },
230    };
231
232    c = getopt_long(argc, argv, "+b:c:l:s:d", long_options, &option_index);
233    if (c == -1)
234      break;
235
236    switch(c) {
237          case 'd':
238            debug_flag = 1;
239                break;
240      case 'b':
241        recv_port = strtoul(optarg,0,0);
242        break;
243      case 'c':
244        strncpy(server_command[nservices], optarg, sizeof(server_command[0]));
245
246                if (listen_port[nservices] == -1) {
247                    fprintf(stderr,"Must specify -l port before each -c command.\n");
248                        return 1;
249                }
250
251                nservices++;
252                listen_port[nservices] = -1;
253        break;
254      case 'l':
255        listen_port[nservices] = strtoul(optarg,0,0);
256        break;
257      case 's':
258        send_addr.sin_addr.s_addr = htonl(inet_network(optarg,
259                                                       &send_addr.sin_addr));
260        if (send_addr.sin_addr.s_addr == -1) {
261          fprintf(stderr,"Invalid subnet broadcast address");
262          return 1;
263        }
264        break;
265      default:
266        fprintf(stderr,"Don't know what option '%c'.\n", c);
267        return 1;
268    }
269  }
270
271  if (nservices == 0 ||
272      recv_port == -1 ||
273      subnet_addr == -1 ||
274      server_command[0][0]=='\0') {
275    help(argv[0]);
276    return 1;
277  }
278
279  for(n = 0; n < nservices; n++) {
280      // Parse the command arguments...
281
282      command_argc[n]=0;
283      command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *));
284      command_argv[n][command_argc[n]] = strtok(server_command[n], " \t");
285      command_argc[n]++;
286      while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) {
287        command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *));
288        command_argc[n]++;
289      }
290
291      // Create a socket for listening.
292      listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
293      if (listen_fd[n] < 0) {
294        perror("socket");
295        exit(1);
296      }
297 
298      // If program is killed, drop the socket address reservation immediately.
299          val = 1;
300      status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
301      if (status < 0) {
302        perror("setsockopt");
303        // Not fatal.  Keep on going.
304      }
305
306      // Bind this address to the socket.
307      listen_addr.sin_family = AF_INET;
308      listen_addr.sin_port = htons(listen_port[n]);
309      listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
310      status = bind(listen_fd[n], (struct sockaddr *)&listen_addr,
311                sizeof(listen_addr));
312      if (status < 0) {
313        perror("bind");
314        exit(1);
315      }
316
317      // Listen on the specified port.
318      status = listen(listen_fd[n],5);
319      if (status < 0) {
320        perror("listen");
321      }
322  }
323
324  // Create a socket for broadcast.
325  send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
326  if (send_fd < 0) {
327    perror("socket");
328    exit(1);
329  }
330
331  // If program is killed, drop the socket address reservation immediately.
332  val = 1;
333  status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
334  if (status < 0) {
335    perror("setsockopt");
336    // Not fatal.  Keep on going.
337  }
338
339  // We're going to broadcast through this socket.
340  val = 1;
341  status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val));
342  if (status < 0) {
343    perror("setsockopt");
344    // Not fatal.  Keep on going.
345  }
346
347  // Bind this address to the socket.
348  recv_addr.sin_family = AF_INET;
349  recv_addr.sin_port = htons(recv_port);
350  recv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
351  status = bind(send_fd, (struct sockaddr *)&recv_addr,
352                sizeof(recv_addr));
353  if (status < 0) {
354    perror("bind");
355    exit(1);
356  }
357
358  // Set up the address that we broadcast to.
359  send_addr.sin_family = AF_INET;
360  send_addr.sin_port = htons(recv_port);
361
362  // Set up a signal handler for the alarm interrupt.
363  // It doesn't do anything other than interrupt select() below.
364  if (signal(SIGALRM,sigalarm_handler) == SIG_ERR) {
365    perror("signal SIGALRM");
366  }
367
368  struct itimerval itvalue = {
369    {1, 0}, {1, 0}
370  };
371  status = setitimer(ITIMER_REAL, &itvalue, NULL);
372  if (status != 0) {
373    perror("setitimer");
374  }
375
376  // We're ready to go.  Before going into the main loop,
377  // broadcast a load announcement to other machines.
378  broadcast_load();
379
380  int maxfd = send_fd;
381  FD_ZERO(&saved_rfds);
382  FD_ZERO(&pipe_rfds);
383
384  for(n = 0; n < nservices; n++) {
385      FD_ZERO(&service_rfds[n]);
386      FD_SET(listen_fd[n], &saved_rfds);
387          if (listen_fd[n] > maxfd)
388              maxfd = listen_fd[n];
389  }
390
391  FD_SET(send_fd, &saved_rfds);
392
393  if (debug_flag == 0) {
394      if ( daemon(0,0) != 0 ) {
395              perror("daemon");
396                  exit(1);
397          }
398  }
399
400  while(1) {
401
402    fd_set rfds = saved_rfds;
403
404    status = select(maxfd+1, &rfds, NULL, NULL, 0);
405    if (status <= 0) {
406      if (sigalarm_set) {
407        update_load_average();
408        sigalarm_set = 0;
409      }
410      continue;
411    }
412
413   
414    int accepted = 0;
415    for(n = 0; n < nservices; n++) {
416        if (FD_ISSET(listen_fd[n], &rfds)) {
417          // Accept a new connection.
418          struct sockaddr_in newaddr;
419          unsigned int addrlen = sizeof(newaddr);
420          int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen);
421          if (newfd < 0) {
422            perror("accept");
423            continue;
424          }
425
426          printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
427          FD_SET(newfd, &saved_rfds);
428          maxfd = max(maxfd, newfd);
429                  FD_SET(newfd, &service_rfds[n]);
430          accepted = 1;
431                }
432        }
433
434        if (accepted)
435            continue;
436
437    if (FD_ISSET(send_fd, &rfds)) {
438      int buffer[1000];
439      struct sockaddr_in peer_addr;
440      unsigned int len = sizeof(peer_addr);
441      status = recvfrom(send_fd, buffer, sizeof(buffer), 0,
442                        (struct sockaddr*)&peer_addr, &len);
443      if (status < 0) {
444        perror("recvfrom");
445        continue;
446      }
447      if (status != 8) {
448        fprintf(stderr,"Bogus message from %s\n",
449                inet_ntoa(peer_addr.sin_addr));
450        continue;
451      }
452      float peer_load = ntohl(buffer[0]);
453      int peer_procs = ntohl(buffer[1]);
454      //printf("Load for %s is %f (%d processes).\n",
455      //       inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
456      int h;
457      int free_index=-1;
458      int found = 0;
459      for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
460        if (host_array[h].in_addr.s_addr == peer_addr.sin_addr.s_addr) {
461          if (host_array[h].children != peer_procs) {
462            printf("Load for %s is %f (%d processes).\n",
463                   inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
464          }
465          host_array[h].load = peer_load;
466          host_array[h].children = peer_procs;
467          found = 1;
468          break;
469        }
470        if (host_array[h].in_addr.s_addr == 0 && free_index == -1) {
471          free_index = h;
472        }
473      }
474      if (!found) {
475        host_array[free_index].in_addr.s_addr = peer_addr.sin_addr.s_addr;
476        host_array[free_index].load = peer_load;
477      }
478      continue;
479    }
480
481    int i;
482    for(i=0; i<maxfd+1; i++) {
483      if (FD_ISSET(i,&rfds)) {
484
485        // If this is a pipe, get the load.  Update.
486        if (FD_ISSET(i,&pipe_rfds)) {
487          float value;
488          status = read(i, &value, sizeof(value));
489          if (status != 4) {
490                    //fprintf(stderr,"error reading pipe, child ended?\n");
491            close_child(i);
492                        /*close(i);
493            FD_CLR(i, &saved_rfds);
494            FD_CLR(i, &pipe_rfds); */
495          } else {
496            note_request(i,value);
497          }
498          continue;
499        }
500
501        // This must be a descriptor that we're waiting to from
502        // for the memory footprint.  Get it.
503        int msg;
504        status = read(i, &msg, 4);
505        if (status != 4) {
506          fprintf(stderr,"Bad status on read (%d).", status);
507          FD_CLR(i, &saved_rfds);
508          clear_service_fd(i);
509                  close(i);
510          continue;
511        }
512
513        // find the new memory increment
514        int newmemory = ntohl(msg);
515
516        // Find the best host to create a new child on.
517        int index = find_best_host();
518
519        // Only redirect if another host's load is significantly less
520        // than our own...
521        if (index != -1 &&
522            (host_array[index].load < (LOAD_REDIRECT_FACTOR * load))) {
523
524          // If we're redirecting to another machine, give that machine
525          // an extra boost in our copy of the load statistics.  This will
526          // keep us from sending the very next job to it.  Eventually, the
527          // other machine will broadcast its real load and we can make an
528          // informed decision as to who redirect to again.
529          host_array[index].load += newmemory * INITIAL_LOAD;
530
531          // Redirect to another machine.
532          printf("Redirecting to %s\n",
533                 inet_ntoa(host_array[index].in_addr));
534          write(i, &host_array[index].in_addr.s_addr, 4);
535          FD_CLR(i, &saved_rfds);
536          clear_service_fd(i);
537          close(i);
538          continue;
539        }
540
541        memory_in_use += newmemory;
542        load += 2*INITIAL_LOAD;
543        broadcast_load();
544        printf("Accepted new job with memory %d\n", newmemory);
545        //printf("My load is now %f\n", load);
546
547        // accept the connection.
548        msg = 0;
549        write(i, &msg, 4);
550
551        int pair[2];
552        status = pipe(pair);
553        if (status != 0) {
554          perror("pipe");
555        }
556
557        // Make the child side of the pipe non-blocking...
558        status = fcntl(pair[1], F_SETFL, O_NONBLOCK);
559        if (status < 0) {
560          perror("fcntl");
561        }
562
563        // Fork the new process.  Connect i/o to the new socket.
564        status = fork();
565        if (status < 0) {
566          perror("fork");
567        } else if (status == 0) {
568
569                  for(n = 0; n < MAX_SERVICES; n++) {
570                    if (FD_ISSET(i, &service_rfds[n])) {
571
572                          // disassociate
573                          if ( daemon(0,1) == 0 ) {
574                int fd;
575
576                dup2(i,0);  // stdin
577                dup2(i,1);  // stdout
578                dup2(i,2);  // stderr
579                dup2(pair[1],3);
580                // read end of pipe moved, and left open to prevent SIGPIPE
581                dup2(pair[0],4);
582
583                for(fd=5; fd<FD_SETSIZE; fd++)
584                  close(fd);
585
586                execvp(command_argv[n][0], command_argv[n]);
587                      }
588                          _exit(errno);
589                    }
590                  }
591                  _exit(EINVAL);
592
593        } else {
594          int c;
595                  // reap initial child which will exit immediately (grandchild continues)
596                  waitpid(status, NULL, 0);
597          for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) {
598            if (child_array[c].pipefd == 0) {
599              child_array[c].memory = newmemory;
600              child_array[c].pipefd = pair[0];
601              child_array[c].requests = INITIAL_LOAD;
602              status = close(pair[1]);
603              if (status != 0) {
604                perror("close pair[1]");
605              }
606              FD_SET(pair[0], &saved_rfds);
607              FD_SET(pair[0], &pipe_rfds);
608              maxfd = max(pair[0], maxfd);
609              break;
610            }
611          }
612
613          children++;
614          broadcast_load();
615        }
616
617
618        FD_CLR(i, &saved_rfds);
619        clear_service_fd(i);
620        close(i);
621        break;
622      }
623
624    } // for all connected_fds
625
626  } // while(1)
627
628}
629
Note: See TracBrowser for help on using the repository browser.