source: trunk/gui/vizservers/nanoscale/server.c @ 409

Last change on this file since 409 was 409, checked in by kennell, 18 years ago

Added nanoscale for visualization server load distribution.

File size: 15.0 KB
Line 
1#include <stdio.h>
2#include <unistd.h>
3#include <stdlib.h>
4#include <string.h>
5#include <signal.h>
6#include <sys/types.h>
7#include <sys/socket.h>
8#include <sys/wait.h>
9#include <sys/select.h>
10#include <sys/time.h>
11#include <fcntl.h>
12#include <netinet/in.h>
13#include <getopt.h>
14
15// The initial request load for a new renderer.
16#define INITIAL_LOAD 100000000.0
17
18// The factor that the load is divided by every second.
19#define LOAD_DROP_OFF 2.0
20
21// The broadcast interval (in seconds)
22#define BROADCAST_INTERVAL 5
23
24// The load of a remote machine must be less than this factor to
25// justify redirection.
26#define LOAD_REDIRECT_FACTOR 0.8
27
28float load = 0;             // The present load average for this system.
29int memory_in_use = 0;      // Total memory in use by this system.
30int children = 0;           // Number of children running on this system.
31int send_fd;                // The file descriptor we broadcast through.
32struct sockaddr_in send_addr;  // The subnet address we broadcast to.
33fd_set saved_rfds;          // Descriptors we're reading from.
34fd_set pipe_rfds;           // Descriptors that are pipes to children.
35
36
37struct host_info {
38  struct in_addr in_addr;
39  float          load;
40  int            children;
41};
42
43struct child_info {
44  pid_t pid;
45  int   memory;
46  int   pipefd;
47  float requests;
48};
49
50struct host_info host_array[100];
51struct child_info child_array[100];
52
53
54
55
56/*
57 * min()/max() macros that also do
58 * strict type-checking.. See the
59 * "unnecessary" pointer comparison.
60 */
61#define min(x,y) ({ \
62        typeof(x) _x = (x);     \
63        typeof(y) _y = (y);     \
64        (void) (&_x == &_y);            \
65        _x < _y ? _x : _y; })
66
67#define max(x,y) ({ \
68        typeof(x) _x = (x);     \
69        typeof(y) _y = (y);     \
70        (void) (&_x == &_y);            \
71        _x > _y ? _x : _y; })
72
73int find_best_host(void)
74{
75  int h;
76  float best = load;
77  int index = -1;
78  //printf("My load is %f\n", best);
79  for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
80    if (host_array[h].in_addr.s_addr == 0)
81      continue;
82    //printf("%d I think load for %s is %f   ", h,
83    //       inet_ntoa(host_array[h].in_addr), host_array[h].load);
84    if (host_array[h].children <= children) {
85      if (host_array[h].load < best) {
86        //if ((random() % 100) < 75) {
87          index = h;
88          best = host_array[h].load;
89        //}
90        //printf(" Better\n");
91      } else {
92        //printf(" Worse\n");
93      }
94    }
95  }
96
97  //printf("I choose %d\n", index);
98  return index;
99}
100
101
102void broadcast_load(void)
103{
104  int msg[2];
105  msg[0] = htonl(load);
106  msg[1] = htonl(children);
107  int status;
108  status = sendto(send_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&send_addr,
109                  sizeof(send_addr));
110  if (status < 0) {
111    perror("sendto");
112  }
113}
114
115void check_children(void)
116{
117  int status;
118
119  // Collect any children that have exited.
120  do {
121    int return_status;
122    status = waitpid(0, &return_status, WNOHANG);
123    if (status > 0) {
124      children--;
125    }
126    int i;
127    for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) {
128      if (child_array[i].pid == status) {
129        memory_in_use -= child_array[i].memory;
130        child_array[i].pid = 0;
131        child_array[i].memory = 0;
132        status = close(child_array[i].pipefd);
133        FD_CLR(child_array[i].pipefd, &saved_rfds);
134        FD_CLR(child_array[i].pipefd, &pipe_rfds);
135        break;
136      }
137    }
138  } while(status > 0);
139  printf("processes=%d, memory=%d, load=%f\n",
140         children, memory_in_use, load);
141
142  broadcast_load();
143}
144
145void note_request(int fd, float value)
146{
147  int c;
148  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
149    if (child_array[c].pipefd == fd) {
150      child_array[c].requests += value;
151#ifdef DEBUGGING
152      printf("Updating requests for pid %d to %f\n",
153             child_array[c].pid,
154             child_array[c].requests);
155#endif
156      return;
157    }
158  }
159}
160
161void update_load_average(void)
162{
163  static unsigned int counter;
164
165  load = load / LOAD_DROP_OFF;
166  float newload = 0.0;
167  int c;
168  for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
169    if (child_array[c].pid != 0) {
170      newload += child_array[c].requests * child_array[c].memory;
171      child_array[c].requests = 0;
172    }
173  }
174  load = load + newload;
175
176  if ((counter++ % BROADCAST_INTERVAL) == 0) {
177    broadcast_load();
178  }
179}
180
181volatile int sigalarm_set;
182volatile int sigchild_set;
183void sigalarm_handler(int signum)
184{
185  sigalarm_set = 1;
186}
187void sigchild_handler(int signum)
188{
189  sigchild_set = 1;
190}
191
192void help(const char *argv0)
193{
194  fprintf(stderr,
195          "Syntax: %s -b <broadcast port> -l <listen port> -s <subnet> -c 'command'\n",
196          argv0);
197  exit(1);
198}
199
200int main(int argc, char *argv[])
201{
202  char server_command[1000];
203  int command_argc;
204  char **command_argv;
205  int val;
206  int listen_fd = -1;
207  int status;
208  struct sockaddr_in listen_addr;
209  struct sockaddr_in recv_addr;
210  int listen_port = -1;
211  int recv_port = -1;
212  int connected_fds[10] = {0};
213  int subnet_addr;
214
215  while(1) {
216    int c;
217    int this_option_optind = optind ? optind : 1;
218    int option_index = 0;
219    struct option long_options[] = {
220      // name, has_arg, flag, val
221      { 0,0,0,0 },
222    };
223
224    c = getopt_long(argc, argv, "+b:c:l:s:", long_options, &option_index);
225    if (c == -1)
226      break;
227
228    switch(c) {
229      case 'b':
230        recv_port = strtoul(optarg,0,0);
231        break;
232      case 'c':
233        strncpy(server_command, optarg, sizeof(server_command));
234        break;
235      case 'l':
236        listen_port = strtoul(optarg,0,0);
237        break;
238      case 's':
239        send_addr.sin_addr.s_addr = htonl(inet_network(optarg,
240                                                       &send_addr.sin_addr));
241        if (send_addr.sin_addr.s_addr == -1) {
242          fprintf(stderr,"Invalid subnet broadcast address");
243          return 1;
244        }
245        break;
246      default:
247        fprintf(stderr,"Don't know what option '%c'.\n", c);
248        return 1;
249    }
250  }
251
252  if (listen_port == -1 ||
253      recv_port == -1 ||
254      subnet_addr == -1 ||
255      server_command[0]=='\0') {
256    help(argv[0]);
257    return 1;
258  }
259
260  // Parse the command arguments...
261  command_argc=0;
262  command_argv = malloc((command_argc+2) * sizeof(char *));
263  command_argv[command_argc] = strtok(server_command, " \t");
264  command_argc++;
265  while( (command_argv[command_argc] = strtok(NULL, " \t"))) {
266    command_argv = realloc(command_argv, (command_argc+2) * sizeof(char *));
267    command_argc++;
268  }
269
270  // Create a socket for listening.
271  listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
272  if (listen_fd < 0) {
273    perror("socket");
274    exit(1);
275  }
276
277  // If program is killed, drop the socket address reservation immediately.
278  val = 1;
279  status = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
280  if (status < 0) {
281    perror("setsockopt");
282    // Not fatal.  Keep on going.
283  }
284
285  // Bind this address to the socket.
286  listen_addr.sin_family = AF_INET;
287  listen_addr.sin_port = htons(listen_port);
288  listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
289  status = bind(listen_fd, (struct sockaddr *)&listen_addr,
290                sizeof(listen_addr));
291  if (status < 0) {
292    perror("bind");
293    exit(1);
294  }
295
296  // Listen on the specified port.
297  status = listen(listen_fd,5);
298  if (status < 0) {
299    perror("listen");
300  }
301
302  // Create a socket for broadcast.
303  send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
304  if (send_fd < 0) {
305    perror("socket");
306    exit(1);
307  }
308
309  // If program is killed, drop the socket address reservation immediately.
310  val = 1;
311  status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
312  if (status < 0) {
313    perror("setsockopt");
314    // Not fatal.  Keep on going.
315  }
316
317  // We're going to broadcast through this socket.
318  val = 1;
319  status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val));
320  if (status < 0) {
321    perror("setsockopt");
322    // Not fatal.  Keep on going.
323  }
324
325  // Bind this address to the socket.
326  recv_addr.sin_family = AF_INET;
327  recv_addr.sin_port = htons(recv_port);
328  recv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
329  status = bind(send_fd, (struct sockaddr *)&recv_addr,
330                sizeof(recv_addr));
331  if (status < 0) {
332    perror("bind");
333    exit(1);
334  }
335
336  // Set up the address that we broadcast to.
337  send_addr.sin_family = AF_INET;
338  send_addr.sin_port = htons(recv_port);
339
340  // Set up a signal handler for exiting children.
341  // It doesn't do anything other than interrupt select() below.
342  if (signal(SIGCHLD,sigchild_handler) == SIG_ERR) {
343    perror("signal SIGCHLD");
344  }
345
346  // Set up a signal handler for the alarm interrupt.
347  // It doesn't do anything other than interrupt select() below.
348  if (signal(SIGALRM,sigalarm_handler) == SIG_ERR) {
349    perror("signal SIGCHLD");
350  }
351
352  struct itimerval itvalue = {
353    {1, 0}, {1, 0}
354  };
355  status = setitimer(ITIMER_REAL, &itvalue, NULL);
356  if (status != 0) {
357    perror("setitimer");
358  }
359
360  // We're ready to go.  Before going into the main loop,
361  // do a check_children to broadcast a load announcement to
362  // other machines.
363  check_children();
364
365
366  int maxfd = max(listen_fd,send_fd);
367  FD_ZERO(&saved_rfds);
368  FD_ZERO(&pipe_rfds);
369  FD_SET(listen_fd, &saved_rfds);
370  FD_SET(send_fd, &saved_rfds);
371  while(1) {
372
373    fd_set rfds = saved_rfds;
374
375    status = select(maxfd+1, &rfds, NULL, NULL, 0);
376    if (status <= 0) {
377      if (sigalarm_set) {
378        update_load_average();
379        sigalarm_set = 0;
380      }
381      if (sigchild_set) {
382        check_children();
383        sigchild_set = 0;
384      }
385      continue;
386    }
387
388    if (FD_ISSET(listen_fd, &rfds)) {
389      // Accept a new connection.
390      struct sockaddr_in newaddr;
391      unsigned int addrlen = sizeof(newaddr);
392      int newfd = accept(listen_fd, (struct sockaddr *)&newaddr, &addrlen);
393      if (newfd < 0) {
394        perror("accept");
395        continue;
396      }
397
398      printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
399      FD_SET(newfd, &saved_rfds);
400      maxfd = max(maxfd, newfd);
401      continue;
402    }
403
404    if (FD_ISSET(send_fd, &rfds)) {
405      int buffer[1000];
406      struct sockaddr_in peer_addr;
407      unsigned int len = sizeof(peer_addr);
408      status = recvfrom(send_fd, buffer, sizeof(buffer), 0,
409                        (struct sockaddr*)&peer_addr, &len);
410      if (status < 0) {
411        perror("recvfrom");
412        continue;
413      }
414      if (status != 8) {
415        fprintf(stderr,"Bogus message from %s\n",
416                inet_ntoa(peer_addr.sin_addr));
417        continue;
418      }
419      float peer_load = ntohl(buffer[0]);
420      int peer_procs = ntohl(buffer[1]);
421      //printf("Load for %s is %f (%d processes).\n",
422      //       inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
423      int h;
424      int free_index=-1;
425      int found = 0;
426      for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
427        if (host_array[h].in_addr.s_addr == peer_addr.sin_addr.s_addr) {
428          if (host_array[h].children != peer_procs) {
429            printf("Load for %s is %f (%d processes).\n",
430                   inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
431          }
432          host_array[h].load = peer_load;
433          host_array[h].children = peer_procs;
434          found = 1;
435          break;
436        }
437        if (host_array[h].in_addr.s_addr == 0 && free_index == -1) {
438          free_index = h;
439        }
440      }
441      if (!found) {
442        host_array[free_index].in_addr.s_addr = peer_addr.sin_addr.s_addr;
443        host_array[free_index].load = peer_load;
444      }
445      continue;
446    }
447
448    int i;
449    for(i=0; i<maxfd+1; i++) {
450      if (FD_ISSET(i,&rfds)) {
451
452        // If this is a pipe, get the load.  Update.
453        if (FD_ISSET(i,&pipe_rfds)) {
454          float value;
455          status = read(i, &value, sizeof(value));
456          if (status != 4) {
457            close(i);
458            FD_CLR(i, &saved_rfds);
459            FD_CLR(i, &pipe_rfds);
460          } else {
461            note_request(i,value);
462          }
463          continue;
464        }
465
466        // This must be a descriptor that we're waiting to from
467        // for the memory footprint.  Get it.
468        int msg;
469        status = read(i, &msg, 4);
470        if (status != 4) {
471          fprintf(stderr,"Bad status on read (%d).", status);
472          FD_CLR(i, &saved_rfds);
473          close(i);
474          continue;
475        }
476
477        // find the new memory increment
478        int newmemory = ntohl(msg);
479
480        // Find the best host to create a new child on.
481        int index = find_best_host();
482
483        // Only redirect if another host's load is significantly less
484        // than our own...
485        if (index != -1 &&
486            (host_array[index].load < (LOAD_REDIRECT_FACTOR * load))) {
487
488          // If we're redirecting to another machine, give that machine
489          // an extra boost in our copy of the load statistics.  This will
490          // keep us from sending the very next job to it.  Eventually, the
491          // other machine will broadcast its real load and we can make an
492          // informed decision as to who redirect to again.
493          host_array[index].load += newmemory * INITIAL_LOAD;
494
495          // Redirect to another machine.
496          printf("Redirecting to %s\n",
497                 inet_ntoa(host_array[index].in_addr));
498          write(i, &host_array[index].in_addr.s_addr, 4);
499          FD_CLR(i, &saved_rfds);
500          close(i);
501          continue;
502        }
503
504        memory_in_use += newmemory;
505        load += 2*INITIAL_LOAD;
506        broadcast_load();
507        printf("Accepted new job with memory %d\n", newmemory);
508        //printf("My load is now %f\n", load);
509
510        // accept the connection.
511        msg = 0;
512        write(i, &msg, 4);
513
514        int pair[2];
515        status = pipe(pair);
516        if (status != 0) {
517          perror("pipe");
518        }
519
520        // Make the child side of the pipe non-blocking...
521        status = fcntl(pair[1], F_SETFL, O_NONBLOCK);
522        if (status < 0) {
523          perror("fcntl");
524        }
525
526        // Fork the new process.  Connect i/o to the new socket.
527        status = fork();
528        if (status < 0) {
529          perror("fork");
530        } else if (status == 0) {
531          dup2(i,0);  // stdin
532          dup2(i,1);  // stdout
533          dup2(i,2);  // stderr
534          dup2(pair[1],3);
535          int fd;
536          for(fd=4; fd<FD_SETSIZE; fd++) {
537            close(fd);
538          }
539          execvp(command_argv[0], command_argv);
540        } else {
541          int c;
542          for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) {
543            if (child_array[c].pid == 0) {
544              child_array[c].pid = status;
545              child_array[c].memory = newmemory;
546              child_array[c].pipefd = pair[0];
547              child_array[c].requests = INITIAL_LOAD;
548              status = close(pair[1]);
549              if (status != 0) {
550                perror("close pair[1]");
551              }
552              FD_SET(pair[0], &saved_rfds);
553              FD_SET(pair[0], &pipe_rfds);
554              maxfd = max(pair[0], maxfd);
555              break;
556            }
557          }
558
559          children++;
560          check_children();
561        }
562
563
564        FD_CLR(i, &saved_rfds);
565        close(i);
566        break;
567      }
568
569    } // for all connected_fds
570
571  } // while(1)
572
573}
574
Note: See TracBrowser for help on using the repository browser.