source: trunk/packages/vizservers/nanoscale/server.c @ 1970

Last change on this file since 1970 was 1380, checked in by gah, 13 years ago
File size: 16.1 KB
Line 
1#include <stdio.h>
2#include <unistd.h>
3#include <stdlib.h>
4#include <string.h>
5#include <signal.h>
6#include <sys/types.h>
7#include <sys/socket.h>
8#include <sys/wait.h>
9#include <sys/select.h>
10#include <sys/time.h>
11#include <arpa/inet.h>
12#include <fcntl.h>
13#include <netinet/in.h>
14#include <getopt.h>
15#include <errno.h>
16
17// The initial request load for a new renderer.
18#define INITIAL_LOAD 100000000.0
19
20// The factor that the load is divided by every second.
21#define LOAD_DROP_OFF 2.0
22
23// The broadcast interval (in seconds)
24#define BROADCAST_INTERVAL 5
25
26// The load of a remote machine must be less than this factor to
27// justify redirection.
28#define LOAD_REDIRECT_FACTOR 0.8
29
30// Maxium number of services we support
31#define MAX_SERVICES 100
32
33float load = 0;             // The present load average for this system.
34int memory_in_use = 0;      // Total memory in use by this system.
35int children = 0;           // Number of children running on this system.
36int send_fd;                // The file descriptor we broadcast through.
37struct sockaddr_in send_addr;  // The subnet address we broadcast to.
38fd_set saved_rfds;          // Descriptors we're reading from.
39fd_set pipe_rfds;           // Descriptors that are pipes to children.
40fd_set service_rfds[MAX_SERVICES];
41int maxCards = 1;
42int dispNum = -1;
43char displayVar[200];
44
45struct host_info {
46    struct in_addr in_addr;
47    float          load;
48    int            children;
49};
50
51struct child_info {
52    int   memory;
53    int   pipefd;
54    float requests;
55};
56
57struct host_info host_array[100];
58struct child_info child_array[100];
59
60/*
61 * min()/max() macros that also do
62 * strict type-checking.. See the
63 * "unnecessary" pointer comparison.
64 */
65#define min(x,y) ({                             \
66            typeof(x) _x = (x);                 \
67            typeof(y) _y = (y);                 \
68            (void) (&_x == &_y);                \
69            _x < _y ? _x : _y; })
70
71#define max(x,y) ({                             \
72            typeof(x) _x = (x);                 \
73            typeof(y) _y = (y);                 \
74            (void) (&_x == &_y);                \
75            _x > _y ? _x : _y; })
76
77#ifdef notdef
78static int
79find_best_host(void)
80{
81    int h;
82    float best = load;
83    int index = -1;
84    //printf("My load is %f\n", best);
85    for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
86        if (host_array[h].in_addr.s_addr == 0)
87            continue;
88        //printf("%d I think load for %s is %f   ", h,
89        //       inet_ntoa(host_array[h].in_addr), host_array[h].load);
90        if (host_array[h].children <= children) {
91            if (host_array[h].load < best) {
92                //if ((random() % 100) < 75) {
93                index = h;
94                best = host_array[h].load;
95                //}
96                //printf(" Better\n");
97            } else {
98                //printf(" Worse\n");
99            }
100        }
101    }
102
103    //printf("I choose %d\n", index);
104    return index;
105}
106#endif
107
108static void
109broadcast_load(void)
110{
111    int msg[2];
112    msg[0] = htonl(load);
113    msg[1] = htonl(children);
114    int status;
115    status = sendto(send_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&send_addr,
116                    sizeof(send_addr));
117    if (status < 0) {
118        perror("sendto");
119    }
120}
121
122static void
123close_child(int pipe_fd)
124{
125    int i;
126    for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) {
127        if (child_array[i].pipefd == pipe_fd) {
128            children--;
129            memory_in_use -= child_array[i].memory;
130            child_array[i].memory = 0;
131            FD_CLR(child_array[i].pipefd, &saved_rfds);
132            FD_CLR(child_array[i].pipefd, &pipe_rfds);
133            close(child_array[i].pipefd);
134            child_array[i].pipefd = 0;
135            break;
136        }
137    }
138 
139    printf("processes=%d, memory=%d, load=%f\n",
140           children, memory_in_use, load);
141
142#ifdef notdef
143    broadcast_load();
144#endif
145}
146
147void note_request(int fd, float value)
148{
149    int c;
150    for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
151        if (child_array[c].pipefd == fd) {
152            child_array[c].requests += value;
153#ifdef DEBUGGING
154            printf("Updating requests from pipefd %d to %f\n",
155                   child_array[c].pipefd,
156                   child_array[c].requests);
157#endif
158            return;
159        }
160    }
161}
162
163static void
164update_load_average(void)
165{
166    static unsigned int counter;
167
168    load = load / LOAD_DROP_OFF;
169    float newload = 0.0;
170    int c;
171    for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
172        if (child_array[c].pipefd != 0) {
173            newload += child_array[c].requests * child_array[c].memory;
174            child_array[c].requests = 0;
175        }
176    }
177    load = load + newload;
178
179    if ((counter++ % BROADCAST_INTERVAL) == 0) {
180        broadcast_load();
181    }
182}
183
184volatile int sigalarm_set;
185
186static void
187sigalarm_handler(int signum)
188{
189    sigalarm_set = 1;
190}
191
192static void
193help(const char *argv0)
194{
195    fprintf(stderr,
196            "Syntax: %s [-d] -b <broadcast port> -l <listen port> -s <subnet> -c 'command'\n",
197            argv0);
198    exit(1);
199}
200
201static void
202clear_service_fd(int fd)
203{
204    int n;
205
206    for(n = 0; n < MAX_SERVICES; n++) {
207        if (FD_ISSET(fd, &service_rfds[n]))
208            FD_CLR(fd, &service_rfds[n]);
209    }
210}
211
212int
213main(int argc, char *argv[])
214{
215    char server_command[MAX_SERVICES][1000];
216    int nservices = 0;
217    int command_argc[MAX_SERVICES];
218    char **command_argv[MAX_SERVICES];
219    int val;
220    int listen_fd[MAX_SERVICES];
221    int status;
222    struct sockaddr_in listen_addr;
223    struct sockaddr_in recv_addr;
224    int listen_port[MAX_SERVICES];
225    int recv_port = -1;
226    int debug_flag = 0;
227    int n;
228
229    listen_port[0] = -1;
230    server_command[0][0] = 0;
231
232    strcpy(displayVar, "DISPLAY=:0.0");
233    if (putenv(displayVar) < 0) {
234        perror("putenv");
235    }
236    while(1) {
237        int c;
238        int option_index = 0;
239        struct option long_options[] = {
240            // name, has_arg, flag, val
241            { 0,0,0,0 },
242        };
243
244        c = getopt_long(argc, argv, "+b:c:l:s:x:d", long_options,
245                        &option_index);
246        if (c == -1)
247            break;
248
249        switch(c) {
250        case 'x': /* Number of video cards */
251            maxCards = strtoul(optarg, 0, 0);
252            if ((maxCards < 1) || (maxCards > 10)) {
253                fprintf(stderr, "bad number of max videocards specified\n");
254                return 1;
255            }
256            break;
257        case 'd':
258            debug_flag = 1;
259            break;
260        case 'b':
261            recv_port = strtoul(optarg, 0, 0);
262            break;
263        case 'c':
264            strncpy(server_command[nservices], optarg, sizeof(server_command[0]));
265
266            if (listen_port[nservices] == -1) {
267                fprintf(stderr,"Must specify -l port before each -c command.\n");
268                return 1;
269            }
270
271            nservices++;
272            listen_port[nservices] = -1;
273            break;
274        case 'l':
275            listen_port[nservices] = strtoul(optarg,0,0);
276            break;
277        case 's':
278            send_addr.sin_addr.s_addr = htonl(inet_network(optarg));
279            if (send_addr.sin_addr.s_addr == -1) {
280                fprintf(stderr,"Invalid subnet broadcast address");
281                return 1;
282            }
283            break;
284        default:
285            fprintf(stderr,"Don't know what option '%c'.\n", c);
286            return 1;
287        }
288    }
289    if (nservices == 0 ||
290        recv_port == -1 ||
291        server_command[0][0]=='\0') {
292        int i;
293        fprintf(stderr, "nservices=%d, recv_port=%d, server_command[0]=%s\n", nservices, recv_port, server_command[0]);
294        for (i = 0; i < argc; i++) {
295            fprintf(stderr, "argv[%d]=(%s)\n", i, argv[i]);
296        }
297        help(argv[0]);
298        return 1;
299    }
300
301    for(n = 0; n < nservices; n++) {
302        // Parse the command arguments...
303
304        command_argc[n]=0;
305        command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *));
306        command_argv[n][command_argc[n]] = strtok(server_command[n], " \t");
307        command_argc[n]++;
308        while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) {
309            command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *));
310            command_argc[n]++;
311        }
312
313        // Create a socket for listening.
314        listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
315        if (listen_fd[n] < 0) {
316            perror("socket");
317            exit(1);
318        }
319 
320        // If program is killed, drop the socket address reservation immediately.
321        val = 1;
322        status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val,
323                            sizeof(val));
324        if (status < 0) {
325            perror("setsockopt");
326            // Not fatal.  Keep on going.
327        }
328
329        // Bind this address to the socket.
330        listen_addr.sin_family = AF_INET;
331        listen_addr.sin_port = htons(listen_port[n]);
332        listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
333        status = bind(listen_fd[n], (struct sockaddr *)&listen_addr,
334                      sizeof(listen_addr));
335        if (status < 0) {
336            perror("bind");
337            exit(1);
338        }
339
340        // Listen on the specified port.
341        status = listen(listen_fd[n],5);
342        if (status < 0) {
343            perror("listen");
344        }
345    }
346
347    // Create a socket for broadcast.
348    send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
349    if (send_fd < 0) {
350        perror("socket");
351        exit(1);
352    }
353
354    // If program is killed, drop the socket address reservation immediately.
355    val = 1;
356    status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
357    if (status < 0) {
358        perror("setsockopt");
359        // Not fatal.  Keep on going.
360    }
361
362    // We're going to broadcast through this socket.
363    val = 1;
364    status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val));
365    if (status < 0) {
366        perror("setsockopt");
367        // Not fatal.  Keep on going.
368    }
369
370    // Bind this address to the socket.
371    recv_addr.sin_family = AF_INET;
372    recv_addr.sin_port = htons(recv_port);
373    recv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
374    status = bind(send_fd, (struct sockaddr *)&recv_addr,
375                  sizeof(recv_addr));
376    if (status < 0) {
377        perror("bind");
378        exit(1);
379    }
380
381    // Set up the address that we broadcast to.
382    send_addr.sin_family = AF_INET;
383    send_addr.sin_port = htons(recv_port);
384
385    // Set up a signal handler for the alarm interrupt.
386    // It doesn't do anything other than interrupt select() below.
387    if (signal(SIGALRM, sigalarm_handler) == SIG_ERR) {
388        perror("signal SIGALRM");
389    }
390
391    struct itimerval itvalue = {
392        {1, 0}, {1, 0}
393    };
394    status = setitimer(ITIMER_REAL, &itvalue, NULL);
395    if (status != 0) {
396        perror("setitimer");
397    }
398
399    // We're ready to go.  Before going into the main loop,
400    // broadcast a load announcement to other machines.
401#ifdef notdef
402    broadcast_load();
403#endif
404    int maxfd = send_fd;
405    FD_ZERO(&saved_rfds);
406    FD_ZERO(&pipe_rfds);
407
408    for(n = 0; n < nservices; n++) {
409        FD_ZERO(&service_rfds[n]);
410        FD_SET(listen_fd[n], &saved_rfds);
411        if (listen_fd[n] > maxfd)
412            maxfd = listen_fd[n];
413    }
414
415    FD_SET(send_fd, &saved_rfds);
416
417    if (!debug_flag) {
418        if ( daemon(0,0) != 0 ) {
419            perror("daemon");
420            exit(1);
421        }
422    }
423
424    while(1) {
425        fd_set rfds = saved_rfds;
426     
427        status = select(maxfd+1, &rfds, NULL, NULL, 0);
428        if (status <= 0) {
429            if (sigalarm_set) {
430                update_load_average();
431                sigalarm_set = 0;
432            }
433            continue;
434        }
435     
436     
437        int accepted = 0;
438        for(n = 0; n < nservices; n++) {
439            if (FD_ISSET(listen_fd[n], &rfds)) {
440                // Accept a new connection.
441                struct sockaddr_in newaddr;
442                unsigned int addrlen = sizeof(newaddr);
443                int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen);
444                if (newfd < 0) {
445                    perror("accept");
446                    continue;
447                }
448             
449                printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
450                FD_SET(newfd, &saved_rfds);
451                maxfd = max(maxfd, newfd);
452                FD_SET(newfd, &service_rfds[n]);
453                accepted = 1;
454            }
455        }
456     
457        if (accepted)
458            continue;
459     
460        if (FD_ISSET(send_fd, &rfds)) {
461            int buffer[1000];
462            struct sockaddr_in peer_addr;
463            unsigned int len = sizeof(peer_addr);
464            status = recvfrom(send_fd, buffer, sizeof(buffer), 0,
465                              (struct sockaddr*)&peer_addr, &len);
466            if (status < 0) {
467                perror("recvfrom");
468                continue;
469            }
470            if (status != 8) {
471                fprintf(stderr,"Bogus message from %s\n",
472                        inet_ntoa(peer_addr.sin_addr));
473                continue;
474            }
475            float peer_load = ntohl(buffer[0]);
476            int peer_procs = ntohl(buffer[1]);
477            //printf("Load for %s is %f (%d processes).\n",
478            //       inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
479            int h;
480            int free_index=-1;
481            int found = 0;
482            for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
483                if (host_array[h].in_addr.s_addr == peer_addr.sin_addr.s_addr) {
484                    if (host_array[h].children != peer_procs) {
485                        printf("Load for %s is %f (%d processes).\n",
486                               inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
487                    }
488                    host_array[h].load = peer_load;
489                    host_array[h].children = peer_procs;
490                    found = 1;
491                    break;
492                }
493                if (host_array[h].in_addr.s_addr == 0 && free_index == -1) {
494                    free_index = h;
495                }
496            }
497            if (!found) {
498                host_array[free_index].in_addr.s_addr = peer_addr.sin_addr.s_addr;
499                host_array[free_index].load = peer_load;
500            }
501            continue;
502        }
503     
504        int i;
505        for(i=0; i< maxfd +1; i++) {
506            if (FD_ISSET(i,&rfds)) {
507             
508                // If this is a pipe, get the load.  Update.
509                if (FD_ISSET(i,&pipe_rfds)) {
510                    float value;
511                    status = read(i, &value, sizeof(value));
512                    if (status != 4) {
513                        //fprintf(stderr,"error reading pipe, child ended?\n");
514                        close_child(i);
515                        /*close(i);
516                          FD_CLR(i, &saved_rfds);
517                          FD_CLR(i, &pipe_rfds); */
518                    } else {
519                        note_request(i,value);
520                    }
521                    continue;
522                }
523             
524                // This must be a descriptor that we're waiting to from for
525                // the memory footprint.  Get it.
526                int msg;
527                status = read(i, &msg, 4);
528                if (status != 4) {
529                    fprintf(stderr,"Bad status on read (%d).", status);
530                    FD_CLR(i, &saved_rfds);
531                    clear_service_fd(i);
532                    close(i);
533                    continue;
534                }
535             
536                // find the new memory increment
537                int newmemory = ntohl(msg);
538             
539#ifdef notdef
540                // Find the best host to create a new child on.
541                int index = find_best_host();
542             
543                // Only redirect if another host's load is significantly less
544                // than our own...
545                if (index != -1 &&
546                    (host_array[index].load < (LOAD_REDIRECT_FACTOR * load))) {
547                 
548                    // If we're redirecting to another machine, give that
549                    // machine an extra boost in our copy of the load
550                    // statistics.  This will keep us from sending the very
551                    // next job to it.  Eventually, the other machine will
552                    // broadcast its real load and we can make an informed
553                    // decision as to who redirect to again.
554                    host_array[index].load += newmemory * INITIAL_LOAD;
555                 
556                    // Redirect to another machine.
557                    printf("Redirecting to %s\n",
558                           inet_ntoa(host_array[index].in_addr));
559                    write(i, &host_array[index].in_addr.s_addr, 4);
560                    FD_CLR(i, &saved_rfds);
561                    clear_service_fd(i);
562                    close(i);
563                    continue;
564                }
565#endif
566                memory_in_use += newmemory;
567                load += 2*INITIAL_LOAD;
568#ifdef notdef
569                broadcast_load();
570#endif
571                printf("Accepted new job with memory %d\n", newmemory);
572                //printf("My load is now %f\n", load);
573             
574                // accept the connection.
575                msg = 0;
576                if (write(i, &msg, 4) != 4) {
577                    fprintf(stderr, "short write for hostname\n");
578                }
579             
580                int pair[2];
581                status = pipe(pair);
582                if (status != 0) {
583                    perror("pipe");
584                }
585             
586                // Make the child side of the pipe non-blocking...
587                status = fcntl(pair[1], F_SETFL, O_NONBLOCK);
588                if (status < 0) {
589                    perror("fcntl");
590                }
591                dispNum++;
592                if (dispNum >= maxCards) {
593                    dispNum = 0;
594                }
595                // Fork the new process.  Connect i/o to the new socket.
596                status = fork();
597                if (status < 0) {
598                    perror("fork");
599                } else if (status == 0) {
600                 
601                    for(n = 0; n < MAX_SERVICES; n++) {
602                        if (FD_ISSET(i, &service_rfds[n])) {
603                            int status = 0;
604
605                            if (!debug_flag) {
606                                // disassociate
607                                status = daemon(0,1);
608                            }
609                           
610                            if (status == 0) {
611                                int fd;
612
613                                dup2(i, 0);  // stdin
614                                dup2(i, 1);  // stdout
615                                dup2(i, 2);  // stderr
616                                dup2(pair[1],3);
617                                // read end of pipe moved, and left open to
618                                // prevent SIGPIPE
619                                dup2(pair[0],4);
620                             
621                                for(fd=5; fd<FD_SETSIZE; fd++)
622                                    close(fd);
623                             
624                                if (maxCards > 1) {
625                                    displayVar[11] = dispNum + '0';
626                                }
627                                execvp(command_argv[n][0], command_argv[n]);
628                            }
629                            _exit(errno);
630                        }
631                    }
632                    _exit(EINVAL);
633                 
634                } else {
635                    int c;
636                    // reap initial child which will exit immediately
637                    // (grandchild continues)
638                    waitpid(status, NULL, 0);
639                    for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) {
640                        if (child_array[c].pipefd == 0) {
641                            child_array[c].memory = newmemory;
642                            child_array[c].pipefd = pair[0];
643                            child_array[c].requests = INITIAL_LOAD;
644                            status = close(pair[1]);
645                            if (status != 0) {
646                                perror("close pair[1]");
647                            }
648                            FD_SET(pair[0], &saved_rfds);
649                            FD_SET(pair[0], &pipe_rfds);
650                            maxfd = max(pair[0], maxfd);
651                            break;
652                        }
653                    }
654                 
655                    children++;
656                    broadcast_load();
657                }
658             
659             
660                FD_CLR(i, &saved_rfds);
661                clear_service_fd(i);
662                close(i);
663                break;
664            }
665         
666        } // for all connected_fds
667     
668    } // while(1)
669 
670}
671
Note: See TracBrowser for help on using the repository browser.