source: trunk/gui/vizservers/nanoscale/server.c @ 715

Last change on this file since 715 was 715, checked in by nkissebe, 17 years ago

multi-service and detachment support added to nanoscale

File size: 16.0 KB
Line 
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <sys/select.h>
#include <sys/time.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <getopt.h>
#include <errno.h>
15
/* Request load initially credited to a newly started renderer. */
#define INITIAL_LOAD 100000000.0

/* Once per second the load is divided by this factor. */
#define LOAD_DROP_OFF 2.0

/* Interval between load broadcasts (in seconds). */
#define BROADCAST_INTERVAL 5

/*
 * A remote machine's load must be below this fraction of our own load
 * to justify redirecting a job to it.
 */
#define LOAD_REDIRECT_FACTOR 0.8

/* Maximum number of services we support. */
#define MAX_SERVICES 100

/* Present load average for this system. */
float load = 0.0f;

/* Total memory in use by this system. */
int memory_in_use = 0;

/* Number of children running on this system. */
int children = 0;

/* File descriptor we broadcast through (and receive peer broadcasts on). */
int send_fd;

/* Subnet address we broadcast to. */
struct sockaddr_in send_addr;

/* Every descriptor the main loop reads from. */
fd_set saved_rfds;

/* The subset of descriptors that are pipes to children. */
fd_set pipe_rfds;

/* For each service, the accepted client connections that belong to it. */
fd_set service_rfds[MAX_SERVICES];
40
// Capacity of the peer-host table and of the local child table.
#define MAX_HOSTS    100
#define MAX_CHILDREN 100

// What we know about one peer machine, learned from its load broadcasts.
struct host_info {
  struct in_addr in_addr;   // Peer address; s_addr == 0 marks an unused entry.
  float          load;      // Load most recently recorded for the peer.
  int            children;  // Process count most recently reported by the peer.
};

// Bookkeeping for one child process started by this server.
struct child_info {
  int   memory;    // Memory footprint announced for the job.
  int   pipefd;    // Pipe descriptor we read from; 0 marks a free entry.
  float requests;  // Request load accumulated since the last load update.
};

struct host_info host_array[MAX_HOSTS];
struct child_info child_array[MAX_CHILDREN];
55
56
57
58
/*
 * min()/max() macros that evaluate each argument exactly once and also
 * do strict type-checking: the "unnecessary" pointer comparison makes the
 * compiler complain when the two arguments have different types.
 *
 * __typeof__ is used instead of typeof so the macros also compile in
 * strict ISO modes (-std=c99, -std=c11), where the bare keyword is not
 * available.  The temporaries have distinct names in each macro so that
 * min(max(a, b), c) does not shadow its own variables.
 */
#define min(x,y) ({ \
        __typeof__(x) _min_a = (x);     \
        __typeof__(y) _min_b = (y);     \
        (void) (&_min_a == &_min_b);    \
        _min_a < _min_b ? _min_a : _min_b; })

#define max(x,y) ({ \
        __typeof__(x) _max_a = (x);     \
        __typeof__(y) _max_b = (y);     \
        (void) (&_max_a == &_max_b);    \
        _max_a > _max_b ? _max_a : _max_b; })
75
76int find_best_host(void)
77{
78  int h;
79  float best = load;
80  int index = -1;
81  //printf("My load is %f\n", best);
82  for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
83    if (host_array[h].in_addr.s_addr == 0)
84      continue;
85    //printf("%d I think load for %s is %f   ", h,
86    //       inet_ntoa(host_array[h].in_addr), host_array[h].load);
87    if (host_array[h].children <= children) {
88      if (host_array[h].load < best) {
89        //if ((random() % 100) < 75) {
90          index = h;
91          best = host_array[h].load;
92        //}
93        //printf(" Better\n");
94      } else {
95        //printf(" Worse\n");
96      }
97    }
98  }
99
100  //printf("I choose %d\n", index);
101  return index;
102}
103
104
105void broadcast_load(void)
106{
107  int msg[2];
108  msg[0] = htonl(load);
109  msg[1] = htonl(children);
110  int status;
111  status = sendto(send_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&send_addr,
112                  sizeof(send_addr));
113  if (status < 0) {
114    perror("sendto");
115  }
116}
117
118close_child(int pipe_fd)
119{
120    int i;
121    for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) {
122      if (child_array[i].pipefd == pipe_fd) {
123        children--;
124        memory_in_use -= child_array[i].memory;
125        child_array[i].memory = 0;
126        FD_CLR(child_array[i].pipefd, &saved_rfds);
127        FD_CLR(child_array[i].pipefd, &pipe_rfds);
128        close(child_array[i].pipefd);
129        child_array[i].pipefd = 0;
130        break;
131          }
132        }
133 
134    printf("processes=%d, memory=%d, load=%f\n",
135         children, memory_in_use, load);
136
137    broadcast_load();
138}
139
140void note_request(int fd, float value)
141{
142  int c;
143  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
144    if (child_array[c].pipefd == fd) {
145      child_array[c].requests += value;
146#ifdef DEBUGGING
147      printf("Updating requests from pipefd %d to %f\n",
148             child_array[c].pipefd,
149             child_array[c].requests);
150#endif
151      return;
152    }
153  }
154}
155
156void update_load_average(void)
157{
158  static unsigned int counter;
159
160  load = load / LOAD_DROP_OFF;
161  float newload = 0.0;
162  int c;
163  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
164    if (child_array[c].pipefd != 0) {
165      newload += child_array[c].requests * child_array[c].memory;
166      child_array[c].requests = 0;
167    }
168  }
169  load = load + newload;
170
171  if ((counter++ % BROADCAST_INTERVAL) == 0) {
172    broadcast_load();
173  }
174}
175
// Set to 1 by the SIGALRM handler; the main loop polls and clears it.
// sig_atomic_t is the only object type the C standard guarantees can be
// safely written from an asynchronous signal handler.
volatile sig_atomic_t sigalarm_set;

/*
 * SIGALRM handler: only records that the alarm fired.  All real work is
 * done by whoever polls sigalarm_set.
 */
void sigalarm_handler(int signum)
{
  (void)signum;
  sigalarm_set = 1;
}
181
/*
 * Print the command-line syntax to stderr and terminate the process with
 * exit status 1.
 *
 * argv0: program name shown in the message.
 */
void help(const char *argv0)
{
  fprintf(stderr,
          "Syntax: %s -b <broadcast port> -l <listen port> "
          "-s <subnet> -c 'command'\n",
          argv0);
  exit(1);
}
189
190int
191clear_service_fd(int fd)
192{
193    int n;
194
195        for(n = 0; n < MAX_SERVICES; n++)
196        {
197            if (FD_ISSET(fd, &service_rfds[n]))
198                    FD_CLR(fd, &service_rfds[n]);
199        }
200}
201
202int main(int argc, char *argv[])
203{
204  char server_command[MAX_SERVICES][1000];
205  int nservices = 0;
206  int command_argc[MAX_SERVICES];
207  char **command_argv[MAX_SERVICES];
208  int val;
209  int listen_fd[MAX_SERVICES];
210  int status;
211  struct sockaddr_in listen_addr;
212  struct sockaddr_in recv_addr;
213  int listen_port[MAX_SERVICES];
214  int recv_port = -1;
215  int connected_fds[10] = {0};
216  int subnet_addr;
217  int n;
218
219  listen_port[0] = -1;
220  server_command[0][0] = 0;
221
222  while(1) {
223    int c;
224    int this_option_optind = optind ? optind : 1;
225    int option_index = 0;
226    struct option long_options[] = {
227      // name, has_arg, flag, val
228      { 0,0,0,0 },
229    };
230
231    c = getopt_long(argc, argv, "+b:c:l:s:", long_options, &option_index);
232    if (c == -1)
233      break;
234
235    switch(c) {
236      case 'b':
237        recv_port = strtoul(optarg,0,0);
238        break;
239      case 'c':
240        strncpy(server_command[nservices], optarg, sizeof(server_command[0]));
241
242                if (listen_port[nservices] == -1) {
243                    fprintf(stderr,"Must specify -l port before each -c command.\n");
244                        return 1;
245                }
246
247                nservices++;
248                listen_port[nservices] = -1;
249        break;
250      case 'l':
251        listen_port[nservices] = strtoul(optarg,0,0);
252        break;
253      case 's':
254        send_addr.sin_addr.s_addr = htonl(inet_network(optarg,
255                                                       &send_addr.sin_addr));
256        if (send_addr.sin_addr.s_addr == -1) {
257          fprintf(stderr,"Invalid subnet broadcast address");
258          return 1;
259        }
260        break;
261      default:
262        fprintf(stderr,"Don't know what option '%c'.\n", c);
263        return 1;
264    }
265  }
266
267  if (nservices == 0 ||
268      recv_port == -1 ||
269      subnet_addr == -1 ||
270      server_command[0][0]=='\0') {
271    help(argv[0]);
272    return 1;
273  }
274
275  for(n = 0; n < nservices; n++) {
276      // Parse the command arguments...
277
278      command_argc[n]=0;
279      command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *));
280      command_argv[n][command_argc[n]] = strtok(server_command[n], " \t");
281      command_argc[n]++;
282      while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) {
283        command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *));
284        command_argc[n]++;
285      }
286
287      // Create a socket for listening.
288      listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
289      if (listen_fd[n] < 0) {
290        perror("socket");
291        exit(1);
292      }
293 
294      // If program is killed, drop the socket address reservation immediately.
295          val = 1;
296      status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
297      if (status < 0) {
298        perror("setsockopt");
299        // Not fatal.  Keep on going.
300      }
301
302      // Bind this address to the socket.
303      listen_addr.sin_family = AF_INET;
304      listen_addr.sin_port = htons(listen_port[n]);
305      listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
306      status = bind(listen_fd[n], (struct sockaddr *)&listen_addr,
307                sizeof(listen_addr));
308      if (status < 0) {
309        perror("bind");
310        exit(1);
311      }
312
313      // Listen on the specified port.
314      status = listen(listen_fd[n],5);
315      if (status < 0) {
316        perror("listen");
317      }
318  }
319
320  // Create a socket for broadcast.
321  send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
322  if (send_fd < 0) {
323    perror("socket");
324    exit(1);
325  }
326
327  // If program is killed, drop the socket address reservation immediately.
328  val = 1;
329  status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
330  if (status < 0) {
331    perror("setsockopt");
332    // Not fatal.  Keep on going.
333  }
334
335  // We're going to broadcast through this socket.
336  val = 1;
337  status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val));
338  if (status < 0) {
339    perror("setsockopt");
340    // Not fatal.  Keep on going.
341  }
342
343  // Bind this address to the socket.
344  recv_addr.sin_family = AF_INET;
345  recv_addr.sin_port = htons(recv_port);
346  recv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
347  status = bind(send_fd, (struct sockaddr *)&recv_addr,
348                sizeof(recv_addr));
349  if (status < 0) {
350    perror("bind");
351    exit(1);
352  }
353
354  // Set up the address that we broadcast to.
355  send_addr.sin_family = AF_INET;
356  send_addr.sin_port = htons(recv_port);
357
358  // Set up a signal handler for the alarm interrupt.
359  // It doesn't do anything other than interrupt select() below.
360  if (signal(SIGALRM,sigalarm_handler) == SIG_ERR) {
361    perror("signal SIGALRM");
362  }
363
364  struct itimerval itvalue = {
365    {1, 0}, {1, 0}
366  };
367  status = setitimer(ITIMER_REAL, &itvalue, NULL);
368  if (status != 0) {
369    perror("setitimer");
370  }
371
372  // We're ready to go.  Before going into the main loop,
373  // broadcast a load announcement to other machines.
374  broadcast_load();
375
376  int maxfd = send_fd;
377  FD_ZERO(&saved_rfds);
378  FD_ZERO(&pipe_rfds);
379
380  for(n = 0; n < nservices; n++) {
381      FD_ZERO(&service_rfds[n]);
382      FD_SET(listen_fd[n], &saved_rfds);
383          if (listen_fd[n] > maxfd)
384              maxfd = listen_fd[n];
385  }
386
387  FD_SET(send_fd, &saved_rfds);
388  while(1) {
389
390    fd_set rfds = saved_rfds;
391
392    status = select(maxfd+1, &rfds, NULL, NULL, 0);
393    if (status <= 0) {
394      if (sigalarm_set) {
395        update_load_average();
396        sigalarm_set = 0;
397      }
398      continue;
399    }
400
401   
402    int accepted = 0;
403    for(n = 0; n < nservices; n++) {
404        if (FD_ISSET(listen_fd[n], &rfds)) {
405          // Accept a new connection.
406          struct sockaddr_in newaddr;
407          unsigned int addrlen = sizeof(newaddr);
408          int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen);
409          if (newfd < 0) {
410            perror("accept");
411            continue;
412          }
413
414          printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
415          FD_SET(newfd, &saved_rfds);
416          maxfd = max(maxfd, newfd);
417                  FD_SET(newfd, &service_rfds[n]);
418          accepted = 1;
419                }
420        }
421
422        if (accepted)
423            continue;
424
425    if (FD_ISSET(send_fd, &rfds)) {
426      int buffer[1000];
427      struct sockaddr_in peer_addr;
428      unsigned int len = sizeof(peer_addr);
429      status = recvfrom(send_fd, buffer, sizeof(buffer), 0,
430                        (struct sockaddr*)&peer_addr, &len);
431      if (status < 0) {
432        perror("recvfrom");
433        continue;
434      }
435      if (status != 8) {
436        fprintf(stderr,"Bogus message from %s\n",
437                inet_ntoa(peer_addr.sin_addr));
438        continue;
439      }
440      float peer_load = ntohl(buffer[0]);
441      int peer_procs = ntohl(buffer[1]);
442      //printf("Load for %s is %f (%d processes).\n",
443      //       inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
444      int h;
445      int free_index=-1;
446      int found = 0;
447      for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
448        if (host_array[h].in_addr.s_addr == peer_addr.sin_addr.s_addr) {
449          if (host_array[h].children != peer_procs) {
450            printf("Load for %s is %f (%d processes).\n",
451                   inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
452          }
453          host_array[h].load = peer_load;
454          host_array[h].children = peer_procs;
455          found = 1;
456          break;
457        }
458        if (host_array[h].in_addr.s_addr == 0 && free_index == -1) {
459          free_index = h;
460        }
461      }
462      if (!found) {
463        host_array[free_index].in_addr.s_addr = peer_addr.sin_addr.s_addr;
464        host_array[free_index].load = peer_load;
465      }
466      continue;
467    }
468
469    int i;
470    for(i=0; i<maxfd+1; i++) {
471      if (FD_ISSET(i,&rfds)) {
472
473        // If this is a pipe, get the load.  Update.
474        if (FD_ISSET(i,&pipe_rfds)) {
475          float value;
476          status = read(i, &value, sizeof(value));
477          if (status != 4) {
478                    //fprintf(stderr,"error reading pipe, child ended?\n");
479            close_child(i);
480                        /*close(i);
481            FD_CLR(i, &saved_rfds);
482            FD_CLR(i, &pipe_rfds); */
483          } else {
484            note_request(i,value);
485          }
486          continue;
487        }
488
489        // This must be a descriptor that we're waiting to from
490        // for the memory footprint.  Get it.
491        int msg;
492        status = read(i, &msg, 4);
493        if (status != 4) {
494          fprintf(stderr,"Bad status on read (%d).", status);
495          FD_CLR(i, &saved_rfds);
496          clear_service_fd(i);
497                  close(i);
498          continue;
499        }
500
501        // find the new memory increment
502        int newmemory = ntohl(msg);
503
504        // Find the best host to create a new child on.
505        int index = find_best_host();
506
507        // Only redirect if another host's load is significantly less
508        // than our own...
509        if (index != -1 &&
510            (host_array[index].load < (LOAD_REDIRECT_FACTOR * load))) {
511
512          // If we're redirecting to another machine, give that machine
513          // an extra boost in our copy of the load statistics.  This will
514          // keep us from sending the very next job to it.  Eventually, the
515          // other machine will broadcast its real load and we can make an
516          // informed decision as to who redirect to again.
517          host_array[index].load += newmemory * INITIAL_LOAD;
518
519          // Redirect to another machine.
520          printf("Redirecting to %s\n",
521                 inet_ntoa(host_array[index].in_addr));
522          write(i, &host_array[index].in_addr.s_addr, 4);
523          FD_CLR(i, &saved_rfds);
524          clear_service_fd(i);
525          close(i);
526          continue;
527        }
528
529        memory_in_use += newmemory;
530        load += 2*INITIAL_LOAD;
531        broadcast_load();
532        printf("Accepted new job with memory %d\n", newmemory);
533        //printf("My load is now %f\n", load);
534
535        // accept the connection.
536        msg = 0;
537        write(i, &msg, 4);
538
539        int pair[2];
540        status = pipe(pair);
541        if (status != 0) {
542          perror("pipe");
543        }
544
545        // Make the child side of the pipe non-blocking...
546        status = fcntl(pair[1], F_SETFL, O_NONBLOCK);
547        if (status < 0) {
548          perror("fcntl");
549        }
550
551        // Fork the new process.  Connect i/o to the new socket.
552        status = fork();
553        if (status < 0) {
554          perror("fork");
555        } else if (status == 0) {
556
557                  for(n = 0; n < MAX_SERVICES; n++) {
558                    if (FD_ISSET(i, &service_rfds[n])) {
559
560                          // disassociate
561                          if ( daemon(0,1) == 0 ) {
562                int fd;
563
564                dup2(i,0);  // stdin
565                dup2(i,1);  // stdout
566                dup2(i,2);  // stderr
567                dup2(pair[1],3);
568
569                for(fd=4; fd<FD_SETSIZE; fd++)
570                  close(fd);
571
572                execvp(command_argv[n][0], command_argv[n]);
573                      }
574                          _exit(errno);
575                    }
576                  }
577                  _exit(EINVAL);
578
579        } else {
580          int c;
581                  // reap initial child which will exit immediately (grandchild continues)
582                  waitpid(status, NULL, 0);
583          for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) {
584            if (child_array[c].pipefd == 0) {
585              child_array[c].memory = newmemory;
586              child_array[c].pipefd = pair[0];
587              child_array[c].requests = INITIAL_LOAD;
588              status = close(pair[1]);
589              if (status != 0) {
590                perror("close pair[1]");
591              }
592              FD_SET(pair[0], &saved_rfds);
593              FD_SET(pair[0], &pipe_rfds);
594              maxfd = max(pair[0], maxfd);
595              break;
596            }
597          }
598
599          children++;
600          broadcast_load();
601        }
602
603
604        FD_CLR(i, &saved_rfds);
605        clear_service_fd(i);
606        close(i);
607        break;
608      }
609
610    } // for all connected_fds
611
612  } // while(1)
613
614}
615
Note: See TracBrowser for help on using the repository browser.