source: trunk/packages/vizservers/nanoscale/server.c @ 2720

Last change on this file since 2720 was 2403, checked in by gah, 13 years ago
File size: 13.4 KB
Line 
1#include <stdio.h>
2#include <unistd.h>
3#include <stdlib.h>
4#include <string.h>
5#include <signal.h>
6#include <sys/types.h>
7#include <sys/socket.h>
8#include <sys/wait.h>
9#include <sys/select.h>
10#include <sys/time.h>
11#include <arpa/inet.h>
12#include <fcntl.h>
13#include <netinet/in.h>
14#include <getopt.h>
15#include <errno.h>
16
17// The initial request load for a new renderer.
18#define INITIAL_LOAD 100000000.0
19
20// The factor that the load is divided by every second.
21#define LOAD_DROP_OFF 2.0
22
23// The broadcast interval (in seconds)
24#define BROADCAST_INTERVAL 5
25
26// The load of a remote machine must be less than this factor to
27// justify redirection.
28#define LOAD_REDIRECT_FACTOR 0.8
29
30// Maxium number of services we support
31#define MAX_SERVICES 100
32
33float load = 0;             // The present load average for this system.
34int memory_in_use = 0;      // Total memory in use by this system.
35int children = 0;           // Number of children running on this system.
36int send_fd;                // The file descriptor we broadcast through.
37struct sockaddr_in send_addr;  // The subnet address we broadcast to.
38fd_set saved_rfds;          // Descriptors we're reading from.
39fd_set pipe_rfds;           // Descriptors that are pipes to children.
40fd_set service_rfds[MAX_SERVICES];
41int maxCards = 1;
42int dispNum = -1;
43char displayVar[200];
44
45struct host_info {
46    struct in_addr in_addr;
47    float          load;
48    int            children;
49};
50
51struct child_info {
52    int   memory;
53    int   pipefd;
54    float requests;
55};
56
57struct host_info host_array[100];
58struct child_info child_array[100];
59
60/*
61 * min()/max() macros that also do
62 * strict type-checking.. See the
63 * "unnecessary" pointer comparison.
64 */
65#define min(x,y) ({                             \
66            typeof(x) _x = (x);                 \
67            typeof(y) _y = (y);                 \
68            (void) (&_x == &_y);                \
69            _x < _y ? _x : _y; })
70
71#define max(x,y) ({                             \
72            typeof(x) _x = (x);                 \
73            typeof(y) _y = (y);                 \
74            (void) (&_x == &_y);                \
75            _x > _y ? _x : _y; })
76
77
78static void
79close_child(int pipe_fd)
80{
81    int i;
82    for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) {
83        if (child_array[i].pipefd == pipe_fd) {
84            children--;
85            memory_in_use -= child_array[i].memory;
86            child_array[i].memory = 0;
87            FD_CLR(child_array[i].pipefd, &saved_rfds);
88            FD_CLR(child_array[i].pipefd, &pipe_rfds);
89            close(child_array[i].pipefd);
90            child_array[i].pipefd = 0;
91            break;
92        }
93    }
94 
95    printf("processes=%d, memory=%d, load=%f\n",
96           children, memory_in_use, load);
97
98}
99
100void note_request(int fd, float value)
101{
102    int c;
103    for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
104        if (child_array[c].pipefd == fd) {
105            child_array[c].requests += value;
106#ifdef DEBUGGING
107            printf("Updating requests from pipefd %d to %f\n",
108                   child_array[c].pipefd,
109                   child_array[c].requests);
110#endif
111            return;
112        }
113    }
114}
115
116volatile int sigalarm_set;
117
118static void
119sigalarm_handler(int signum)
120{
121    sigalarm_set = 1;
122}
123
124static void
125help(const char *argv0)
126{
127    fprintf(stderr,
128            "Syntax: %s [-d] -b <broadcast port> -l <listen port> -s <subnet> -c 'command'\n",
129            argv0);
130    exit(1);
131}
132
133static void
134clear_service_fd(int fd)
135{
136    int n;
137
138    for(n = 0; n < MAX_SERVICES; n++) {
139        if (FD_ISSET(fd, &service_rfds[n]))
140            FD_CLR(fd, &service_rfds[n]);
141    }
142}
143
144int
145main(int argc, char *argv[])
146{
147    char server_command[MAX_SERVICES][1000];
148    int nservices = 0;
149    int command_argc[MAX_SERVICES];
150    char **command_argv[MAX_SERVICES];
151    int val;
152    int listen_fd[MAX_SERVICES];
153    int status;
154    struct sockaddr_in listen_addr;
155    struct sockaddr_in recv_addr;
156    int listen_port[MAX_SERVICES];
157    int recv_port = -1;
158    int debug_flag = 0;
159    int n;
160
161    listen_port[0] = -1;
162    server_command[0][0] = 0;
163
164    strcpy(displayVar, "DISPLAY=:0.0");
165    if (putenv(displayVar) < 0) {
166        perror("putenv");
167    }
168    while(1) {
169        int c;
170        int option_index = 0;
171        struct option long_options[] = {
172            // name, has_arg, flag, val
173            { 0,0,0,0 },
174        };
175
176        c = getopt_long(argc, argv, "+b:c:l:s:x:d", long_options,
177                        &option_index);
178        if (c == -1)
179            break;
180
181        switch(c) {
182        case 'x': /* Number of video cards */
183            maxCards = strtoul(optarg, 0, 0);
184            if ((maxCards < 1) || (maxCards > 10)) {
185                fprintf(stderr, "bad number of max videocards specified\n");
186                return 1;
187            }
188            break;
189        case 'd':
190            debug_flag = 1;
191            break;
192        case 'b':
193            recv_port = strtoul(optarg, 0, 0);
194            break;
195        case 'c':
196            strncpy(server_command[nservices], optarg, sizeof(server_command[0]));
197
198            if (listen_port[nservices] == -1) {
199                fprintf(stderr,"Must specify -l port before each -c command.\n");
200                return 1;
201            }
202
203            nservices++;
204            listen_port[nservices] = -1;
205            break;
206        case 'l':
207            listen_port[nservices] = strtoul(optarg,0,0);
208            break;
209        case 's':
210            send_addr.sin_addr.s_addr = htonl(inet_network(optarg));
211            if (send_addr.sin_addr.s_addr == -1) {
212                fprintf(stderr,"Invalid subnet broadcast address");
213                return 1;
214            }
215            break;
216        default:
217            fprintf(stderr,"Don't know what option '%c'.\n", c);
218            return 1;
219        }
220    }
221    if (nservices == 0 ||
222        recv_port == -1 ||
223        server_command[0][0]=='\0') {
224        int i;
225        for (i = 0; i < argc; i++) {
226            fprintf(stderr, "argv[%d]=(%s)\n", i, argv[i]);
227        }
228        help(argv[0]);
229        return 1;
230    }
231
232    for(n = 0; n < nservices; n++) {
233        // Parse the command arguments...
234
235        command_argc[n]=0;
236        command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *));
237        command_argv[n][command_argc[n]] = strtok(server_command[n], " \t");
238        command_argc[n]++;
239        while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) {
240            command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *));
241            command_argc[n]++;
242        }
243
244        // Create a socket for listening.
245        listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
246        if (listen_fd[n] < 0) {
247            perror("socket");
248            exit(1);
249        }
250 
251        // If program is killed, drop the socket address reservation immediately.
252        val = 1;
253        status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val,
254                            sizeof(val));
255        if (status < 0) {
256            perror("setsockopt");
257            // Not fatal.  Keep on going.
258        }
259
260        // Bind this address to the socket.
261        listen_addr.sin_family = AF_INET;
262        listen_addr.sin_port = htons(listen_port[n]);
263        listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
264        status = bind(listen_fd[n], (struct sockaddr *)&listen_addr,
265                      sizeof(listen_addr));
266        if (status < 0) {
267            perror("bind");
268            exit(1);
269        }
270
271        // Listen on the specified port.
272        status = listen(listen_fd[n],5);
273        if (status < 0) {
274            perror("listen");
275        }
276    }
277
278    // Create a socket for broadcast.
279    send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
280    if (send_fd < 0) {
281        perror("socket");
282        exit(1);
283    }
284
285    // If program is killed, drop the socket address reservation immediately.
286    val = 1;
287    status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
288    if (status < 0) {
289        perror("setsockopt");
290        // Not fatal.  Keep on going.
291    }
292
293    // We're going to broadcast through this socket.
294    val = 1;
295    status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val));
296    if (status < 0) {
297        perror("setsockopt");
298        // Not fatal.  Keep on going.
299    }
300
301    // Bind this address to the socket.
302    recv_addr.sin_family = AF_INET;
303    recv_addr.sin_port = htons(recv_port);
304    recv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
305    status = bind(send_fd, (struct sockaddr *)&recv_addr,
306                  sizeof(recv_addr));
307    if (status < 0) {
308        perror("bind");
309        exit(1);
310    }
311
312    // Set up the address that we broadcast to.
313    send_addr.sin_family = AF_INET;
314    send_addr.sin_port = htons(recv_port);
315
316    // Set up a signal handler for the alarm interrupt.
317    // It doesn't do anything other than interrupt select() below.
318    if (signal(SIGALRM, sigalarm_handler) == SIG_ERR) {
319        perror("signal SIGALRM");
320    }
321
322    struct itimerval itvalue = {
323        {1, 0}, {1, 0}
324    };
325    status = setitimer(ITIMER_REAL, &itvalue, NULL);
326    if (status != 0) {
327        perror("setitimer");
328    }
329
330    // We're ready to go.  Before going into the main loop,
331    // broadcast a load announcement to other machines.
332    int maxfd = send_fd;
333    FD_ZERO(&saved_rfds);
334    FD_ZERO(&pipe_rfds);
335
336    for(n = 0; n < nservices; n++) {
337        FD_ZERO(&service_rfds[n]);
338        FD_SET(listen_fd[n], &saved_rfds);
339        if (listen_fd[n] > maxfd)
340            maxfd = listen_fd[n];
341    }
342
343    FD_SET(send_fd, &saved_rfds);
344
345    if (!debug_flag) {
346        if ( daemon(0,0) != 0 ) {
347            perror("daemon");
348            exit(1);
349        }
350    }
351
352    while(1) {
353        fd_set rfds = saved_rfds;
354     
355        status = select(maxfd+1, &rfds, NULL, NULL, 0);
356        if (status <= 0) {
357            if (sigalarm_set) {
358                sigalarm_set = 0;
359            }
360            continue;
361        }
362     
363        int accepted = 0;
364        for(n = 0; n < nservices; n++) {
365            if (FD_ISSET(listen_fd[n], &rfds)) {
366                // Accept a new connection.
367                struct sockaddr_in newaddr;
368                unsigned int addrlen = sizeof(newaddr);
369                int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen);
370                if (newfd < 0) {
371                    perror("accept");
372                    continue;
373                }
374             
375                printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
376                FD_SET(newfd, &saved_rfds);
377                maxfd = max(maxfd, newfd);
378                FD_SET(newfd, &service_rfds[n]);
379                accepted = 1;
380            }
381        }
382     
383        if (accepted)
384            continue;
385     
386        if (FD_ISSET(send_fd, &rfds)) {
387            int buffer[1000];
388            struct sockaddr_in peer_addr;
389            unsigned int len = sizeof(peer_addr);
390            status = recvfrom(send_fd, buffer, sizeof(buffer), 0,
391                              (struct sockaddr*)&peer_addr, &len);
392            if (status < 0) {
393                perror("recvfrom");
394                continue;
395            }
396            if (status != 8) {
397                fprintf(stderr,"Bogus message from %s\n",
398                        inet_ntoa(peer_addr.sin_addr));
399                continue;
400            }
401            float peer_load = ntohl(buffer[0]);
402            int peer_procs = ntohl(buffer[1]);
403            //printf("Load for %s is %f (%d processes).\n",
404            //       inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
405            int h;
406            int free_index=-1;
407            int found = 0;
408            for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
409                if (host_array[h].in_addr.s_addr == peer_addr.sin_addr.s_addr) {
410                    if (host_array[h].children != peer_procs) {
411                        printf("Load for %s is %f (%d processes).\n",
412                               inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
413                    }
414                    host_array[h].load = peer_load;
415                    host_array[h].children = peer_procs;
416                    found = 1;
417                    break;
418                }
419                if (host_array[h].in_addr.s_addr == 0 && free_index == -1) {
420                    free_index = h;
421                }
422            }
423            if (!found) {
424                host_array[free_index].in_addr.s_addr = peer_addr.sin_addr.s_addr;
425                host_array[free_index].load = peer_load;
426            }
427            continue;
428        }
429     
430        int i;
431        for(i=0; i< maxfd +1; i++) {
432            if (FD_ISSET(i,&rfds)) {
433             
434                // If this is a pipe, get the load.  Update.
435                if (FD_ISSET(i,&pipe_rfds)) {
436                    float value;
437                    status = read(i, &value, sizeof(value));
438                    if (status != 4) {
439                        //fprintf(stderr,"error reading pipe, child ended?\n");
440                        close_child(i);
441                        /*close(i);
442                          FD_CLR(i, &saved_rfds);
443                          FD_CLR(i, &pipe_rfds); */
444                    } else {
445                        note_request(i,value);
446                    }
447                    continue;
448                }
449             
450                // This must be a descriptor that we're waiting to from for
451                // the memory footprint.  Get it.
452                int msg;
453                status = read(i, &msg, 4);
454                if (status != 4) {
455                    fprintf(stderr,"Bad status on read (%d).", status);
456                    FD_CLR(i, &saved_rfds);
457                    clear_service_fd(i);
458                    close(i);
459                    continue;
460                }
461             
462                // find the new memory increment
463                int newmemory = ntohl(msg);
464             
465                memory_in_use += newmemory;
466                load += 2*INITIAL_LOAD;
467                printf("Accepted new job with memory %d\n", newmemory);
468                //printf("My load is now %f\n", load);
469             
470                // accept the connection.
471                msg = 0;
472                if (write(i, &msg, 4) != 4) {
473                    fprintf(stderr, "short write for hostname\n");
474                }
475             
476                int pair[2];
477                status = pipe(pair);
478                if (status != 0) {
479                    perror("pipe");
480                }
481             
482                // Make the child side of the pipe non-blocking...
483                status = fcntl(pair[1], F_SETFL, O_NONBLOCK);
484                if (status < 0) {
485                    perror("fcntl");
486                }
487                dispNum++;
488                if (dispNum >= maxCards) {
489                    dispNum = 0;
490                }
491                // Fork the new process.  Connect i/o to the new socket.
492                status = fork();
493                if (status < 0) {
494                    perror("fork");
495                } else if (status == 0) {
496                 
497                    for(n = 0; n < MAX_SERVICES; n++) {
498                        if (FD_ISSET(i, &service_rfds[n])) {
499                            int status = 0;
500
501                            if (!debug_flag) {
502                                // disassociate
503                                status = daemon(0,1);
504                            }
505                           
506                            if (status == 0) {
507                                int fd;
508
509                                dup2(i, 0);  // stdin
510                                dup2(i, 1);  // stdout
511                                dup2(i, 2);  // stderr
512                                dup2(pair[1],3);
513                                // read end of pipe moved, and left open to
514                                // prevent SIGPIPE
515                                dup2(pair[0],4);
516                             
517                                for(fd=5; fd<FD_SETSIZE; fd++)
518                                    close(fd);
519                             
520                                if (maxCards > 1) {
521                                    displayVar[11] = dispNum + '0';
522                                }
523                                execvp(command_argv[n][0], command_argv[n]);
524                            }
525                            _exit(errno);
526                        }
527                    }
528                    _exit(EINVAL);
529                 
530                } else {
531                    int c;
532                    // reap initial child which will exit immediately
533                    // (grandchild continues)
534                    waitpid(status, NULL, 0);
535                    for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) {
536                        if (child_array[c].pipefd == 0) {
537                            child_array[c].memory = newmemory;
538                            child_array[c].pipefd = pair[0];
539                            child_array[c].requests = INITIAL_LOAD;
540                            status = close(pair[1]);
541                            if (status != 0) {
542                                perror("close pair[1]");
543                            }
544                            FD_SET(pair[0], &saved_rfds);
545                            FD_SET(pair[0], &pipe_rfds);
546                            maxfd = max(pair[0], maxfd);
547                            break;
548                        }
549                    }
550                 
551                    children++;
552                }
553             
554             
555                FD_CLR(i, &saved_rfds);
556                clear_service_fd(i);
557                close(i);
558                break;
559            }
560         
561        } // for all connected_fds
562     
563    } // while(1)
564 
565}
566
Note: See TracBrowser for help on using the repository browser.