Changeset 1620
- Timestamp:
- 04/18/08 16:29:00 (3 weeks ago)
- Files:
-
- nebula/trunk/src/classify.c (modified) (3 diffs)
- nebula/trunk/src/nebula.c (modified) (6 diffs)
- nebula/trunk/src/nebula.h (modified) (3 diffs)
- nebula/trunk/src/sig.c (modified) (1 diff)
- nebula/trunk/src/signals.c (modified) (4 diffs)
- nebula/trunk/src/util.c (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
nebula/trunk/src/classify.c
r1619 r1620 42 42 int rv; 43 43 qelem *elem; 44 struct timeval timeout;45 fd_set rfds;46 u_char buf;47 44 48 45 for (;;) { 49 FD_ZERO(&rfds); 50 FD_SET(cpipe[0], &rfds); 51 52 timeout.tv_sec = 10; 53 timeout.tv_usec = 0; 54 55 // this thread just processes a clustering job queue 56 // it reads wakeup calls from a pipe 57 switch (select(cpipe[0]+1, &rfds, NULL, NULL, &timeout)) { 58 case -1: 59 fprintf(stderr, "Error - select() failed: %s.\n", strerror(errno)); 46 if (sem_wait(&sessions_semaphore) == -1) { 47 fprintf(stderr, "Error - Unable to decrement semaphore: %s.\n", strerror(errno)); 60 48 exit(EXIT_FAILURE); 61 case 0: 62 break; 63 default: 64 if (FD_ISSET(cpipe[0], &rfds)) { 65 66 if (read(cpipe[0], &buf, 1) != 1) { 67 fprintf(stderr, "Error - Unable to read from pipe: %s.\n", strerror(errno)); 68 exit(EXIT_FAILURE); 69 } 70 71 pthread_mutex_lock(&classifyq_mutex); 72 elem = queue_cuthead(classifyq); 73 pthread_mutex_unlock(&classifyq_mutex); 74 75 while (elem) { 76 // call classification function 77 if ((rv = classify((submission *) elem->data)) == -1) { 78 fprintf(stderr, "Error - Unable to classify submission.\n"); 79 free(elem); 80 return((void *) -1); 81 } else if (rv == 0) { 82 // known attack, do nothing 83 free(elem); 84 return((void *) 0); 85 } else { 86 // new attack 87 free(elem->data); 88 free(elem); 89 } 90 91 pthread_mutex_lock(&classifyq_mutex); 92 elem = queue_cuthead(classifyq); 93 pthread_mutex_unlock(&classifyq_mutex); 94 } 95 } 96 break; 49 } 50 51 pthread_mutex_lock(&classifyq_mutex); 52 elem = queue_cuthead(classifyq); 53 pthread_mutex_unlock(&classifyq_mutex); 54 55 if (elem) { 56 // call classification function 57 if ((rv = classify((submission *) elem->data)) == -1) { 58 fprintf(stderr, "Error - Unable to classify submission.\n"); 59 free(elem); 60 return((void *) -1); 61 } else if (rv == 0) { 62 // known attack, do nothing 63 free(elem); 64 return((void *) 0); 65 } else { 66 // new attack 67 free(elem->data); 68 free(elem); 69 } 97 70 } 98 71 } … … 293 266 for (jqelem = siggenq->head; jqelem && jqelem->data != ((hash*)t->data)->cl; jqelem = jqelem->next); 294 267 295 // queue cluster for signature generation268 // generate signature for this cluster 296 269 if (!jqelem || jqelem->data != ((hash*)t->data)->cl) { 297 270 // append cluster to signature generation job queue … … 300 273 exit(EXIT_FAILURE); 301 274 } 302 if (write(gpipe[1], "x", sizeof(u_char)) == -1) { 303 fprintf(stderr, "Error - Unable to write to pipe: %s.\n", strerror(errno)); 275 276 // wake up a signature generation thread 277 if (sem_post(&siggen_semaphore) == -1) { 278 fprintf(stderr, "Error - Unable to increment semaphore: %s.\n", strerror(errno)); 304 279 exit(EXIT_FAILURE); 305 280 } nebula/trunk/src/nebula.c
r1619 r1620 79 79 int main(int argc, char *argv[]) { 80 80 int i, qsize, port, listen_fd, rv, gthread_no; 81 u_char daemonize , bell;81 u_char daemonize; 82 82 char option; 83 83 pthread_t ntid; … … 104 104 105 105 lock_mutex = 1; // if this is set, threads will lock mutexes 106 107 bell = 1; // wakeup bell108 106 109 107 … … 139 137 } 140 138 139 140 // initialize semaphore 141 if ((sem_init(&sessions_semaphore, 0, 0) == -1) || 142 (sem_init(&sessions_semaphore, 0, 0) == -1)) { 143 fprintf(stderr, "Error - Unable to initialize semaphore: %m.\n"); 144 exit(EXIT_FAILURE); 145 } 146 141 147 printf("\n Nebula %s Copyright (C) 2007-2008 Tillmann Werner <tillmann.werner@gmx.de>\n\n", VERSION); 142 148 … … 246 252 247 253 // spawn classification thread 248 // the main thread writes to a pipe to inform the classification thread about pending jobs249 250 if (pipe(cpipe) == -1) {251 fprintf(stderr, "Error - Unable to create interthread communication pipe: %s.\n", strerror(errno));252 exit(EXIT_FAILURE);253 }254 255 254 if (pthread_attr_init(&ptattr) || pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 256 255 fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); … … 266 265 267 266 // spawn signature generation thread(s) 268 // the main thread writes to a pipe to inform the generation thread(s) about pending jobs269 270 267 for (; gthread_no; gthread_no--) { 271 268 if (verbose > 1) printf(" Signature generation thread spawned.\n"); 272 if (pipe(gpipe) == -1) {273 fprintf(stderr, "Error - Unable to create interthread communication pipe: %s.\n", strerror(errno));274 exit(EXIT_FAILURE);275 }276 269 if (pthread_attr_init(&ptattr) || pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 277 270 fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); … … 419 412 pthread_mutex_unlock(&classifyq_mutex); 420 413 421 // now send a 'wakeup byte' to theclassification thread422 if ( write(cpipe[1], &bell, sizeof(u_char)) == -1) {423 fprintf(stderr, "Error - Unable to write to pipe: %s.\n", strerror(errno));414 // wake up a classification thread 415 if (sem_post(&sessions_semaphore) == -1) { 416 fprintf(stderr, "Error - Unable to increment semaphore: %s.\n", strerror(errno)); 424 417 exit(EXIT_FAILURE); 425 418 } nebula/trunk/src/nebula.h
r1618 r1620 28 28 29 29 #include <pthread.h> 30 #include <semaphore.h> 30 31 31 32 #include "avl.h" … … 43 44 nfds_t pollfd_set_size; // size of dynamically adjusted set 44 45 45 int lock_mutex , cpipe[2], gpipe[2];46 int lock_mutex; 46 47 47 48 u_char verbose; … … 72 73 pthread_rwlock_t sighash_trie_lock; 73 74 75 sem_t sessions_semaphore; 76 sem_t siggen_semaphore; 77 74 78 #endif nebula/trunk/src/sig.c
r1618 r1620 358 358 void *pt_pollgq(void *param) { 359 359 qelem *elem; 360 struct timeval timeout;361 fd_set rfds;362 u_char buf;363 360 364 361 for (;;) { 365 FD_ZERO(&rfds); 366 FD_SET(gpipe[0], &rfds); 367 368 timeout.tv_sec = 10; 369 timeout.tv_usec = 0; 370 371 switch (select(gpipe[0]+1, &rfds, NULL, NULL, &timeout)) { 372 case -1: 373 fprintf(stderr, "Error - select() failed: %s.\n", strerror(errno)); 362 if (sem_wait(&siggen_semaphore) == -1) { 363 fprintf(stderr, "Error - Unable to decrement semaphore: %s.\n", strerror(errno)); 374 364 exit(EXIT_FAILURE); 375 case 0: 376 break; 377 default: 378 if (FD_ISSET(gpipe[0], &rfds)) { 379 if (read(gpipe[0], &buf, 1) != 1) { 380 fprintf(stderr, "Error - Unable to read from pipe: %s.\n", strerror(errno)); 381 exit(EXIT_FAILURE); 382 } 383 } 384 pthread_mutex_lock(&siggenq_mutex); 385 elem = queue_cuthead(siggenq); 386 pthread_mutex_unlock(&siggenq_mutex); 387 388 if (elem != NULL) { 389 gensig(elem->data); 390 free(elem); 391 } 392 393 break; 365 } 366 367 pthread_mutex_lock(&siggenq_mutex); 368 elem = queue_cuthead(siggenq); 369 pthread_mutex_unlock(&siggenq_mutex); 370 371 if (elem) { 372 gensig(elem->data); 373 free(elem); 394 374 } 395 375 } nebula/trunk/src/signals.c
r1618 r1620 95 95 u_char i; 96 96 struct sigaction s_action; 97 /*98 97 static int termsigs[] = { 99 98 #ifdef HAVE_SIGBUS … … 104 103 SIGINT, 105 104 SIGQUIT, 106 //SIGSEGV,105 SIGSEGV, 107 106 SIGTERM 108 107 }; 109 */110 108 111 109 … … 124 122 125 123 126 /*127 124 // install handler for signals which terminate the process 128 125 memset(&s_action, 0, sizeof(struct sigaction)); … … 137 134 } 138 135 } 139 */140 136 141 137 return; nebula/trunk/src/util.c
r1618 r1620 20 20 21 21 #include <dirent.h> 22 #include <errno.h> 22 23 #include <fcntl.h> 23 24 #include <math.h> … … 126 127 trie_delete(spamsum_trie.childlist, spamsum_trie.childlist_len, NULL); 127 128 trie_delete(sighash_trie.childlist, sighash_trie.childlist_len, NULL); 129 130 // destroy semaphores 131 if (sem_getvalue(&sessions_semaphore, &i) == -1) { 132 fprintf(stderr, "Error - Unable to get semaphore value: %s.\n", strerror(errno)); 133 exit(EXIT_FAILURE); 134 } 135 for (; i; i--) sem_wait(&sessions_semaphore); 136 if (sem_destroy(&sessions_semaphore) == -1) { 137 fprintf(stderr, "Error - Unable to destroy semaphore: %s.\n", strerror(errno)); 138 exit(EXIT_FAILURE); 139 } 128 140 141 if (sem_getvalue(&siggen_semaphore, &i) == -1) { 142 fprintf(stderr, "Error - Unable to get semaphore value: %s.\n", strerror(errno)); 143 exit(EXIT_FAILURE); 144 } 145 for (; i; i--) sem_wait(&siggen_semaphore); 146 if (sem_destroy(&siggen_semaphore) == -1) { 147 fprintf(stderr, "Error - Unable to destroy semaphore: %s.\n", strerror(errno)); 148 exit(EXIT_FAILURE); 149 } 150 151 // destroy mutexes and rwlocks 129 152 pthread_mutex_destroy(&siggen_mutex); 130 153 pthread_mutex_destroy(&sigwrite_mutex);
