#include #include #include #include #include #include #include #include #include #include #include #include #include #include // The initial request load for a new renderer. #define INITIAL_LOAD 100000000.0 // The factor that the load is divided by every second. #define LOAD_DROP_OFF 2.0 // The broadcast interval (in seconds) #define BROADCAST_INTERVAL 5 // The load of a remote machine must be less than this factor to // justify redirection. #define LOAD_REDIRECT_FACTOR 0.8 // Maxium number of services we support #define MAX_SERVICES 100 float load = 0; // The present load average for this system. int memory_in_use = 0; // Total memory in use by this system. int children = 0; // Number of children running on this system. int send_fd; // The file descriptor we broadcast through. struct sockaddr_in send_addr; // The subnet address we broadcast to. fd_set saved_rfds; // Descriptors we're reading from. fd_set pipe_rfds; // Descriptors that are pipes to children. fd_set service_rfds[MAX_SERVICES]; struct host_info { struct in_addr in_addr; float load; int children; }; struct child_info { int memory; int pipefd; float requests; }; struct host_info host_array[100]; struct child_info child_array[100]; /* * min()/max() macros that also do * strict type-checking.. See the * "unnecessary" pointer comparison. */ #define min(x,y) ({ \ typeof(x) _x = (x); \ typeof(y) _y = (y); \ (void) (&_x == &_y); \ _x < _y ? _x : _y; }) #define max(x,y) ({ \ typeof(x) _x = (x); \ typeof(y) _y = (y); \ (void) (&_x == &_y); \ _x > _y ? _x : _y; }) int find_best_host(void) { int h; float best = load; int index = -1; //printf("My load is %f\n", best); for(h=0; h -l -s -c 'command'\n", argv0); exit(1); } int clear_service_fd(int fd) { int n; for(n = 0; n < MAX_SERVICES; n++) { if (FD_ISSET(fd, &service_rfds[n])) FD_CLR(fd, &service_rfds[n]); } } int main(int argc, char *argv[]) { char server_command[MAX_SERVICES][1000]; int nservices = 0; int command_argc[MAX_SERVICES]; char **command_argv[MAX_SERVICES]; int val; int listen_fd[MAX_SERVICES]; int status; struct sockaddr_in listen_addr; struct sockaddr_in recv_addr; int listen_port[MAX_SERVICES]; int recv_port = -1; int connected_fds[10] = {0}; int subnet_addr; int debug_flag = 0; int n; listen_port[0] = -1; server_command[0][0] = 0; while(1) { int c; int this_option_optind = optind ? optind : 1; int option_index = 0; struct option long_options[] = { // name, has_arg, flag, val { 0,0,0,0 }, }; c = getopt_long(argc, argv, "+b:c:l:s:d", long_options, &option_index); if (c == -1) break; switch(c) { case 'd': debug_flag = 1; break; case 'b': recv_port = strtoul(optarg,0,0); break; case 'c': strncpy(server_command[nservices], optarg, sizeof(server_command[0])); if (listen_port[nservices] == -1) { fprintf(stderr,"Must specify -l port before each -c command.\n"); return 1; } nservices++; listen_port[nservices] = -1; break; case 'l': listen_port[nservices] = strtoul(optarg,0,0); break; case 's': send_addr.sin_addr.s_addr = htonl(inet_network(optarg, &send_addr.sin_addr)); if (send_addr.sin_addr.s_addr == -1) { fprintf(stderr,"Invalid subnet broadcast address"); return 1; } break; default: fprintf(stderr,"Don't know what option '%c'.\n", c); return 1; } } if (nservices == 0 || recv_port == -1 || subnet_addr == -1 || server_command[0][0]=='\0') { help(argv[0]); return 1; } for(n = 0; n < nservices; n++) { // Parse the command arguments... command_argc[n]=0; command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *)); command_argv[n][command_argc[n]] = strtok(server_command[n], " \t"); command_argc[n]++; while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) { command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *)); command_argc[n]++; } // Create a socket for listening. listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (listen_fd[n] < 0) { perror("socket"); exit(1); } // If program is killed, drop the socket address reservation immediately. val = 1; status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); if (status < 0) { perror("setsockopt"); // Not fatal. Keep on going. } // Bind this address to the socket. listen_addr.sin_family = AF_INET; listen_addr.sin_port = htons(listen_port[n]); listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); status = bind(listen_fd[n], (struct sockaddr *)&listen_addr, sizeof(listen_addr)); if (status < 0) { perror("bind"); exit(1); } // Listen on the specified port. status = listen(listen_fd[n],5); if (status < 0) { perror("listen"); } } // Create a socket for broadcast. send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (send_fd < 0) { perror("socket"); exit(1); } // If program is killed, drop the socket address reservation immediately. val = 1; status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); if (status < 0) { perror("setsockopt"); // Not fatal. Keep on going. } // We're going to broadcast through this socket. val = 1; status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val)); if (status < 0) { perror("setsockopt"); // Not fatal. Keep on going. } // Bind this address to the socket. recv_addr.sin_family = AF_INET; recv_addr.sin_port = htons(recv_port); recv_addr.sin_addr.s_addr = htonl(INADDR_ANY); status = bind(send_fd, (struct sockaddr *)&recv_addr, sizeof(recv_addr)); if (status < 0) { perror("bind"); exit(1); } // Set up the address that we broadcast to. send_addr.sin_family = AF_INET; send_addr.sin_port = htons(recv_port); // Set up a signal handler for the alarm interrupt. // It doesn't do anything other than interrupt select() below. if (signal(SIGALRM,sigalarm_handler) == SIG_ERR) { perror("signal SIGALRM"); } struct itimerval itvalue = { {1, 0}, {1, 0} }; status = setitimer(ITIMER_REAL, &itvalue, NULL); if (status != 0) { perror("setitimer"); } // We're ready to go. Before going into the main loop, // broadcast a load announcement to other machines. broadcast_load(); int maxfd = send_fd; FD_ZERO(&saved_rfds); FD_ZERO(&pipe_rfds); for(n = 0; n < nservices; n++) { FD_ZERO(&service_rfds[n]); FD_SET(listen_fd[n], &saved_rfds); if (listen_fd[n] > maxfd) maxfd = listen_fd[n]; } FD_SET(send_fd, &saved_rfds); if (debug_flag == 0) { if ( daemon(0,0) != 0 ) { perror("daemon"); exit(1); } } while(1) { fd_set rfds = saved_rfds; status = select(maxfd+1, &rfds, NULL, NULL, 0); if (status <= 0) { if (sigalarm_set) { update_load_average(); sigalarm_set = 0; } continue; } int accepted = 0; for(n = 0; n < nservices; n++) { if (FD_ISSET(listen_fd[n], &rfds)) { // Accept a new connection. struct sockaddr_in newaddr; unsigned int addrlen = sizeof(newaddr); int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen); if (newfd < 0) { perror("accept"); continue; } printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr)); FD_SET(newfd, &saved_rfds); maxfd = max(maxfd, newfd); FD_SET(newfd, &service_rfds[n]); accepted = 1; } } if (accepted) continue; if (FD_ISSET(send_fd, &rfds)) { int buffer[1000]; struct sockaddr_in peer_addr; unsigned int len = sizeof(peer_addr); status = recvfrom(send_fd, buffer, sizeof(buffer), 0, (struct sockaddr*)&peer_addr, &len); if (status < 0) { perror("recvfrom"); continue; } if (status != 8) { fprintf(stderr,"Bogus message from %s\n", inet_ntoa(peer_addr.sin_addr)); continue; } float peer_load = ntohl(buffer[0]); int peer_procs = ntohl(buffer[1]); //printf("Load for %s is %f (%d processes).\n", // inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs); int h; int free_index=-1; int found = 0; for(h=0; h