source: trunk/gui/vizservers/nanoscale/server.c @ 716

Last change on this file since 716 was 716, checked in by nkissebe, 17 years ago

daemonize by default, use -f (debug flag) to run in the foreground

File size: 16.2 KB
Line 
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <getopt.h>
#include <netinet/in.h>
#include <unistd.h>
15
// The initial request load for a new renderer.
#define INITIAL_LOAD 100000000.0

// The factor that the load is divided by every second.
#define LOAD_DROP_OFF 2.0

// The broadcast interval (in seconds)
#define BROADCAST_INTERVAL 5

// The load of a remote machine must be less than this factor to
// justify redirection.
#define LOAD_REDIRECT_FACTOR 0.8

// Maximum number of services we support
#define MAX_SERVICES 100

// Maximum number of peer hosts we keep load statistics for
#define MAX_HOSTS 100

// Maximum number of child renderers we track at the same time
#define MAX_CHILDREN 100

float load = 0;             // The present load average for this system.
int memory_in_use = 0;      // Total memory in use by this system.
int children = 0;           // Number of children running on this system.
int send_fd;                // The file descriptor we broadcast through.
struct sockaddr_in send_addr;  // The subnet address we broadcast to.
fd_set saved_rfds;          // Descriptors we're reading from.
fd_set pipe_rfds;           // Descriptors that are pipes to children.

// service_rfds[n] holds the client descriptors that were accepted on the
// listening socket of service n, so a request can be matched to its command.
fd_set service_rfds[MAX_SERVICES];

// What we know about one peer machine, learned from its load broadcasts.
struct host_info {
  struct in_addr in_addr;   // Peer address; s_addr == 0 marks an unused slot.
  float          load;      // Last load value we recorded for the peer.
  int            children;  // Last process count the peer reported.
};

// Book-keeping for one child renderer started by this server.
struct child_info {
  int   memory;    // Memory footprint the client requested for this child.
  int   pipefd;    // Read end of the child's load pipe; 0 marks a free slot.
  float requests;  // Request load reported since the last load update.
};

struct host_info host_array[MAX_HOSTS];
struct child_info child_array[MAX_CHILDREN];
55
56
57
58
/*
 * min()/max() macros that evaluate each argument exactly once and also
 * do strict type-checking: the "unnecessary" pointer comparison makes
 * the compiler warn when x and y have different types.
 *
 * They rely on GNU statement expressions.  __typeof__ is spelled with
 * underscores so the macros also compile when the compiler is run in a
 * strict ISO mode (-std=c99/-std=c11), where plain `typeof` is not a
 * keyword.
 */
#define min(x,y) ({ \
        __typeof__(x) _x = (x);     \
        __typeof__(y) _y = (y);     \
        (void) (&_x == &_y);            \
        _x < _y ? _x : _y; })

#define max(x,y) ({ \
        __typeof__(x) _x = (x);     \
        __typeof__(y) _y = (y);     \
        (void) (&_x == &_y);            \
        _x > _y ? _x : _y; })
75
76int find_best_host(void)
77{
78  int h;
79  float best = load;
80  int index = -1;
81  //printf("My load is %f\n", best);
82  for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
83    if (host_array[h].in_addr.s_addr == 0)
84      continue;
85    //printf("%d I think load for %s is %f   ", h,
86    //       inet_ntoa(host_array[h].in_addr), host_array[h].load);
87    if (host_array[h].children <= children) {
88      if (host_array[h].load < best) {
89        //if ((random() % 100) < 75) {
90          index = h;
91          best = host_array[h].load;
92        //}
93        //printf(" Better\n");
94      } else {
95        //printf(" Worse\n");
96      }
97    }
98  }
99
100  //printf("I choose %d\n", index);
101  return index;
102}
103
104
105void broadcast_load(void)
106{
107  int msg[2];
108  msg[0] = htonl(load);
109  msg[1] = htonl(children);
110  int status;
111  status = sendto(send_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&send_addr,
112                  sizeof(send_addr));
113  if (status < 0) {
114    perror("sendto");
115  }
116}
117
118close_child(int pipe_fd)
119{
120    int i;
121    for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) {
122      if (child_array[i].pipefd == pipe_fd) {
123        children--;
124        memory_in_use -= child_array[i].memory;
125        child_array[i].memory = 0;
126        FD_CLR(child_array[i].pipefd, &saved_rfds);
127        FD_CLR(child_array[i].pipefd, &pipe_rfds);
128        close(child_array[i].pipefd);
129        child_array[i].pipefd = 0;
130        break;
131          }
132        }
133 
134    printf("processes=%d, memory=%d, load=%f\n",
135         children, memory_in_use, load);
136
137    broadcast_load();
138}
139
140void note_request(int fd, float value)
141{
142  int c;
143  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
144    if (child_array[c].pipefd == fd) {
145      child_array[c].requests += value;
146#ifdef DEBUGGING
147      printf("Updating requests from pipefd %d to %f\n",
148             child_array[c].pipefd,
149             child_array[c].requests);
150#endif
151      return;
152    }
153  }
154}
155
156void update_load_average(void)
157{
158  static unsigned int counter;
159
160  load = load / LOAD_DROP_OFF;
161  float newload = 0.0;
162  int c;
163  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
164    if (child_array[c].pipefd != 0) {
165      newload += child_array[c].requests * child_array[c].memory;
166      child_array[c].requests = 0;
167    }
168  }
169  load = load + newload;
170
171  if ((counter++ % BROADCAST_INTERVAL) == 0) {
172    broadcast_load();
173  }
174}
175
// Set by the SIGALRM handler and cleared by the main loop once it has
// reacted.  sig_atomic_t is the only integer type the C standard
// guarantees can be written safely from a signal handler.
volatile sig_atomic_t sigalarm_set;

/*
 * SIGALRM handler: only records that the alarm fired.  All real work is
 * left to the main loop.
 */
void sigalarm_handler(int signum)
{
  (void)signum;   // the handler is installed for SIGALRM only
  sigalarm_set = 1;
}
181
/*
 * Print the command-line syntax to stderr and terminate the process
 * with exit status 1.  `argv0` is the program name shown in the message.
 * This function does not return.
 */
void help(const char *argv0)
{
  static const char usage[] =
      "Syntax: %s [-d] -b <broadcast port> -l <listen port> -s <subnet> -c 'command'\n";

  fprintf(stderr, usage, argv0);
  exit(1);
}
189
190int
191clear_service_fd(int fd)
192{
193    int n;
194
195        for(n = 0; n < MAX_SERVICES; n++)
196        {
197            if (FD_ISSET(fd, &service_rfds[n]))
198                    FD_CLR(fd, &service_rfds[n]);
199        }
200}
201
202int main(int argc, char *argv[])
203{
204  char server_command[MAX_SERVICES][1000];
205  int nservices = 0;
206  int command_argc[MAX_SERVICES];
207  char **command_argv[MAX_SERVICES];
208  int val;
209  int listen_fd[MAX_SERVICES];
210  int status;
211  struct sockaddr_in listen_addr;
212  struct sockaddr_in recv_addr;
213  int listen_port[MAX_SERVICES];
214  int recv_port = -1;
215  int connected_fds[10] = {0};
216  int subnet_addr;
217  int debug_flag = 0;
218  int n;
219
220  listen_port[0] = -1;
221  server_command[0][0] = 0;
222
223  while(1) {
224    int c;
225    int this_option_optind = optind ? optind : 1;
226    int option_index = 0;
227    struct option long_options[] = {
228      // name, has_arg, flag, val
229      { 0,0,0,0 },
230    };
231
232    c = getopt_long(argc, argv, "+b:c:l:s:d", long_options, &option_index);
233    if (c == -1)
234      break;
235
236    switch(c) {
237          case 'd':
238            debug_flag = 1;
239                break;
240      case 'b':
241        recv_port = strtoul(optarg,0,0);
242        break;
243      case 'c':
244        strncpy(server_command[nservices], optarg, sizeof(server_command[0]));
245
246                if (listen_port[nservices] == -1) {
247                    fprintf(stderr,"Must specify -l port before each -c command.\n");
248                        return 1;
249                }
250
251                nservices++;
252                listen_port[nservices] = -1;
253        break;
254      case 'l':
255        listen_port[nservices] = strtoul(optarg,0,0);
256        break;
257      case 's':
258        send_addr.sin_addr.s_addr = htonl(inet_network(optarg,
259                                                       &send_addr.sin_addr));
260        if (send_addr.sin_addr.s_addr == -1) {
261          fprintf(stderr,"Invalid subnet broadcast address");
262          return 1;
263        }
264        break;
265      default:
266        fprintf(stderr,"Don't know what option '%c'.\n", c);
267        return 1;
268    }
269  }
270
271  if (nservices == 0 ||
272      recv_port == -1 ||
273      subnet_addr == -1 ||
274      server_command[0][0]=='\0') {
275    help(argv[0]);
276    return 1;
277  }
278
279  for(n = 0; n < nservices; n++) {
280      // Parse the command arguments...
281
282      command_argc[n]=0;
283      command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *));
284      command_argv[n][command_argc[n]] = strtok(server_command[n], " \t");
285      command_argc[n]++;
286      while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) {
287        command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *));
288        command_argc[n]++;
289      }
290
291      // Create a socket for listening.
292      listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
293      if (listen_fd[n] < 0) {
294        perror("socket");
295        exit(1);
296      }
297 
298      // If program is killed, drop the socket address reservation immediately.
299          val = 1;
300      status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
301      if (status < 0) {
302        perror("setsockopt");
303        // Not fatal.  Keep on going.
304      }
305
306      // Bind this address to the socket.
307      listen_addr.sin_family = AF_INET;
308      listen_addr.sin_port = htons(listen_port[n]);
309      listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
310      status = bind(listen_fd[n], (struct sockaddr *)&listen_addr,
311                sizeof(listen_addr));
312      if (status < 0) {
313        perror("bind");
314        exit(1);
315      }
316
317      // Listen on the specified port.
318      status = listen(listen_fd[n],5);
319      if (status < 0) {
320        perror("listen");
321      }
322  }
323
324  // Create a socket for broadcast.
325  send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
326  if (send_fd < 0) {
327    perror("socket");
328    exit(1);
329  }
330
331  // If program is killed, drop the socket address reservation immediately.
332  val = 1;
333  status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
334  if (status < 0) {
335    perror("setsockopt");
336    // Not fatal.  Keep on going.
337  }
338
339  // We're going to broadcast through this socket.
340  val = 1;
341  status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val));
342  if (status < 0) {
343    perror("setsockopt");
344    // Not fatal.  Keep on going.
345  }
346
347  // Bind this address to the socket.
348  recv_addr.sin_family = AF_INET;
349  recv_addr.sin_port = htons(recv_port);
350  recv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
351  status = bind(send_fd, (struct sockaddr *)&recv_addr,
352                sizeof(recv_addr));
353  if (status < 0) {
354    perror("bind");
355    exit(1);
356  }
357
358  // Set up the address that we broadcast to.
359  send_addr.sin_family = AF_INET;
360  send_addr.sin_port = htons(recv_port);
361
362  // Set up a signal handler for the alarm interrupt.
363  // It doesn't do anything other than interrupt select() below.
364  if (signal(SIGALRM,sigalarm_handler) == SIG_ERR) {
365    perror("signal SIGALRM");
366  }
367
368  struct itimerval itvalue = {
369    {1, 0}, {1, 0}
370  };
371  status = setitimer(ITIMER_REAL, &itvalue, NULL);
372  if (status != 0) {
373    perror("setitimer");
374  }
375
376  // We're ready to go.  Before going into the main loop,
377  // broadcast a load announcement to other machines.
378  broadcast_load();
379
380  int maxfd = send_fd;
381  FD_ZERO(&saved_rfds);
382  FD_ZERO(&pipe_rfds);
383
384  for(n = 0; n < nservices; n++) {
385      FD_ZERO(&service_rfds[n]);
386      FD_SET(listen_fd[n], &saved_rfds);
387          if (listen_fd[n] > maxfd)
388              maxfd = listen_fd[n];
389  }
390
391  FD_SET(send_fd, &saved_rfds);
392
393  if (debug_flag == 0) {
394      if ( daemon(0,0) != 0 ) {
395              perror("daemon");
396                  exit(1);
397          }
398  }
399
400  while(1) {
401
402    fd_set rfds = saved_rfds;
403
404    status = select(maxfd+1, &rfds, NULL, NULL, 0);
405    if (status <= 0) {
406      if (sigalarm_set) {
407        update_load_average();
408        sigalarm_set = 0;
409      }
410      continue;
411    }
412
413   
414    int accepted = 0;
415    for(n = 0; n < nservices; n++) {
416        if (FD_ISSET(listen_fd[n], &rfds)) {
417          // Accept a new connection.
418          struct sockaddr_in newaddr;
419          unsigned int addrlen = sizeof(newaddr);
420          int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen);
421          if (newfd < 0) {
422            perror("accept");
423            continue;
424          }
425
426          printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
427          FD_SET(newfd, &saved_rfds);
428          maxfd = max(maxfd, newfd);
429                  FD_SET(newfd, &service_rfds[n]);
430          accepted = 1;
431                }
432        }
433
434        if (accepted)
435            continue;
436
437    if (FD_ISSET(send_fd, &rfds)) {
438      int buffer[1000];
439      struct sockaddr_in peer_addr;
440      unsigned int len = sizeof(peer_addr);
441      status = recvfrom(send_fd, buffer, sizeof(buffer), 0,
442                        (struct sockaddr*)&peer_addr, &len);
443      if (status < 0) {
444        perror("recvfrom");
445        continue;
446      }
447      if (status != 8) {
448        fprintf(stderr,"Bogus message from %s\n",
449                inet_ntoa(peer_addr.sin_addr));
450        continue;
451      }
452      float peer_load = ntohl(buffer[0]);
453      int peer_procs = ntohl(buffer[1]);
454      //printf("Load for %s is %f (%d processes).\n",
455      //       inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
456      int h;
457      int free_index=-1;
458      int found = 0;
459      for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
460        if (host_array[h].in_addr.s_addr == peer_addr.sin_addr.s_addr) {
461          if (host_array[h].children != peer_procs) {
462            printf("Load for %s is %f (%d processes).\n",
463                   inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
464          }
465          host_array[h].load = peer_load;
466          host_array[h].children = peer_procs;
467          found = 1;
468          break;
469        }
470        if (host_array[h].in_addr.s_addr == 0 && free_index == -1) {
471          free_index = h;
472        }
473      }
474      if (!found) {
475        host_array[free_index].in_addr.s_addr = peer_addr.sin_addr.s_addr;
476        host_array[free_index].load = peer_load;
477      }
478      continue;
479    }
480
481    int i;
482    for(i=0; i<maxfd+1; i++) {
483      if (FD_ISSET(i,&rfds)) {
484
485        // If this is a pipe, get the load.  Update.
486        if (FD_ISSET(i,&pipe_rfds)) {
487          float value;
488          status = read(i, &value, sizeof(value));
489          if (status != 4) {
490                    //fprintf(stderr,"error reading pipe, child ended?\n");
491            close_child(i);
492                        /*close(i);
493            FD_CLR(i, &saved_rfds);
494            FD_CLR(i, &pipe_rfds); */
495          } else {
496            note_request(i,value);
497          }
498          continue;
499        }
500
501        // This must be a descriptor that we're waiting to from
502        // for the memory footprint.  Get it.
503        int msg;
504        status = read(i, &msg, 4);
505        if (status != 4) {
506          fprintf(stderr,"Bad status on read (%d).", status);
507          FD_CLR(i, &saved_rfds);
508          clear_service_fd(i);
509                  close(i);
510          continue;
511        }
512
513        // find the new memory increment
514        int newmemory = ntohl(msg);
515
516        // Find the best host to create a new child on.
517        int index = find_best_host();
518
519        // Only redirect if another host's load is significantly less
520        // than our own...
521        if (index != -1 &&
522            (host_array[index].load < (LOAD_REDIRECT_FACTOR * load))) {
523
524          // If we're redirecting to another machine, give that machine
525          // an extra boost in our copy of the load statistics.  This will
526          // keep us from sending the very next job to it.  Eventually, the
527          // other machine will broadcast its real load and we can make an
528          // informed decision as to who redirect to again.
529          host_array[index].load += newmemory * INITIAL_LOAD;
530
531          // Redirect to another machine.
532          printf("Redirecting to %s\n",
533                 inet_ntoa(host_array[index].in_addr));
534          write(i, &host_array[index].in_addr.s_addr, 4);
535          FD_CLR(i, &saved_rfds);
536          clear_service_fd(i);
537          close(i);
538          continue;
539        }
540
541        memory_in_use += newmemory;
542        load += 2*INITIAL_LOAD;
543        broadcast_load();
544        printf("Accepted new job with memory %d\n", newmemory);
545        //printf("My load is now %f\n", load);
546
547        // accept the connection.
548        msg = 0;
549        write(i, &msg, 4);
550
551        int pair[2];
552        status = pipe(pair);
553        if (status != 0) {
554          perror("pipe");
555        }
556
557        // Make the child side of the pipe non-blocking...
558        status = fcntl(pair[1], F_SETFL, O_NONBLOCK);
559        if (status < 0) {
560          perror("fcntl");
561        }
562
563        // Fork the new process.  Connect i/o to the new socket.
564        status = fork();
565        if (status < 0) {
566          perror("fork");
567        } else if (status == 0) {
568
569                  for(n = 0; n < MAX_SERVICES; n++) {
570                    if (FD_ISSET(i, &service_rfds[n])) {
571
572                          // disassociate
573                          if ( daemon(0,1) == 0 ) {
574                int fd;
575
576                dup2(i,0);  // stdin
577                dup2(i,1);  // stdout
578                dup2(i,2);  // stderr
579                dup2(pair[1],3);
580
581                for(fd=4; fd<FD_SETSIZE; fd++)
582                  close(fd);
583
584                execvp(command_argv[n][0], command_argv[n]);
585                      }
586                          _exit(errno);
587                    }
588                  }
589                  _exit(EINVAL);
590
591        } else {
592          int c;
593                  // reap initial child which will exit immediately (grandchild continues)
594                  waitpid(status, NULL, 0);
595          for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) {
596            if (child_array[c].pipefd == 0) {
597              child_array[c].memory = newmemory;
598              child_array[c].pipefd = pair[0];
599              child_array[c].requests = INITIAL_LOAD;
600              status = close(pair[1]);
601              if (status != 0) {
602                perror("close pair[1]");
603              }
604              FD_SET(pair[0], &saved_rfds);
605              FD_SET(pair[0], &pipe_rfds);
606              maxfd = max(pair[0], maxfd);
607              break;
608            }
609          }
610
611          children++;
612          broadcast_load();
613        }
614
615
616        FD_CLR(i, &saved_rfds);
617        clear_service_fd(i);
618        close(i);
619        break;
620      }
621
622    } // for all connected_fds
623
624  } // while(1)
625
626}
627
Note: See TracBrowser for help on using the repository browser.