source: trunk/packages/vizservers/nanoscale/server.c @ 1278

Last change on this file since 1278 was 1278, checked in by gah, 13 years ago

vizservers/pymolproxy/pymolproxy.c

Added atomscale and projection commands.
Created buffered input layers between client-proxy, server-proxy.
Also created non-blocking buffered image layer between proxy-client.
This is a major update to the proxy. It should be more efficient.

vizservers/nanoscale/server.c

Removed broadcast feature. It's not being used. The server never
redirects the client to another host.

vizservers/start_viz.sh.in

Fixed sed script to replace the last octet of IP address with
broadcast 255.

vizservers/configure.in

Added test of Cg libraries and includes. Gentoo has moved them
from /usr/lib to /opt/nvidia-cg-toolkit/lib.

vizservers/nanovis/Makefile.in

Added locations of Cg libraries and headers.

File size: 16.0 KB
Line 
1#include <stdio.h>
2#include <unistd.h>
3#include <stdlib.h>
4#include <string.h>
5#include <signal.h>
6#include <sys/types.h>
7#include <sys/socket.h>
8#include <sys/wait.h>
9#include <sys/select.h>
10#include <sys/time.h>
11#include <arpa/inet.h>
12#include <fcntl.h>
13#include <netinet/in.h>
14#include <getopt.h>
15#include <errno.h>
16
17// The initial request load for a new renderer.
18#define INITIAL_LOAD 100000000.0
19
20// The factor that the load is divided by every second.
21#define LOAD_DROP_OFF 2.0
22
23// The broadcast interval (in seconds)
24#define BROADCAST_INTERVAL 5
25
26// The load of a remote machine must be less than this factor to
27// justify redirection.
28#define LOAD_REDIRECT_FACTOR 0.8
29
30// Maxium number of services we support
31#define MAX_SERVICES 100
32
33float load = 0;             // The present load average for this system.
34int memory_in_use = 0;      // Total memory in use by this system.
35int children = 0;           // Number of children running on this system.
36int send_fd;                // The file descriptor we broadcast through.
37struct sockaddr_in send_addr;  // The subnet address we broadcast to.
38fd_set saved_rfds;          // Descriptors we're reading from.
39fd_set pipe_rfds;           // Descriptors that are pipes to children.
40fd_set service_rfds[MAX_SERVICES];
41int maxCards = 1;
42int dispNum = -1;
43char displayVar[200];
44
45struct host_info {
46    struct in_addr in_addr;
47    float          load;
48    int            children;
49};
50
51struct child_info {
52    int   memory;
53    int   pipefd;
54    float requests;
55};
56
57struct host_info host_array[100];
58struct child_info child_array[100];
59
60/*
61 * min()/max() macros that also do
62 * strict type-checking.. See the
63 * "unnecessary" pointer comparison.
64 */
65#define min(x,y) ({                             \
66            typeof(x) _x = (x);                 \
67            typeof(y) _y = (y);                 \
68            (void) (&_x == &_y);                \
69            _x < _y ? _x : _y; })
70
71#define max(x,y) ({                             \
72            typeof(x) _x = (x);                 \
73            typeof(y) _y = (y);                 \
74            (void) (&_x == &_y);                \
75            _x > _y ? _x : _y; })
76
77static int
78find_best_host(void)
79{
80    int h;
81    float best = load;
82    int index = -1;
83    //printf("My load is %f\n", best);
84    for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
85        if (host_array[h].in_addr.s_addr == 0)
86            continue;
87        //printf("%d I think load for %s is %f   ", h,
88        //       inet_ntoa(host_array[h].in_addr), host_array[h].load);
89        if (host_array[h].children <= children) {
90            if (host_array[h].load < best) {
91                //if ((random() % 100) < 75) {
92                index = h;
93                best = host_array[h].load;
94                //}
95                //printf(" Better\n");
96            } else {
97                //printf(" Worse\n");
98            }
99        }
100    }
101
102    //printf("I choose %d\n", index);
103    return index;
104}
105
106static void
107broadcast_load(void)
108{
109    int msg[2];
110    msg[0] = htonl(load);
111    msg[1] = htonl(children);
112    int status;
113    status = sendto(send_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&send_addr,
114                    sizeof(send_addr));
115    if (status < 0) {
116        perror("sendto");
117    }
118}
119
120static void
121close_child(int pipe_fd)
122{
123    int i;
124    for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) {
125        if (child_array[i].pipefd == pipe_fd) {
126            children--;
127            memory_in_use -= child_array[i].memory;
128            child_array[i].memory = 0;
129            FD_CLR(child_array[i].pipefd, &saved_rfds);
130            FD_CLR(child_array[i].pipefd, &pipe_rfds);
131            close(child_array[i].pipefd);
132            child_array[i].pipefd = 0;
133            break;
134        }
135    }
136 
137    printf("processes=%d, memory=%d, load=%f\n",
138           children, memory_in_use, load);
139
140#ifdef notdef
141    broadcast_load();
142#endif
143}
144
145void note_request(int fd, float value)
146{
147    int c;
148    for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
149        if (child_array[c].pipefd == fd) {
150            child_array[c].requests += value;
151#ifdef DEBUGGING
152            printf("Updating requests from pipefd %d to %f\n",
153                   child_array[c].pipefd,
154                   child_array[c].requests);
155#endif
156            return;
157        }
158    }
159}
160
161static void
162update_load_average(void)
163{
164    static unsigned int counter;
165
166    load = load / LOAD_DROP_OFF;
167    float newload = 0.0;
168    int c;
169    for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {
170        if (child_array[c].pipefd != 0) {
171            newload += child_array[c].requests * child_array[c].memory;
172            child_array[c].requests = 0;
173        }
174    }
175    load = load + newload;
176
177    if ((counter++ % BROADCAST_INTERVAL) == 0) {
178        broadcast_load();
179    }
180}
181
182volatile int sigalarm_set;
183
184static void
185sigalarm_handler(int signum)
186{
187    sigalarm_set = 1;
188}
189
190static void
191help(const char *argv0)
192{
193    fprintf(stderr,
194            "Syntax: %s [-d] -b <broadcast port> -l <listen port> -s <subnet> -c 'command'\n",
195            argv0);
196    exit(1);
197}
198
199static void
200clear_service_fd(int fd)
201{
202    int n;
203
204    for(n = 0; n < MAX_SERVICES; n++) {
205        if (FD_ISSET(fd, &service_rfds[n]))
206            FD_CLR(fd, &service_rfds[n]);
207    }
208}
209
210int
211main(int argc, char *argv[])
212{
213    char server_command[MAX_SERVICES][1000];
214    int nservices = 0;
215    int command_argc[MAX_SERVICES];
216    char **command_argv[MAX_SERVICES];
217    int val;
218    int listen_fd[MAX_SERVICES];
219    int status;
220    struct sockaddr_in listen_addr;
221    struct sockaddr_in recv_addr;
222    int listen_port[MAX_SERVICES];
223    int recv_port = -1;
224    int debug_flag = 0;
225    int n;
226
227    listen_port[0] = -1;
228    server_command[0][0] = 0;
229
230    strcpy(displayVar, "DISPLAY=:0.0");
231    if (putenv(displayVar) < 0) {
232        perror("putenv");
233    }
234    while(1) {
235        int c;
236        int option_index = 0;
237        struct option long_options[] = {
238            // name, has_arg, flag, val
239            { 0,0,0,0 },
240        };
241
242        c = getopt_long(argc, argv, "+b:c:l:s:x:d", long_options,
243                        &option_index);
244        if (c == -1)
245            break;
246
247        switch(c) {
248        case 'x': /* Number of video cards */
249            maxCards = strtoul(optarg, 0, 0);
250            if ((maxCards < 1) || (maxCards > 10)) {
251                fprintf(stderr, "bad number of max videocards specified\n");
252                return 1;
253            }
254            break;
255        case 'd':
256            debug_flag = 1;
257            break;
258        case 'b':
259            recv_port = strtoul(optarg, 0, 0);
260            break;
261        case 'c':
262            strncpy(server_command[nservices], optarg, sizeof(server_command[0]));
263
264            if (listen_port[nservices] == -1) {
265                fprintf(stderr,"Must specify -l port before each -c command.\n");
266                return 1;
267            }
268
269            nservices++;
270            listen_port[nservices] = -1;
271            break;
272        case 'l':
273            listen_port[nservices] = strtoul(optarg,0,0);
274            break;
275        case 's':
276            send_addr.sin_addr.s_addr = htonl(inet_network(optarg));
277            if (send_addr.sin_addr.s_addr == -1) {
278                fprintf(stderr,"Invalid subnet broadcast address");
279                return 1;
280            }
281            break;
282        default:
283            fprintf(stderr,"Don't know what option '%c'.\n", c);
284            return 1;
285        }
286    }
287    if (nservices == 0 ||
288        recv_port == -1 ||
289        server_command[0][0]=='\0') {
290        int i;
291        fprintf(stderr, "nservices=%d, recv_port=%d, server_command[0]=%s\n", nservices, recv_port, server_command[0]);
292        for (i = 0; i < argc; i++) {
293            fprintf(stderr, "argv[%d]=(%s)\n", i, argv[i]);
294        }
295        help(argv[0]);
296        return 1;
297    }
298
299    for(n = 0; n < nservices; n++) {
300        // Parse the command arguments...
301
302        command_argc[n]=0;
303        command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *));
304        command_argv[n][command_argc[n]] = strtok(server_command[n], " \t");
305        command_argc[n]++;
306        while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) {
307            command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *));
308            command_argc[n]++;
309        }
310
311        // Create a socket for listening.
312        listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
313        if (listen_fd[n] < 0) {
314            perror("socket");
315            exit(1);
316        }
317 
318        // If program is killed, drop the socket address reservation immediately.
319        val = 1;
320        status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val,
321                            sizeof(val));
322        if (status < 0) {
323            perror("setsockopt");
324            // Not fatal.  Keep on going.
325        }
326
327        // Bind this address to the socket.
328        listen_addr.sin_family = AF_INET;
329        listen_addr.sin_port = htons(listen_port[n]);
330        listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
331        status = bind(listen_fd[n], (struct sockaddr *)&listen_addr,
332                      sizeof(listen_addr));
333        if (status < 0) {
334            perror("bind");
335            exit(1);
336        }
337
338        // Listen on the specified port.
339        status = listen(listen_fd[n],5);
340        if (status < 0) {
341            perror("listen");
342        }
343    }
344
345    // Create a socket for broadcast.
346    send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
347    if (send_fd < 0) {
348        perror("socket");
349        exit(1);
350    }
351
352    // If program is killed, drop the socket address reservation immediately.
353    val = 1;
354    status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
355    if (status < 0) {
356        perror("setsockopt");
357        // Not fatal.  Keep on going.
358    }
359
360    // We're going to broadcast through this socket.
361    val = 1;
362    status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val));
363    if (status < 0) {
364        perror("setsockopt");
365        // Not fatal.  Keep on going.
366    }
367
368    // Bind this address to the socket.
369    recv_addr.sin_family = AF_INET;
370    recv_addr.sin_port = htons(recv_port);
371    recv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
372    status = bind(send_fd, (struct sockaddr *)&recv_addr,
373                  sizeof(recv_addr));
374    if (status < 0) {
375        perror("bind");
376        exit(1);
377    }
378
379    // Set up the address that we broadcast to.
380    send_addr.sin_family = AF_INET;
381    send_addr.sin_port = htons(recv_port);
382
383    // Set up a signal handler for the alarm interrupt.
384    // It doesn't do anything other than interrupt select() below.
385    if (signal(SIGALRM, sigalarm_handler) == SIG_ERR) {
386        perror("signal SIGALRM");
387    }
388
389    struct itimerval itvalue = {
390        {1, 0}, {1, 0}
391    };
392    status = setitimer(ITIMER_REAL, &itvalue, NULL);
393    if (status != 0) {
394        perror("setitimer");
395    }
396
397    // We're ready to go.  Before going into the main loop,
398    // broadcast a load announcement to other machines.
399#ifdef notdef
400    broadcast_load();
401#endif
402    int maxfd = send_fd;
403    FD_ZERO(&saved_rfds);
404    FD_ZERO(&pipe_rfds);
405
406    for(n = 0; n < nservices; n++) {
407        FD_ZERO(&service_rfds[n]);
408        FD_SET(listen_fd[n], &saved_rfds);
409        if (listen_fd[n] > maxfd)
410            maxfd = listen_fd[n];
411    }
412
413    FD_SET(send_fd, &saved_rfds);
414
415    if (!debug_flag) {
416        if ( daemon(0,0) != 0 ) {
417            perror("daemon");
418            exit(1);
419        }
420    }
421
422    while(1) {
423        fd_set rfds = saved_rfds;
424     
425        status = select(maxfd+1, &rfds, NULL, NULL, 0);
426        if (status <= 0) {
427            if (sigalarm_set) {
428                update_load_average();
429                sigalarm_set = 0;
430            }
431            continue;
432        }
433     
434     
435        int accepted = 0;
436        for(n = 0; n < nservices; n++) {
437            if (FD_ISSET(listen_fd[n], &rfds)) {
438                // Accept a new connection.
439                struct sockaddr_in newaddr;
440                unsigned int addrlen = sizeof(newaddr);
441                int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen);
442                if (newfd < 0) {
443                    perror("accept");
444                    continue;
445                }
446             
447                printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr));
448                FD_SET(newfd, &saved_rfds);
449                maxfd = max(maxfd, newfd);
450                FD_SET(newfd, &service_rfds[n]);
451                accepted = 1;
452            }
453        }
454     
455        if (accepted)
456            continue;
457     
458        if (FD_ISSET(send_fd, &rfds)) {
459            int buffer[1000];
460            struct sockaddr_in peer_addr;
461            unsigned int len = sizeof(peer_addr);
462            status = recvfrom(send_fd, buffer, sizeof(buffer), 0,
463                              (struct sockaddr*)&peer_addr, &len);
464            if (status < 0) {
465                perror("recvfrom");
466                continue;
467            }
468            if (status != 8) {
469                fprintf(stderr,"Bogus message from %s\n",
470                        inet_ntoa(peer_addr.sin_addr));
471                continue;
472            }
473            float peer_load = ntohl(buffer[0]);
474            int peer_procs = ntohl(buffer[1]);
475            //printf("Load for %s is %f (%d processes).\n",
476            //       inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
477            int h;
478            int free_index=-1;
479            int found = 0;
480            for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) {
481                if (host_array[h].in_addr.s_addr == peer_addr.sin_addr.s_addr) {
482                    if (host_array[h].children != peer_procs) {
483                        printf("Load for %s is %f (%d processes).\n",
484                               inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs);
485                    }
486                    host_array[h].load = peer_load;
487                    host_array[h].children = peer_procs;
488                    found = 1;
489                    break;
490                }
491                if (host_array[h].in_addr.s_addr == 0 && free_index == -1) {
492                    free_index = h;
493                }
494            }
495            if (!found) {
496                host_array[free_index].in_addr.s_addr = peer_addr.sin_addr.s_addr;
497                host_array[free_index].load = peer_load;
498            }
499            continue;
500        }
501     
502        int i;
503        for(i=0; i< maxfd +1; i++) {
504            if (FD_ISSET(i,&rfds)) {
505             
506                // If this is a pipe, get the load.  Update.
507                if (FD_ISSET(i,&pipe_rfds)) {
508                    float value;
509                    status = read(i, &value, sizeof(value));
510                    if (status != 4) {
511                        //fprintf(stderr,"error reading pipe, child ended?\n");
512                        close_child(i);
513                        /*close(i);
514                          FD_CLR(i, &saved_rfds);
515                          FD_CLR(i, &pipe_rfds); */
516                    } else {
517                        note_request(i,value);
518                    }
519                    continue;
520                }
521             
522                // This must be a descriptor that we're waiting to from for
523                // the memory footprint.  Get it.
524                int msg;
525                status = read(i, &msg, 4);
526                if (status != 4) {
527                    fprintf(stderr,"Bad status on read (%d).", status);
528                    FD_CLR(i, &saved_rfds);
529                    clear_service_fd(i);
530                    close(i);
531                    continue;
532                }
533             
534                // find the new memory increment
535                int newmemory = ntohl(msg);
536             
537#ifdef notdef
538                // Find the best host to create a new child on.
539                int index = find_best_host();
540             
541                // Only redirect if another host's load is significantly less
542                // than our own...
543                if (index != -1 &&
544                    (host_array[index].load < (LOAD_REDIRECT_FACTOR * load))) {
545                 
546                    // If we're redirecting to another machine, give that
547                    // machine an extra boost in our copy of the load
548                    // statistics.  This will keep us from sending the very
549                    // next job to it.  Eventually, the other machine will
550                    // broadcast its real load and we can make an informed
551                    // decision as to who redirect to again.
552                    host_array[index].load += newmemory * INITIAL_LOAD;
553                 
554                    // Redirect to another machine.
555                    printf("Redirecting to %s\n",
556                           inet_ntoa(host_array[index].in_addr));
557                    write(i, &host_array[index].in_addr.s_addr, 4);
558                    FD_CLR(i, &saved_rfds);
559                    clear_service_fd(i);
560                    close(i);
561                    continue;
562                }
563#endif
564                memory_in_use += newmemory;
565                load += 2*INITIAL_LOAD;
566#ifdef notdef
567                broadcast_load();
568#endif
569                printf("Accepted new job with memory %d\n", newmemory);
570                //printf("My load is now %f\n", load);
571             
572                // accept the connection.
573                msg = 0;
574                write(i, &msg, 4);
575             
576                int pair[2];
577                status = pipe(pair);
578                if (status != 0) {
579                    perror("pipe");
580                }
581             
582                // Make the child side of the pipe non-blocking...
583                status = fcntl(pair[1], F_SETFL, O_NONBLOCK);
584                if (status < 0) {
585                    perror("fcntl");
586                }
587                dispNum++;
588                if (dispNum >= maxCards) {
589                    dispNum = 0;
590                }
591                // Fork the new process.  Connect i/o to the new socket.
592                status = fork();
593                if (status < 0) {
594                    perror("fork");
595                } else if (status == 0) {
596                 
597                    for(n = 0; n < MAX_SERVICES; n++) {
598                        if (FD_ISSET(i, &service_rfds[n])) {
599                            int status = 0;
600
601                            if (!debug_flag) {
602                                // disassociate
603                                status = daemon(0,1);
604                            }
605                           
606                            if (status == 0) {
607                                int fd;
608
609                                dup2(i, 0);  // stdin
610                                dup2(i, 1);  // stdout
611                                dup2(i, 2);  // stderr
612                                dup2(pair[1],3);
613                                // read end of pipe moved, and left open to
614                                // prevent SIGPIPE
615                                dup2(pair[0],4);
616                             
617                                for(fd=5; fd<FD_SETSIZE; fd++)
618                                    close(fd);
619                             
620                                if (maxCards > 1) {
621                                    displayVar[11] = dispNum + '0';
622                                }
623                                execvp(command_argv[n][0], command_argv[n]);
624                            }
625                            _exit(errno);
626                        }
627                    }
628                    _exit(EINVAL);
629                 
630                } else {
631                    int c;
632                    // reap initial child which will exit immediately
633                    // (grandchild continues)
634                    waitpid(status, NULL, 0);
635                    for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) {
636                        if (child_array[c].pipefd == 0) {
637                            child_array[c].memory = newmemory;
638                            child_array[c].pipefd = pair[0];
639                            child_array[c].requests = INITIAL_LOAD;
640                            status = close(pair[1]);
641                            if (status != 0) {
642                                perror("close pair[1]");
643                            }
644                            FD_SET(pair[0], &saved_rfds);
645                            FD_SET(pair[0], &pipe_rfds);
646                            maxfd = max(pair[0], maxfd);
647                            break;
648                        }
649                    }
650                 
651                    children++;
652                    broadcast_load();
653                }
654             
655             
656                FD_CLR(i, &saved_rfds);
657                clear_service_fd(i);
658                close(i);
659                break;
660            }
661         
662        } // for all connected_fds
663     
664    } // while(1)
665 
666}
667
Note: See TracBrowser for help on using the repository browser.