Changeset 1029
- Timestamp:
- Jun 9, 2008 9:40:23 AM (15 years ago)
- Location:
- trunk
- Files:
-
- 4 deleted
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lang/Makefile.in
r1018 r1029 60 60 $(MAKE) -C ruby all 61 61 62 install_python: build_python62 install_python: 63 63 $(MAKE) -C python install 64 install_perl: build_perl64 install_perl: 65 65 $(MAKE) -C perl install 66 install_tcl: build_tcl66 install_tcl: 67 67 $(MAKE) -C tcl install 68 install_matlab: build_matlab68 install_matlab: 69 69 $(MAKE) -C matlab install 70 install_octave: build_octave70 install_octave: 71 71 $(MAKE) -C octave install 72 install_ruby: build_ruby72 install_ruby: 73 73 $(MAKE) -C ruby install 74 74 -
trunk/lang/python/Makefile.in
r1018 r1029 33 33 $(INSTALL_DATA) $(srcdir)/Rappture/*.cc build/tmp 34 34 35 install: all35 install: 36 36 $(PYTHON) setup.py install --prefix=$(prefix) 37 37 -
trunk/lang/tcl/scripts/Makefile.in
r1018 r1029 50 50 install: tclIndex 51 51 $(INSTALL) -d $(destdir) 52 @for i in $(FILES); do \53 $(INSTALL) -m 444 $ (srcdir)/$$i $(destdir) ; \52 for i in $(FILES); do \ 53 $(INSTALL) -m 444 $$i $(destdir) ; \ 54 54 done 55 55 $(INSTALL) -m 0444 tclIndex $(destdir) -
trunk/packages/vizservers/nanoscale/Makefile.in
r952 r1029 1 1 TARGETS = client mycat nanoscale 2 2 3 CC = @CC@ 4 CFLAGS = @CFLAGS@ 3 CC = @CC@ 4 CFLAGS = @CFLAGS@ 5 EXTRA_CFLAGS = -Wall 6 DEFINES = @DEFINES@ 7 CC_SWITCHES = $(CFLAGS) $(EXTRA_CFLAGS) $(INCLUDES) 5 8 LDFLAGS = @LDFLAGS@ 6 9 … … 22 25 INSTALL_SCRIPT = ${INSTALL} -m 444 23 26 27 SERVER_OBJS = server.o 28 CLIENT_OBJS = client.o clientlib.o 29 24 30 .PHONY: all install clean distclean 25 31 26 32 all: $(TARGETS) 27 33 28 nanoscale: server.o29 $(CC) $(C FLAGS) $(LDFLAGS) server.o -o nanoscale34 nanoscale: $(SERVER_OBJS) 35 $(CC) $(CC_SWITCHES) -o $@ $^ $(LIBS) 30 36 31 37 install: nanoscale 32 38 $(INSTALL_PROGRAM) -D nanoscale $(bindir)/nanoscale 33 39 34 client: client.o clientlib.o 40 client: $(CLIENT_OBJS) 41 42 .c.o: 43 $(CC) $(CC_SWITCHES) -o $@ -c $< 35 44 36 45 clean: 37 rm -rf a.out *.o*~ core* $(TARGETS) *.log *.tmp logfile* .deps/*.d46 rm -rf a.out $(SERVER_OBJS) *~ core* $(TARGETS) *.log *.tmp logfile* .deps/*.d 38 47 39 48 distclean: clean -
trunk/packages/vizservers/nanoscale/server.c
r722 r1029 9 9 #include <sys/select.h> 10 10 #include <sys/time.h> 11 #include <arpa/inet.h> 11 12 #include <fcntl.h> 12 13 #include <netinet/in.h> … … 40 41 41 42 struct host_info { 42 struct in_addr in_addr;43 float load;44 int children;43 struct in_addr in_addr; 44 float load; 45 int children; 45 46 }; 46 47 47 48 struct child_info { 48 int memory;49 int pipefd;50 float requests;49 int memory; 50 int pipefd; 51 float requests; 51 52 }; 52 53 … … 62 63 * "unnecessary" pointer comparison. 63 64 */ 64 #define min(x,y) ({ \ 65 typeof(x) _x = (x); \ 66 typeof(y) _y = (y); \ 67 (void) (&_x == &_y); \ 68 _x < _y ? _x : _y; }) 69 70 #define max(x,y) ({ \ 71 typeof(x) _x = (x); \ 72 typeof(y) _y = (y); \ 73 (void) (&_x == &_y); \ 74 _x > _y ? _x : _y; }) 75 76 int find_best_host(void) 77 { 78 int h; 79 float best = load; 80 int index = -1; 81 //printf("My load is %f\n", best); 82 for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) { 83 if (host_array[h].in_addr.s_addr == 0) 84 continue; 85 //printf("%d I think load for %s is %f ", h, 86 // inet_ntoa(host_array[h].in_addr), host_array[h].load); 87 if (host_array[h].children <= children) { 88 if (host_array[h].load < best) { 89 //if ((random() % 100) < 75) { 90 index = h; 91 best = host_array[h].load; 92 //} 93 //printf(" Better\n"); 94 } else { 95 //printf(" Worse\n"); 96 } 97 } 98 } 99 100 //printf("I choose %d\n", index); 101 return index; 102 } 103 104 105 void broadcast_load(void) 106 { 107 int msg[2]; 108 msg[0] = htonl(load); 109 msg[1] = htonl(children); 110 int status; 111 status = sendto(send_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&send_addr, 112 sizeof(send_addr)); 113 if (status < 0) { 114 perror("sendto"); 115 } 116 } 117 65 #define min(x,y) ({ \ 66 typeof(x) _x = (x); \ 67 typeof(y) _y = (y); \ 68 (void) (&_x == &_y); \ 69 _x < _y ? _x : _y; }) 70 71 #define max(x,y) ({ \ 72 typeof(x) _x = (x); \ 73 typeof(y) _y = (y); \ 74 (void) (&_x == &_y); \ 75 _x > _y ? _x : _y; }) 76 77 static int 78 find_best_host(void) 79 { 80 int h; 81 float best = load; 82 int index = -1; 83 //printf("My load is %f\n", best); 84 for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) { 85 if (host_array[h].in_addr.s_addr == 0) 86 continue; 87 //printf("%d I think load for %s is %f ", h, 88 // inet_ntoa(host_array[h].in_addr), host_array[h].load); 89 if (host_array[h].children <= children) { 90 if (host_array[h].load < best) { 91 //if ((random() % 100) < 75) { 92 index = h; 93 best = host_array[h].load; 94 //} 95 //printf(" Better\n"); 96 } else { 97 //printf(" Worse\n"); 98 } 99 } 100 } 101 102 //printf("I choose %d\n", index); 103 return index; 104 } 105 106 107 static void 108 broadcast_load(void) 109 { 110 int msg[2]; 111 msg[0] = htonl(load); 112 msg[1] = htonl(children); 113 int status; 114 status = sendto(send_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&send_addr, 115 sizeof(send_addr)); 116 if (status < 0) { 117 perror("sendto"); 118 } 119 } 120 121 static void 118 122 close_child(int pipe_fd) 119 123 { 120 124 int i; 121 125 for(i=0; i<sizeof(child_array)/sizeof(child_array[0]); i++) { 122 123 124 125 126 127 128 129 130 131 132 126 if (child_array[i].pipefd == pipe_fd) { 127 children--; 128 memory_in_use -= child_array[i].memory; 129 child_array[i].memory = 0; 130 FD_CLR(child_array[i].pipefd, &saved_rfds); 131 FD_CLR(child_array[i].pipefd, &pipe_rfds); 132 close(child_array[i].pipefd); 133 child_array[i].pipefd = 0; 134 break; 135 } 136 } 133 137 134 138 printf("processes=%d, memory=%d, load=%f\n", 135 139 children, memory_in_use, load); 136 140 137 141 broadcast_load(); … … 140 144 void note_request(int fd, float value) 141 145 { 142 int c;143 for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) {144 145 146 int c; 147 for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) { 148 if (child_array[c].pipefd == fd) { 149 child_array[c].requests += value; 146 150 #ifdef DEBUGGING 147 148 149 151 printf("Updating requests from pipefd %d to %f\n", 152 child_array[c].pipefd, 153 child_array[c].requests); 150 154 #endif 151 return; 152 } 153 } 154 } 155 156 void update_load_average(void) 157 { 158 static unsigned int counter; 159 160 load = load / LOAD_DROP_OFF; 161 float newload = 0.0; 162 int c; 163 for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) { 164 if (child_array[c].pipefd != 0) { 165 newload += child_array[c].requests * child_array[c].memory; 166 child_array[c].requests = 0; 167 } 168 } 169 load = load + newload; 170 171 if ((counter++ % BROADCAST_INTERVAL) == 0) { 155 return; 156 } 157 } 158 } 159 160 static void 161 update_load_average(void) 162 { 163 static unsigned int counter; 164 165 load = load / LOAD_DROP_OFF; 166 float newload = 0.0; 167 int c; 168 for(c=0; c < sizeof(child_array)/sizeof(child_array[0]); c++) { 169 if (child_array[c].pipefd != 0) { 170 newload += child_array[c].requests * child_array[c].memory; 171 child_array[c].requests = 0; 172 } 173 } 174 load = load + newload; 175 176 if ((counter++ % BROADCAST_INTERVAL) == 0) { 177 broadcast_load(); 178 } 179 } 180 181 volatile int sigalarm_set; 182 183 static void 184 sigalarm_handler(int signum) 185 { 186 sigalarm_set = 1; 187 } 188 189 static void 190 help(const char *argv0) 191 { 192 fprintf(stderr, 193 "Syntax: %s [-d] -b <broadcast port> -l <listen port> -s <subnet> -c 'command'\n", 194 argv0); 195 exit(1); 196 } 197 198 static void 199 clear_service_fd(int fd) 200 { 201 int n; 202 203 for(n = 0; n < MAX_SERVICES; n++) { 204 if (FD_ISSET(fd, &service_rfds[n])) 205 FD_CLR(fd, &service_rfds[n]); 206 } 207 } 208 209 int 210 main(int argc, char *argv[]) 211 { 212 char server_command[MAX_SERVICES][1000]; 213 int nservices = 0; 214 int command_argc[MAX_SERVICES]; 215 char **command_argv[MAX_SERVICES]; 216 int val; 217 int listen_fd[MAX_SERVICES]; 218 int status; 219 struct sockaddr_in listen_addr; 220 struct sockaddr_in recv_addr; 221 int listen_port[MAX_SERVICES]; 222 int recv_port = -1; 223 int debug_flag = 0; 224 int n; 225 226 listen_port[0] = -1; 227 server_command[0][0] = 0; 228 229 while(1) { 230 int c; 231 int option_index = 0; 232 struct option long_options[] = { 233 // name, has_arg, flag, val 234 { 0,0,0,0 }, 235 }; 236 237 c = getopt_long(argc, argv, "+b:c:l:s:d", long_options, &option_index); 238 if (c == -1) 239 break; 240 241 switch(c) { 242 case 'd': 243 debug_flag = 1; 244 break; 245 case 'b': 246 recv_port = strtoul(optarg,0,0); 247 break; 248 case 'c': 249 strncpy(server_command[nservices], optarg, sizeof(server_command[0])); 250 251 if (listen_port[nservices] == -1) { 252 fprintf(stderr,"Must specify -l port before each -c command.\n"); 253 return 1; 254 } 255 256 nservices++; 257 listen_port[nservices] = -1; 258 break; 259 case 'l': 260 listen_port[nservices] = strtoul(optarg,0,0); 261 break; 262 case 's': 263 send_addr.sin_addr.s_addr = htonl(inet_network(optarg)); 264 if (send_addr.sin_addr.s_addr == -1) { 265 fprintf(stderr,"Invalid subnet broadcast address"); 266 return 1; 267 } 268 break; 269 default: 270 fprintf(stderr,"Don't know what option '%c'.\n", c); 271 return 1; 272 } 273 } 274 if (nservices == 0 || 275 recv_port == -1 || 276 server_command[0][0]=='\0') { 277 int i; 278 fprintf(stderr, "nservices=%d, recv_port=%d, server_command[0]=%s\n", nservices, recv_port, server_command[0]); 279 for (i = 0; i < argc; i++) { 280 fprintf(stderr, "argv[%d]=(%s)\n", i, argv[i]); 281 } 282 help(argv[0]); 283 return 1; 284 } 285 286 for(n = 0; n < nservices; n++) { 287 // Parse the command arguments... 288 289 command_argc[n]=0; 290 command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *)); 291 command_argv[n][command_argc[n]] = strtok(server_command[n], " \t"); 292 command_argc[n]++; 293 while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) { 294 command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *)); 295 command_argc[n]++; 296 } 297 298 // Create a socket for listening. 299 listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 300 if (listen_fd[n] < 0) { 301 perror("socket"); 302 exit(1); 303 } 304 305 // If program is killed, drop the socket address reservation immediately. 306 val = 1; 307 status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val, 308 sizeof(val)); 309 if (status < 0) { 310 perror("setsockopt"); 311 // Not fatal. Keep on going. 312 } 313 314 // Bind this address to the socket. 315 listen_addr.sin_family = AF_INET; 316 listen_addr.sin_port = htons(listen_port[n]); 317 listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); 318 status = bind(listen_fd[n], (struct sockaddr *)&listen_addr, 319 sizeof(listen_addr)); 320 if (status < 0) { 321 perror("bind"); 322 exit(1); 323 } 324 325 // Listen on the specified port. 326 status = listen(listen_fd[n],5); 327 if (status < 0) { 328 perror("listen"); 329 } 330 } 331 332 // Create a socket for broadcast. 333 send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 334 if (send_fd < 0) { 335 perror("socket"); 336 exit(1); 337 } 338 339 // If program is killed, drop the socket address reservation immediately. 340 val = 1; 341 status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 342 if (status < 0) { 343 perror("setsockopt"); 344 // Not fatal. Keep on going. 345 } 346 347 // We're going to broadcast through this socket. 348 val = 1; 349 status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val)); 350 if (status < 0) { 351 perror("setsockopt"); 352 // Not fatal. Keep on going. 353 } 354 355 // Bind this address to the socket. 356 recv_addr.sin_family = AF_INET; 357 recv_addr.sin_port = htons(recv_port); 358 recv_addr.sin_addr.s_addr = htonl(INADDR_ANY); 359 status = bind(send_fd, (struct sockaddr *)&recv_addr, 360 sizeof(recv_addr)); 361 if (status < 0) { 362 perror("bind"); 363 exit(1); 364 } 365 366 // Set up the address that we broadcast to. 367 send_addr.sin_family = AF_INET; 368 send_addr.sin_port = htons(recv_port); 369 370 // Set up a signal handler for the alarm interrupt. 371 // It doesn't do anything other than interrupt select() below. 372 if (signal(SIGALRM, sigalarm_handler) == SIG_ERR) { 373 perror("signal SIGALRM"); 374 } 375 376 struct itimerval itvalue = { 377 {1, 0}, {1, 0} 378 }; 379 status = setitimer(ITIMER_REAL, &itvalue, NULL); 380 if (status != 0) { 381 perror("setitimer"); 382 } 383 384 // We're ready to go. Before going into the main loop, 385 // broadcast a load announcement to other machines. 172 386 broadcast_load(); 173 } 174 } 175 176 volatile int sigalarm_set; 177 void sigalarm_handler(int signum) 178 { 179 sigalarm_set = 1; 180 } 181 182 void help(const char *argv0) 183 { 184 fprintf(stderr, 185 "Syntax: %s [-d] -b <broadcast port> -l <listen port> -s <subnet> -c 'command'\n", 186 argv0); 187 exit(1); 188 } 189 190 int 191 clear_service_fd(int fd) 192 { 193 int n; 194 195 for(n = 0; n < MAX_SERVICES; n++) 196 { 197 if (FD_ISSET(fd, &service_rfds[n])) 198 FD_CLR(fd, &service_rfds[n]); 199 } 200 } 201 202 int main(int argc, char *argv[]) 203 { 204 char server_command[MAX_SERVICES][1000]; 205 int nservices = 0; 206 int command_argc[MAX_SERVICES]; 207 char **command_argv[MAX_SERVICES]; 208 int val; 209 int listen_fd[MAX_SERVICES]; 210 int status; 211 struct sockaddr_in listen_addr; 212 struct sockaddr_in recv_addr; 213 int listen_port[MAX_SERVICES]; 214 int recv_port = -1; 215 int connected_fds[10] = {0}; 216 int subnet_addr; 217 int debug_flag = 0; 218 int n; 219 220 listen_port[0] = -1; 221 server_command[0][0] = 0; 222 223 while(1) { 224 int c; 225 int this_option_optind = optind ? optind : 1; 226 int option_index = 0; 227 struct option long_options[] = { 228 // name, has_arg, flag, val 229 { 0,0,0,0 }, 230 }; 231 232 c = getopt_long(argc, argv, "+b:c:l:s:d", long_options, &option_index); 233 if (c == -1) 234 break; 235 236 switch(c) { 237 case 'd': 238 debug_flag = 1; 239 break; 240 case 'b': 241 recv_port = strtoul(optarg,0,0); 242 break; 243 case 'c': 244 strncpy(server_command[nservices], optarg, sizeof(server_command[0])); 245 246 if (listen_port[nservices] == -1) { 247 fprintf(stderr,"Must specify -l port before each -c command.\n"); 248 return 1; 249 } 250 251 nservices++; 252 listen_port[nservices] = -1; 253 break; 254 case 'l': 255 listen_port[nservices] = strtoul(optarg,0,0); 256 break; 257 case 's': 258 send_addr.sin_addr.s_addr = htonl(inet_network(optarg, 259 &send_addr.sin_addr)); 260 if (send_addr.sin_addr.s_addr == -1) { 261 fprintf(stderr,"Invalid subnet broadcast address"); 262 return 1; 263 } 264 break; 265 default: 266 fprintf(stderr,"Don't know what option '%c'.\n", c); 267 return 1; 268 } 269 } 270 271 if (nservices == 0 || 272 recv_port == -1 || 273 subnet_addr == -1 || 274 server_command[0][0]=='\0') { 275 help(argv[0]); 276 return 1; 277 } 278 279 for(n = 0; n < nservices; n++) { 280 // Parse the command arguments... 281 282 command_argc[n]=0; 283 command_argv[n] = malloc((command_argc[n]+2) * sizeof(char *)); 284 command_argv[n][command_argc[n]] = strtok(server_command[n], " \t"); 285 command_argc[n]++; 286 while( (command_argv[n][command_argc[n]] = strtok(NULL, " \t"))) { 287 command_argv[n] = realloc(command_argv[n], (command_argc[n]+2) * sizeof(char *)); 288 command_argc[n]++; 289 } 290 291 // Create a socket for listening. 292 listen_fd[n] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 293 if (listen_fd[n] < 0) { 294 perror("socket"); 295 exit(1); 296 } 297 298 // If program is killed, drop the socket address reservation immediately. 299 val = 1; 300 status = setsockopt(listen_fd[n], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 301 if (status < 0) { 302 perror("setsockopt"); 303 // Not fatal. Keep on going. 304 } 305 306 // Bind this address to the socket. 307 listen_addr.sin_family = AF_INET; 308 listen_addr.sin_port = htons(listen_port[n]); 309 listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); 310 status = bind(listen_fd[n], (struct sockaddr *)&listen_addr, 311 sizeof(listen_addr)); 312 if (status < 0) { 313 perror("bind"); 314 exit(1); 315 } 316 317 // Listen on the specified port. 318 status = listen(listen_fd[n],5); 319 if (status < 0) { 320 perror("listen"); 321 } 322 } 323 324 // Create a socket for broadcast. 325 send_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 326 if (send_fd < 0) { 327 perror("socket"); 328 exit(1); 329 } 330 331 // If program is killed, drop the socket address reservation immediately. 332 val = 1; 333 status = setsockopt(send_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 334 if (status < 0) { 335 perror("setsockopt"); 336 // Not fatal. Keep on going. 337 } 338 339 // We're going to broadcast through this socket. 340 val = 1; 341 status = setsockopt(send_fd, SOL_SOCKET, SO_BROADCAST, &val, sizeof(val)); 342 if (status < 0) { 343 perror("setsockopt"); 344 // Not fatal. Keep on going. 345 } 346 347 // Bind this address to the socket. 348 recv_addr.sin_family = AF_INET; 349 recv_addr.sin_port = htons(recv_port); 350 recv_addr.sin_addr.s_addr = htonl(INADDR_ANY); 351 status = bind(send_fd, (struct sockaddr *)&recv_addr, 352 sizeof(recv_addr)); 353 if (status < 0) { 354 perror("bind"); 355 exit(1); 356 } 357 358 // Set up the address that we broadcast to. 359 send_addr.sin_family = AF_INET; 360 send_addr.sin_port = htons(recv_port); 361 362 // Set up a signal handler for the alarm interrupt. 363 // It doesn't do anything other than interrupt select() below. 364 if (signal(SIGALRM,sigalarm_handler) == SIG_ERR) { 365 perror("signal SIGALRM"); 366 } 367 368 struct itimerval itvalue = { 369 {1, 0}, {1, 0} 370 }; 371 status = setitimer(ITIMER_REAL, &itvalue, NULL); 372 if (status != 0) { 373 perror("setitimer"); 374 } 375 376 // We're ready to go. Before going into the main loop, 377 // broadcast a load announcement to other machines. 378 broadcast_load(); 379 380 int maxfd = send_fd; 381 FD_ZERO(&saved_rfds); 382 FD_ZERO(&pipe_rfds); 383 384 for(n = 0; n < nservices; n++) { 385 FD_ZERO(&service_rfds[n]); 386 FD_SET(listen_fd[n], &saved_rfds); 387 if (listen_fd[n] > maxfd) 388 maxfd = listen_fd[n]; 389 } 390 391 FD_SET(send_fd, &saved_rfds); 392 393 if (debug_flag == 0) { 394 if ( daemon(0,0) != 0 ) { 395 perror("daemon"); 396 exit(1); 397 } 398 } 399 400 while(1) { 401 402 fd_set rfds = saved_rfds; 403 404 status = select(maxfd+1, &rfds, NULL, NULL, 0); 405 if (status <= 0) { 406 if (sigalarm_set) { 407 update_load_average(); 408 sigalarm_set = 0; 409 } 410 continue; 411 } 412 413 414 int accepted = 0; 387 388 int maxfd = send_fd; 389 FD_ZERO(&saved_rfds); 390 FD_ZERO(&pipe_rfds); 391 415 392 for(n = 0; n < nservices; n++) { 416 if (FD_ISSET(listen_fd[n], &rfds)) { 417 // Accept a new connection. 418 struct sockaddr_in newaddr; 419 unsigned int addrlen = sizeof(newaddr); 420 int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen); 421 if (newfd < 0) { 422 perror("accept"); 423 continue; 424 } 425 426 printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr)); 427 FD_SET(newfd, &saved_rfds); 428 maxfd = max(maxfd, newfd); 429 FD_SET(newfd, &service_rfds[n]); 430 accepted = 1; 431 } 432 } 433 393 FD_ZERO(&service_rfds[n]); 394 FD_SET(listen_fd[n], &saved_rfds); 395 if (listen_fd[n] > maxfd) 396 maxfd = listen_fd[n]; 397 } 398 399 FD_SET(send_fd, &saved_rfds); 400 401 if (debug_flag == 0) { 402 if ( daemon(0,0) != 0 ) { 403 perror("daemon"); 404 exit(1); 405 } 406 } 407 408 while(1) { 409 fd_set rfds = saved_rfds; 410 411 status = select(maxfd+1, &rfds, NULL, NULL, 0); 412 if (status <= 0) { 413 if (sigalarm_set) { 414 update_load_average(); 415 sigalarm_set = 0; 416 } 417 continue; 418 } 419 420 421 int accepted = 0; 422 for(n = 0; n < nservices; n++) { 423 if (FD_ISSET(listen_fd[n], &rfds)) { 424 // Accept a new connection. 425 struct sockaddr_in newaddr; 426 unsigned int addrlen = sizeof(newaddr); 427 int newfd = accept(listen_fd[n], (struct sockaddr *)&newaddr, &addrlen); 428 if (newfd < 0) { 429 perror("accept"); 430 continue; 431 } 432 433 printf("New connection from %s\n", inet_ntoa(newaddr.sin_addr)); 434 FD_SET(newfd, &saved_rfds); 435 maxfd = max(maxfd, newfd); 436 FD_SET(newfd, &service_rfds[n]); 437 accepted = 1; 438 } 439 } 440 434 441 if (accepted) 435 442 continue; 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 for(i=0; i<maxfd+1; i++) {483 484 485 486 487 488 489 490 491 443 444 if (FD_ISSET(send_fd, &rfds)) { 445 int buffer[1000]; 446 struct sockaddr_in peer_addr; 447 unsigned int len = sizeof(peer_addr); 448 status = recvfrom(send_fd, buffer, sizeof(buffer), 0, 449 (struct sockaddr*)&peer_addr, &len); 450 if (status < 0) { 451 perror("recvfrom"); 452 continue; 453 } 454 if (status != 8) { 455 fprintf(stderr,"Bogus message from %s\n", 456 inet_ntoa(peer_addr.sin_addr)); 457 continue; 458 } 459 float peer_load = ntohl(buffer[0]); 460 int peer_procs = ntohl(buffer[1]); 461 //printf("Load for %s is %f (%d processes).\n", 462 // inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs); 463 int h; 464 int free_index=-1; 465 int found = 0; 466 for(h=0; h<sizeof(host_array)/sizeof(host_array[0]); h++) { 467 if (host_array[h].in_addr.s_addr == peer_addr.sin_addr.s_addr) { 468 if (host_array[h].children != peer_procs) { 469 printf("Load for %s is %f (%d processes).\n", 470 inet_ntoa(peer_addr.sin_addr), peer_load, peer_procs); 471 } 472 host_array[h].load = peer_load; 473 host_array[h].children = peer_procs; 474 found = 1; 475 break; 476 } 477 if (host_array[h].in_addr.s_addr == 0 && free_index == -1) { 478 free_index = h; 479 } 480 } 481 if (!found) { 482 host_array[free_index].in_addr.s_addr = peer_addr.sin_addr.s_addr; 483 host_array[free_index].load = peer_load; 484 } 485 continue; 486 } 487 488 int i; 489 for(i=0; i< maxfd +1; i++) { 490 if (FD_ISSET(i,&rfds)) { 491 492 // If this is a pipe, get the load. Update. 493 if (FD_ISSET(i,&pipe_rfds)) { 494 float value; 495 status = read(i, &value, sizeof(value)); 496 if (status != 4) { 497 //fprintf(stderr,"error reading pipe, child ended?\n"); 498 close_child(i); 492 499 /*close(i); 493 FD_CLR(i, &saved_rfds); 494 FD_CLR(i, &pipe_rfds); */ 495 } else { 496 note_request(i,value); 497 } 498 continue; 499 } 500 501 // This must be a descriptor that we're waiting to from 502 // for the memory footprint. Get it. 503 int msg; 504 status = read(i, &msg, 4); 505 if (status != 4) { 506 fprintf(stderr,"Bad status on read (%d).", status); 507 FD_CLR(i, &saved_rfds); 508 clear_service_fd(i); 509 close(i); 510 continue; 511 } 512 513 // find the new memory increment 514 int newmemory = ntohl(msg); 515 516 // Find the best host to create a new child on. 517 int index = find_best_host(); 518 519 // Only redirect if another host's load is significantly less 520 // than our own... 521 if (index != -1 && 522 (host_array[index].load < (LOAD_REDIRECT_FACTOR * load))) { 523 524 // If we're redirecting to another machine, give that machine 525 // an extra boost in our copy of the load statistics. This will 526 // keep us from sending the very next job to it. Eventually, the 527 // other machine will broadcast its real load and we can make an 528 // informed decision as to who redirect to again. 529 host_array[index].load += newmemory * INITIAL_LOAD; 530 531 // Redirect to another machine. 532 printf("Redirecting to %s\n", 533 inet_ntoa(host_array[index].in_addr)); 534 write(i, &host_array[index].in_addr.s_addr, 4); 535 FD_CLR(i, &saved_rfds); 536 clear_service_fd(i); 537 close(i); 538 continue; 539 } 540 541 memory_in_use += newmemory; 542 load += 2*INITIAL_LOAD; 543 broadcast_load(); 544 printf("Accepted new job with memory %d\n", newmemory); 545 //printf("My load is now %f\n", load); 546 547 // accept the connection. 548 msg = 0; 549 write(i, &msg, 4); 550 551 int pair[2]; 552 status = pipe(pair); 553 if (status != 0) { 554 perror("pipe"); 555 } 556 557 // Make the child side of the pipe non-blocking... 558 status = fcntl(pair[1], F_SETFL, O_NONBLOCK); 559 if (status < 0) { 560 perror("fcntl"); 561 } 562 563 // Fork the new process. Connect i/o to the new socket. 564 status = fork(); 565 if (status < 0) { 566 perror("fork"); 567 } else if (status == 0) { 568 569 for(n = 0; n < MAX_SERVICES; n++) { 570 if (FD_ISSET(i, &service_rfds[n])) { 571 572 // disassociate 573 if ( daemon(0,1) == 0 ) { 574 int fd; 575 576 dup2(i,0); // stdin 577 dup2(i,1); // stdout 578 dup2(i,2); // stderr 579 dup2(pair[1],3); 580 // read end of pipe moved, and left open to prevent SIGPIPE 581 dup2(pair[0],4); 582 583 for(fd=5; fd<FD_SETSIZE; fd++) 584 close(fd); 585 586 execvp(command_argv[n][0], command_argv[n]); 587 } 588 _exit(errno); 500 FD_CLR(i, &saved_rfds); 501 FD_CLR(i, &pipe_rfds); */ 502 } else { 503 note_request(i,value); 589 504 } 590 } 591 _exit(EINVAL); 592 593 } else { 594 int c; 595 // reap initial child which will exit immediately (grandchild continues) 596 waitpid(status, NULL, 0); 597 for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) { 598 if (child_array[c].pipefd == 0) { 599 child_array[c].memory = newmemory; 600 child_array[c].pipefd = pair[0]; 601 child_array[c].requests = INITIAL_LOAD; 602 status = close(pair[1]); 603 if (status != 0) { 604 perror("close pair[1]"); 605 } 606 FD_SET(pair[0], &saved_rfds); 607 FD_SET(pair[0], &pipe_rfds); 608 maxfd = max(pair[0], maxfd); 609 break; 610 } 611 } 612 613 children++; 614 broadcast_load(); 615 } 616 617 618 FD_CLR(i, &saved_rfds); 619 clear_service_fd(i); 620 close(i); 621 break; 622 } 623 624 } // for all connected_fds 625 626 } // while(1) 627 628 } 629 505 continue; 506 } 507 508 // This must be a descriptor that we're waiting to from for 509 // the memory footprint. Get it. 510 int msg; 511 status = read(i, &msg, 4); 512 if (status != 4) { 513 fprintf(stderr,"Bad status on read (%d).", status); 514 FD_CLR(i, &saved_rfds); 515 clear_service_fd(i); 516 close(i); 517 continue; 518 } 519 520 // find the new memory increment 521 int newmemory = ntohl(msg); 522 523 // Find the best host to create a new child on. 524 int index = find_best_host(); 525 526 // Only redirect if another host's load is significantly less 527 // than our own... 528 if (index != -1 && 529 (host_array[index].load < (LOAD_REDIRECT_FACTOR * load))) { 530 531 // If we're redirecting to another machine, give that 532 // machine an extra boost in our copy of the load 533 // statistics. This will keep us from sending the very 534 // next job to it. Eventually, the other machine will 535 // broadcast its real load and we can make an informed 536 // decision as to who redirect to again. 537 host_array[index].load += newmemory * INITIAL_LOAD; 538 539 // Redirect to another machine. 540 printf("Redirecting to %s\n", 541 inet_ntoa(host_array[index].in_addr)); 542 write(i, &host_array[index].in_addr.s_addr, 4); 543 FD_CLR(i, &saved_rfds); 544 clear_service_fd(i); 545 close(i); 546 continue; 547 } 548 549 memory_in_use += newmemory; 550 load += 2*INITIAL_LOAD; 551 broadcast_load(); 552 printf("Accepted new job with memory %d\n", newmemory); 553 //printf("My load is now %f\n", load); 554 555 // accept the connection. 556 msg = 0; 557 write(i, &msg, 4); 558 559 int pair[2]; 560 status = pipe(pair); 561 if (status != 0) { 562 perror("pipe"); 563 } 564 565 // Make the child side of the pipe non-blocking... 566 status = fcntl(pair[1], F_SETFL, O_NONBLOCK); 567 if (status < 0) { 568 perror("fcntl"); 569 } 570 571 // Fork the new process. Connect i/o to the new socket. 572 status = fork(); 573 if (status < 0) { 574 perror("fork"); 575 } else if (status == 0) { 576 577 for(n = 0; n < MAX_SERVICES; n++) { 578 if (FD_ISSET(i, &service_rfds[n])) { 579 580 // disassociate 581 if ( daemon(0,1) == 0 ) { 582 int fd; 583 584 dup2(i, 0); // stdin 585 dup2(i, 1); // stdout 586 dup2(i, 2); // stderr 587 dup2(pair[1],3); 588 // read end of pipe moved, and left open to 589 // prevent SIGPIPE 590 dup2(pair[0],4); 591 592 for(fd=5; fd<FD_SETSIZE; fd++) 593 close(fd); 594 595 execvp(command_argv[n][0], command_argv[n]); 596 } 597 _exit(errno); 598 } 599 } 600 _exit(EINVAL); 601 602 } else { 603 int c; 604 // reap initial child which will exit immediately 605 // (grandchild continues) 606 waitpid(status, NULL, 0); 607 for(c=0; c<sizeof(child_array)/sizeof(child_array[0]); c++) { 608 if (child_array[c].pipefd == 0) { 609 child_array[c].memory = newmemory; 610 child_array[c].pipefd = pair[0]; 611 child_array[c].requests = INITIAL_LOAD; 612 status = close(pair[1]); 613 if (status != 0) { 614 perror("close pair[1]"); 615 } 616 FD_SET(pair[0], &saved_rfds); 617 FD_SET(pair[0], &pipe_rfds); 618 maxfd = max(pair[0], maxfd); 619 break; 620 } 621 } 622 623 children++; 624 broadcast_load(); 625 } 626 627 628 FD_CLR(i, &saved_rfds); 629 clear_service_fd(i); 630 close(i); 631 break; 632 } 633 634 } // for all connected_fds 635 636 } // while(1) 637 638 } 639 -
trunk/packages/vizservers/nanovis/Command.cpp
r1028 r1029 72 72 #include <RenderContext.h> 73 73 #include <NvLIC.h> 74 #include <RpOutcome.h> 75 #include <RpBuffer.h> 74 76 75 77 #define ISO_TEST 1 -
trunk/packages/vizservers/nanovis/Makefile.in
r1028 r1029 48 48 RP_DIR = @RP_DIR@ 49 49 RP_INC_SPEC = -I$(RP_DIR)/include -I$(RP_DIR)/include/rappture2 50 RP_LIB_SPEC = -L$(RP_DIR)/lib -lrappture2 -l b64 -lz50 RP_LIB_SPEC = -L$(RP_DIR)/lib -lrappture2 -lrappture -lexpat -lb64 -lz 51 51 52 52 TF_DIR = ./transfer-function -
trunk/packages/vizservers/nanovis/RpDX.h
r1028 r1029 26 26 public: 27 27 DX(); 28 DX(const char* filename, R pOutcome *resultPtr);28 DX(const char* filename, Rappture::Outcome *resultPtr); 29 29 DX(const DX& rpdx); 30 30 DX& operator=(const DX& rpdx);
Note: See TracChangeset
for help on using the changeset viewer.