Changeset 715 for trunk/gui/vizservers/nanoscale/server.c
- Timestamp:
- May 8, 2007, 1:55:00 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/gui/vizservers/nanoscale/server.c
r409 r715 12 12 #include <netinet/in.h> 13 13 #include <getopt.h> 14 #include <errno.h> 14 15 15 16 // The initial request load for a new renderer. … … 25 26 // justify redirection. 26 27 #define LOAD_REDIRECT_FACTOR 0.8 28 29 // Maxium number of services we support 30 #define MAX_SERVICES 100 27 31 28 32 float load = 0; // The present load average for this system. … … 33 37 fd_set saved_rfds; // Descriptors we're reading from. 34 38 fd_set pipe_rfds; // Descriptors that are pipes to children. 35 39 fd_set service_rfds[MAX_SERVICES]; 36 40 37 41 struct host_info { … … 42 46 43 47 struct child_info { 44 pid_t pid;45 48 int memory; 46 49 int pipefd; … … 113 116 } 114 117 115 void check_children(void) 116 { 117 int status; 118 119 // Collect any children that have exited. 120 do { 121 int return_status; 122 status = waitpid(0, &return_status, WNOHANG); 123 if (status > 0) { 124 children--; 125 } 118 close_child(int pipe_fd) 119 { 126 120 int i; 127 121 for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) { 128 if (child_array[i].pid == status) { 122 if (child_array[i].pipefd == pipe_fd) { 123 children--; 129 124 memory_in_use -= child_array[i].memory; 130 child_array[i].pid = 0;131 125 child_array[i].memory = 0; 132 status = close(child_array[i].pipefd);133 126 FD_CLR(child_array[i].pipefd, &saved_rfds); 134 127 FD_CLR(child_array[i].pipefd, &pipe_rfds); 128 close(child_array[i].pipefd); 129 child_array[i].pipefd = 0; 135 130 break; 136 137 138 } while(status > 0);139 printf("processes=%d, memory=%d, load=%f\n",131 } 132 } 133 134 printf("processes=%d, memory=%d, load=%f\n", 140 135 children, memory_in_use, load); 141 136 142 broadcast_load();137 broadcast_load(); 143 138 } 144 139 … … 150 145 child_array[c].requests += value; 151 146 #ifdef DEBUGGING 152 printf("Updating requests f or pid %d to %f\n",153 child_array[c].pi d,147 printf("Updating requests from pipefd %d to %f\n", 148 child_array[c].pipefd, 154 149 child_array[c].requests); 155 150 #endif … … 167 162 int c; 168 163 for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) { 169 if (child_array[c].pi d != 0) {164 if (child_array[c].pipefd != 0) { 170 165 newload += child_array[c].requests * child_array[c].memory; 171 166 child_array[c].requests = 0; … … 180 175 181 176 volatile int sigalarm_set; 182 volatile int sigchild_set;183 177 void sigalarm_handler(int signum) 184 178 { 185 179 sigalarm_set = 1; 186 }187 void sigchild_handler(int signum)188 {189 sigchild_set = 1;190 180 } 191 181 … … 198 188 } 199 189 190 int 191 clear_service_fd(int fd) 192 { 193 int n; 194 195 for(n = 0; n < MAX_SERVICES; n++) 196 { 197 if (FD_ISSET(fd, &service_rfds[n])) 198 FD_CLR(fd, &service_rfds[n]); 199 } 200 } 201 200 202 int main(int argc, char *argv[]) 201 203 { 202 char server_command[1000]; 203 int command_argc; 204 char **command_argv; 204 char server_command[MAX_SERVICES][1000]; 205 int nservices = 0; 206 int command_argc[MAX_SERVICES]; 207 char **command_argv[MAX_SERVICES]; 205 208 int val; 206 int listen_fd = -1;209 int listen_fd[MAX_SERVICES]; 207 210 int status; 208 211 struct sockaddr_in listen_addr; 209 212 struct sockaddr_in recv_addr; 210 int listen_port = -1;213 int listen_port[MAX_SERVICES]; 211 214 int recv_port = -1; 212 215 int connected_fds[10] = {0}; 213 216 int subnet_addr; 217 int n; 218 219 listen_port[0] = -1; 220 server_command[0][0] = 0; 214 221 215 222 while(1) { … … 231 238 break; 232 239 case 'c': 233 strncpy(server_command, optarg, sizeof(server_command)); 240 strncpy(server_command[nservices], optarg, sizeof(server_command[0])); 241 242 if (listen_port[nservices] == -1) { 243 fprintf(stderr,"Must specify -l port before each -c command.\n"); 244 return 1; 245 } 246 247 nservices++; 248 listen_port[nservices] = -1; 234 249 break; 235 250 case 'l': 236 listen_port = strtoul(optarg,0,0);251 listen_port[nservices] = strtoul(optarg,0,0); 237 252 break; 238 253 case 's': … … 250 265 } 251 266 252 if ( listen_port == -1||267 if (nservices == 0 || 253 268 recv_port == -1 || 254 269 subnet_addr == -1 || 255 server_command[0] =='\0') {270 server_command[0][0]=='\0') { 256 271 help(argv[0]); 257 272 return 1; 258 273 } 259 274 260 // Parse the command arguments... 261 command_argc=0; 262 command_argv = malloc((command_argc+2) * sizeof(char *)); 263 command_argv[command_argc] = strtok(server_command, " \t"); 264 command_argc++; 265 while( (command_argv[command_argc] = strtok(NULL, " \t"))) { 266 command_argv = realloc(command_argv, (command_argc+2) * sizeof(char *)); 267 command_argc++; 268 } 269 270 // Create a socket for listening. 271 listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 272 if (listen_fd < 0) { 273 perror("socket"); 274 exit(1); 275 } 276 277 // If program is killed, drop the socket address reservation immediately. 278 val = 1; 279 status = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 280 if (status < 0) { 281 perror("setsockopt"); 282 // Not fatal. Keep on going. 283 } 284 285 // Bind this address to the socket. 286 listen_addr.sin_family = AF_INET; 287 listen_addr.sin_port = htons(listen_port); 288 listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); 289 status = bind(listen_fd, (struct sockaddr *)&listen_addr, 275 for(n = 0; n < nservices; n++) { 276 // Parse the command arguments... 277 278 command_argc[n]=0; 279 command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *)); 280 command_argv[n][command_argc[n]] = strtok(server_command[n], " \t"); 281 command_argc[n]++; 282 while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) { 283 command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *)); 284 command_argc[n]++; 285 } 286 287 // Create a socket for listening. 288 listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 289 if (listen_fd[n] < 0) { 290 perror("socket"); 291 exit(1); 292 } 293 294 // If program is killed, drop the socket address reservation immediately. 295 val = 1; 296 status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 297 if (status < 0) { 298 perror("setsockopt"); 299 // Not fatal. Keep on going. 300 } 301 302 // Bind this address to the socket. 303 listen_addr.sin_family = AF_INET; 304 listen_addr.sin_port = htons(listen_port[n]); 305 listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); 306 status = bind(listen_fd[n], (struct sockaddr *)&listen_addr, 290 307 sizeof(listen_addr)); 291 if (status < 0) { 292 perror("bind"); 293 exit(1); 294 } 295 296 // Listen on the specified port. 297 status = listen(listen_fd,5); 298 if (status < 0) { 299 perror("listen"); 308 if (status < 0) { 309 perror("bind"); 310 exit(1); 311 } 312 313 // Listen on the specified port. 314 status = listen(listen_fd[n],5); 315 if (status < 0) { 316 perror("listen"); 317 } 300 318 } 301 319 … … 338 356 send_addr.sin_port = htons(recv_port); 339 357 340 // Set up a signal handler for exiting children.341 // It doesn't do anything other than interrupt select() below.342 if (signal(SIGCHLD,sigchild_handler) == SIG_ERR) {343 perror("signal SIGCHLD");344 }345 346 358 // Set up a signal handler for the alarm interrupt. 347 359 // It doesn't do anything other than interrupt select() below. 348 360 if (signal(SIGALRM,sigalarm_handler) == SIG_ERR) { 349 perror("signal SIG CHLD");361 perror("signal SIGALRM"); 350 362 } 351 363 … … 359 371 360 372 // We're ready to go. Before going into the main loop, 361 // do a check_children to broadcast a load announcement to 362 // other machines. 363 check_children(); 364 365 366 int maxfd = max(listen_fd,send_fd); 373 // broadcast a load announcement to other machines. 374 broadcast_load(); 375 376 int maxfd = send_fd; 367 377 FD_ZERO(&saved_rfds); 368 378 FD_ZERO(&pipe_rfds); 369 FD_SET(listen_fd, &saved_rfds); 379 380 for(n = 0; n < nservices; n++) { 381 FD_ZERO(&service_rfds[n]); 382 FD_SET(listen_fd[n], &saved_rfds); 383 if (listen_fd[n] > maxfd) 384 maxfd = listen_fd[n]; 385 } 386 370 387 FD_SET(send_fd, &saved_rfds); 371 388 while(1) { … … 379 396 sigalarm_set = 0; 380 397 } 381 if (sigchild_set) {382 check_children();383 sigchild_set = 0;384 }385 398 continue; 386 399 } 387 400 388 if (FD_ISSET(listen_fd, &rfds)) { 389 // Accept a new connection. 390 struct sockaddr_in newaddr; 391 unsigned int addrlen = sizeof(newaddr); 392 int newfd = accept(listen_fd, (struct sockaddr *)&newaddr, &addrlen); 393 if (newfd < 0) { 394 perror("accept"); 395 continue; 396 } 397 398 printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr)); 399 FD_SET(newfd, &saved_rfds); 400 maxfd = max(maxfd, newfd); 401 continue; 402 } 401 402 int accepted = 0; 403 for(n = 0; n < nservices; n++) { 404 if (FD_ISSET(listen_fd[n], &rfds)) { 405 // Accept a new connection. 406 struct sockaddr_in newaddr; 407 unsigned int addrlen = sizeof(newaddr); 408 int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen); 409 if (newfd < 0) { 410 perror("accept"); 411 continue; 412 } 413 414 printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr)); 415 FD_SET(newfd, &saved_rfds); 416 maxfd = max(maxfd, newfd); 417 FD_SET(newfd, &service_rfds[n]); 418 accepted = 1; 419 } 420 } 421 422 if (accepted) 423 continue; 403 424 404 425 if (FD_ISSET(send_fd, &rfds)) { … … 455 476 status = read(i, &value, sizeof(value)); 456 477 if (status != 4) { 457 close(i); 478 //fprintf(stderr,"error reading pipe, child ended?\n"); 479 close_child(i); 480 /*close(i); 458 481 FD_CLR(i, &saved_rfds); 459 FD_CLR(i, &pipe_rfds); 482 FD_CLR(i, &pipe_rfds); */ 460 483 } else { 461 484 note_request(i,value); … … 471 494 fprintf(stderr,"Bad status on read (%d).", status); 472 495 FD_CLR(i, &saved_rfds); 473 close(i); 496 clear_service_fd(i); 497 close(i); 474 498 continue; 475 499 } … … 498 522 write(i, &host_array[index].in_addr.s_addr, 4); 499 523 FD_CLR(i, &saved_rfds); 524 clear_service_fd(i); 500 525 close(i); 501 526 continue; … … 529 554 perror("fork"); 530 555 } else if (status == 0) { 531 dup2(i,0); // stdin 532 dup2(i,1); // stdout 533 dup2(i,2); // stderr 534 dup2(pair[1],3); 535 int fd; 536 for(fd=4; fd<FD_SETSIZE; fd++) { 537 close(fd); 538 } 539 execvp(command_argv[0], command_argv); 556 557 for(n = 0; n < MAX_SERVICES; n++) { 558 if (FD_ISSET(i, &service_rfds[n])) { 559 560 // disassociate 561 if ( daemon(0,1) == 0 ) { 562 int fd; 563 564 dup2(i,0); // stdin 565 dup2(i,1); // stdout 566 dup2(i,2); // stderr 567 dup2(pair[1],3); 568 569 for(fd=4; fd<FD_SETSIZE; fd++) 570 close(fd); 571 572 execvp(command_argv[n][0], command_argv[n]); 573 } 574 _exit(errno); 575 } 576 } 577 _exit(EINVAL); 578 540 579 } else { 541 580 int c; 581 // reap initial child which will exit immediately (grandchild continues) 582 waitpid(status, NULL, 0); 542 583 for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) { 543 if (child_array[c].pid == 0) { 544 child_array[c].pid = status; 584 if (child_array[c].pipefd == 0) { 545 585 child_array[c].memory = newmemory; 546 586 child_array[c].pipefd = pair[0]; … … 558 598 559 599 children++; 560 check_children();600 broadcast_load(); 561 601 } 562 602 563 603 564 604 FD_CLR(i, &saved_rfds); 605 clear_service_fd(i); 565 606 close(i); 566 607 break;
Note: See TracChangeset
for help on using the changeset viewer.